Tracker

Tracker is the core class for provenance persistence, cache lookups, and query access. If you want explicit control over where state is stored and how runs are executed, start here.

When to use Tracker

  • You are building a reusable library or service and want explicit dependencies.
  • You want direct control over lifecycle methods like start_run, run, scenario, and query methods (find_runs, run_set, get_artifact, lineage helpers).
  • You want to avoid relying on global context (consist.use_tracker(...)).

Run lookup helpers expose workflow-aware filters such as stage= and phase=. Use those when you need to query by lifecycle step or pipeline stage instead of treating those values as opaque metadata.

Minimal runnable example

from pathlib import Path
import consist
from consist import Tracker

tracker = Tracker(run_dir="./runs", db_path="./provenance.duckdb")

def write_summary() -> Path:
    out = consist.output_path("summary", ext="txt")
    out.write_text("summary\n")
    return out

result = tracker.run(fn=write_summary, outputs=["summary"])
latest = tracker.find_latest_run(model=result.run.model_name)

print(result.run.id)
print(latest.id if latest else None)

find_runs(...) and find_latest_run(...) accept stage= and phase= as first-class workflow filters, alongside the existing run dimensions such as year, iteration, model, and status. Consist mirrors those values into run.meta for backward compatibility, but the canonical fields live on Run.

For top-level wrappers around these methods, see API Helpers. For grouped workflows, see Workflow Contexts.

Constructing with TrackerConfig

Use Tracker.from_config(...) when you want a typed configuration object for tracker construction.

from consist.core.tracker import Tracker
from consist.core.tracker_config import TrackerConfig

config = TrackerConfig(run_dir="./runs", db_path="./provenance.duckdb")
tracker = Tracker.from_config(config)

Public identity kwargs (run / trace)

Use adapter and identity_inputs on Tracker.run(...) and Tracker.trace(...):

result = tracker.run(
    fn=simulate,
    adapter=activitysim_adapter,
    identity_inputs=[("asim_config", asim_config_dir)],
)

with tracker.trace(
    "simulate_trace",
    adapter=activitysim_adapter,
    identity_inputs=[("asim_config", asim_config_dir)],
):
    simulate_inline()

config_plan and hash_inputs are not accepted on Tracker.run(...) and Tracker.trace(...). Use adapter and identity_inputs.

Config Override Selectors

Tracker.run_with_config_overrides(...) supports two mutually exclusive base selectors:

  • base_run_id="existing_run_id" for historical bundle/config artifacts
  • base_config_dirs=[Path("configs"), ...] for first-run override execution

Use exactly one selector. Passing both raises a ValueError.

base_primary_config=... is optional and only applies with base_config_dirs (for adapters that require/accept a primary config hint).
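The one-of selector rule can be sketched in plain Python. This is an illustrative helper, not Consist's implementation; choose_base and its return values are assumptions made for the example.

```python
from pathlib import Path

def choose_base(base_run_id=None, base_config_dirs=None):
    # Hypothetical sketch of the documented rule: exactly one of the two
    # base selectors must be provided, otherwise a ValueError is raised.
    if (base_run_id is None) == (base_config_dirs is None):
        raise ValueError("Pass exactly one of base_run_id or base_config_dirs")
    if base_run_id is not None:
        return ("run", base_run_id)  # historical bundle/config artifacts
    return ("dirs", [Path(d) for d in base_config_dirs])  # first-run overrides
```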

run_with_config_overrides(...) also accepts:

  • identity_inputs=[...]: additive hash-only inputs. These are merged with the adapter-resolved config identity when enabled.
  • resolved_config_identity="auto" | "off":
      • "auto" (default) injects the adapter-selected resolved config root into identity hashing using identity_label.
      • "off" disables that auto injection and keeps only user-provided identity_inputs.

For override runs, Consist persists standardized run metadata: run.meta["resolved_config_identity"] with mode, adapter, label, path, and digest.

The central orchestrator for Consist, managing the lifecycle of a Run and its associated Artifacts.

The Tracker is responsible for:

  1. Initiating and managing the state of individual "Runs" (e.g., model executions, data processing steps).

  2. Logging "Artifacts" (input files, output data, etc.) and their relationships to runs.

  3. Implementing a dual-write mechanism, logging provenance to both human-readable JSON files (consist.json) and a DuckDB-backed store. In this refactor phase, one configured db_path still points to a single local DuckDB file used by both internal stores: metadata_store (runs/artifacts/lineage metadata) and hot_data_store (global_tables.* ingest/load surfaces).

  4. Providing path virtualization to make runs portable across different environments, as described in the "Path Resolution & Mounts" architectural section.

  5. Facilitating smart caching based on a Merkle DAG strategy, enabling "run forking" and "hydration" of previously computed results.
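The caching idea in point 5 — a run key derived from code, configuration, and input hashes — can be illustrated with a toy function. Consist's actual key layout is not documented here, so the name run_signature and the payload structure are assumptions for illustration only.

```python
import hashlib
import json

def run_signature(code_digest, config, input_digests):
    # Toy content-addressed run key: any change to code, config, or an
    # input hash yields a different signature, which is what enables
    # cache hits and run forking. Not Consist's real implementation.
    payload = json.dumps(
        {"code": code_digest, "config": config, "inputs": sorted(input_digests)},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode()).hexdigest()
```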

last_run property

Return the most recent run record observed by this tracker.

Returns:

Type Description
Optional[ConsistRecord]

The last completed/failed run record for this tracker instance, or None if no run has executed yet.

is_cached property

Whether the currently active run is a cache hit.

Returns:

Type Description
bool

True if the current start_run/run/trace execution is reusing a cached run. Returns False if no run is active.

registered_schemas property

Return the SQLModel schemas registered on this tracker.

Registered schemas are the SQLModel classes passed via Tracker(..., schemas=[...]) during initialization. They are stored by class name (for example, "LinkstatsRow") and used by lookup-based workflows such as schema-aware ingestion.

Returns:

Type Description
Mapping[str, Type[SQLModel]]

Read-only mapping from schema class name to the corresponding SQLModel class object.

Notes

The returned mapping is immutable from the caller's perspective.

Examples:

tracker = Tracker(..., schemas=[MySchema])
assert "MySchema" in tracker.registered_schemas

engine property

Return the SQLAlchemy engine used by this tracker.

This is a single-store compatibility alias. New code should prefer explicit metadata_store / hot_data_store ownership boundaries.

Returns:

Type Description
Optional[Engine]

The SQLAlchemy engine if a database is configured, otherwise None.

from_config(config) classmethod

Construct a tracker from a TrackerConfig object.

begin_run(run_id, model, config=None, inputs=None, tags=None, description=None, cache_mode='reuse', *, artifact_dir=None, allow_external_paths=None, facet=None, facet_from=None, hash_inputs=None, code_identity=None, code_identity_extra_deps=None, facet_schema_version=None, facet_index=True, **kwargs)

Start a run imperatively (without context manager).

Use this when run start and end are in separate methods, or when integrating with frameworks that have their own lifecycle management. Returns the Run object. Call end_run() when complete.

This provides an alternative to the context manager pattern when you need more control over the run lifecycle, such as in external model integrations where start_model_run() and complete_model_run() are separate method calls.

Parameters:

Name Type Description Default
run_id str

A unique identifier for the current run.

required
model str

A descriptive name for the model or process being executed (non-empty, length-limited).

required
config Union[Dict[str, Any], BaseModel, None]

Configuration parameters for this run. Keys must be strings; extremely large string values are rejected.

None
inputs Optional[list[ArtifactRef]]

A list of input paths (str/Path) or Artifact references.

None
tags Optional[List[str]]

A list of string labels for categorization and filtering (non-empty, length-limited).

None
description Optional[str]

A human-readable description of the run's purpose.

None
cache_mode str

Strategy for caching: "reuse", "overwrite", or "readonly".

"reuse"
artifact_dir Optional[Union[str, Path]]

Override the per-run artifact directory. Relative paths are resolved under <run_dir>/outputs. Absolute paths must remain within run_dir unless allow_external_paths is enabled.

None
allow_external_paths Optional[bool]

Allow artifact_dir and cached-output materialization outside run_dir. Defaults to the Tracker setting when unset.

None
facet Optional[FacetLike]

Optional small, queryable configuration facet to persist alongside the run. This is distinct from config (which is hashed and stored in the JSON snapshot).

None
facet_from Optional[List[str]]

List of config keys to extract into the facet. Extracted values are merged with any explicit facet, with explicit keys taking precedence.

None
hash_inputs HashInputs

Extra inputs to include in the run identity hash without logging them as run inputs/outputs. Useful for config bundles or auxiliary files. Each entry is either a path (str/Path) or a named tuple (name, path).

None
code_identity Optional[CodeIdentityMode]

Strategy for hashing code identity in cache keys. "repo_git" (default) uses repository git state. "callable_module" and "callable_source" scope identity to the callable executed by tracker.run.

None
code_identity_extra_deps Optional[List[str]]

Extra dependency file paths to fold into callable-scoped code identity.

None
facet_schema_version Optional[Union[str, int]]

Optional schema version tag for the persisted facet.

None
facet_index bool

Whether to flatten and index facet keys/values for DB querying.

True
stage Optional[str]

Optional workflow stage label persisted on the run.

None
phase Optional[str]

Optional lifecycle phase label persisted on the run.

None
**kwargs Any

Additional metadata. Special keywords year, iteration, stage, and phase can be used. Metadata keys/values are validated and size-limited; use CONSIST_MAX_METADATA_ITEMS/KEY_LENGTH/VALUE_LENGTH to override.

{}

Returns:

Type Description
Run

The Run object representing the started run.

Raises:

Type Description
RuntimeError

If there is already an active run.

Example
run = tracker.begin_run("run_001", "urbansim", config={...})
try:
    tracker.log_artifact(input_file, direction="input")
    # ... do work ...
    tracker.log_artifact(output_file, direction="output")
    tracker.end_run("completed")
except Exception as e:
    tracker.end_run("failed", error=e)
    raise
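The facet_from extraction described in the parameters above — config keys pulled into the facet, with an explicit facet taking precedence — can be sketched as a small pure-Python helper. build_facet is hypothetical; Consist's internal merge code is not shown in this page.

```python
def build_facet(config, facet=None, facet_from=None):
    # Sketch of the documented merge: extract the listed config keys,
    # then overlay the explicit facet so its keys win on conflict.
    extracted = {k: config[k] for k in (facet_from or []) if k in config}
    return {**extracted, **(facet or {})}
```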

start_run(run_id, model, **kwargs)

Context manager to initiate and manage the lifecycle of a Consist run.

This is the primary entry point for defining a reproducible and observable unit of work. It wraps the imperative begin_run()/end_run() methods to provide automatic cleanup and exception handling.

Parameters:

Name Type Description Default
run_id str

A unique identifier for the current run.

required
model str

A descriptive name for the model or process being executed.

required
**kwargs Any

Additional arguments forwarded to begin_run(), including commonly used keys:

  • config: Union[Dict[str, Any], BaseModel, None]
  • inputs: Optional[list[ArtifactRef]]
  • tags: Optional[List[str]]
  • description: Optional[str]
  • cache_mode: str ("reuse", "overwrite", "readonly")
  • facet, facet_from, hash_inputs, facet_schema_version, facet_index
  • year, iteration, stage, phase
{}

Yields:

Type Description
Tracker

The current Tracker instance for use within the with block.

Raises:

Type Description
Exception

Any exception raised within the with block will be caught, the run marked as "failed", and then re-raised after cleanup.

See Also

begin_run : Imperative alternative for starting runs.
end_run : Imperative alternative for ending runs.

Example
 with tracker.start_run("run_1", "my_model", config={"p": 1}):
     tracker.log_artifact("data.csv", "input")
     # ... execution ...
     tracker.log_artifact("results.parquet", "output")

run(fn=None, name=None, *, run_id=None, model=None, description=None, config=None, adapter=None, config_plan_ingest=True, config_plan_profile_schema=False, inputs=None, input_keys=None, optional_input_keys=None, depends_on=None, tags=None, facet=None, facet_from=None, facet_schema_version=None, facet_index=None, identity_inputs=None, year=None, iteration=None, phase=None, stage=None, parent_run_id=None, outputs=None, output_paths=None, capture_dir=None, capture_pattern='*', cache_options=None, output_policy=None, execution_options=None, runtime_kwargs=None)

Execute a function-shaped run with caching and output handling.

This method executes a callable (or container) with automatic provenance tracking, intelligent caching based on code+config+inputs, and artifact logging.

Parameters:

Name Type Description Default
fn Optional[Callable]

The function to execute. Required for executor='python'. Can be None for executor='container'.

None
name Optional[str]

Human-readable name for the run. Defaults to function name if not provided.

None
run_id Optional[str]

Unique identifier for this run. Auto-generated if not provided.

None
model Optional[str]

Model/component name for categorizing runs. Defaults to the run name.

None
description Optional[str]

Human-readable description of the run.

None
config Optional[Dict[str, Any]]

Configuration parameters. Becomes part of the cache signature. Can be a dict or Pydantic model.

None
adapter Optional[ConfigAdapter]

Config adapter used to derive a config plan before execution.

None
config_plan_ingest bool

Whether to ingest tables from the config plan.

True
config_plan_profile_schema bool

Whether to profile ingested schemas for the config plan.

False
inputs Optional[Mapping[str, RunInputRef] | Iterable[RunInputRef]]

Input files or artifacts.

  • Dict: Maps names to paths/Artifacts. Named inputs can bind into function parameters according to execution_options.input_binding (or legacy load_inputs).
  • List/Iterable: Hashed for cache key but not automatically bound.

None
input_keys Optional[Iterable[str] | str]

Deprecated. Use inputs mapping instead.

None
optional_input_keys Optional[Iterable[str] | str]

Deprecated. Use inputs mapping instead.

None
depends_on Optional[List[RunInputRef]]

Additional file paths or artifacts to hash for the cache signature (e.g., config files).

None
tags Optional[List[str]]

Labels for filtering and organizing runs (e.g., ["production", "baseline"]).

None
facet Optional[FacetLike]

Queryable metadata facets (small config values) logged to the run.

None
facet_from Optional[List[str]]

List of config keys to extract and log as facets.

None
facet_schema_version Optional[Union[str, int]]

Schema version for facet compatibility tracking.

None
facet_index Optional[bool]

Whether to index facets for faster queries.

None
identity_inputs Optional[IdentityInputs]

Additional hash-only identity inputs (for example config files or directories) that should affect cache keys without being logged as run inputs.

None
year Optional[int]

Year metadata (for multi-year simulations). Included in provenance.

None
iteration Optional[int]

Iteration count (for iterative workflows). Included in provenance.

None
phase Optional[str]

Optional lifecycle phase label persisted in run metadata.

None
stage Optional[str]

Optional workflow stage label persisted in run metadata.

None
parent_run_id Optional[str]

Parent run ID (for nested runs in scenarios).

None
outputs Optional[List[str]]

Output artifact keys for return-value logging with executor='python'. Supports DataFrame/Series/xarray returns and path-like returns. If omitted, Consist auto-logs artifact-like returns (Path/str/Artifact or dict[str, ...]) when output_paths is not provided.

None
output_paths Optional[Mapping[str, ArtifactRef]]

Output file paths to log. Dict maps artifact keys to host paths or Artifact refs.

None
capture_dir Optional[Path]

Directory to scan for outputs (legacy tools that write to specific dirs).

None
capture_pattern str

Glob pattern for capturing outputs (used with capture_dir).

"*"
cache_options Optional[CacheOptions]

Grouped cache controls (cache_mode, cache_hydration, cache_version, cache_epoch, validate_cached_outputs, code_identity, code_identity_extra_deps).

None
output_policy Optional[OutputPolicyOptions]

Grouped output policies (output_mismatch, output_missing).

None
execution_options Optional[ExecutionOptions]

Grouped execution controls (input_binding, legacy load_inputs, executor, container, runtime_kwargs, inject_context).

None
runtime_kwargs Optional[Mapping[str, Any]]

Top-level alias for execution_options.runtime_kwargs. This is mutually exclusive with execution_options=ExecutionOptions(runtime_kwargs=...).

None

Returns:

Type Description
RunResult

Contains:

  • outputs: Dict[str, Artifact] of logged output artifacts
  • cache_hit: bool indicating if this was a cache hit
  • run_id: The run's unique identifier

Raises:

Type Description
ValueError

If fn is None (for executor='python'), or if container or output_paths is not provided for executor='container'.

RuntimeError

If the function execution fails or container execution returns non-zero code.

Examples:

Execute a basic data processing step:

def clean_data(raw: pd.DataFrame) -> pd.DataFrame:
    return raw[raw['value'] > 0.5]

result = tracker.run(
    fn=clean_data,
    inputs={"raw": Path("raw.csv")},
    outputs=["cleaned"],
)

Include configuration in the cache signature for granular cache control:

result = tracker.run(
    fn=clean_data,
    inputs={"raw": Path("raw.csv")},
    config={"threshold": 0.5},
    outputs=["cleaned"],
)
See Also

start_run : Manual run context management (more control)
trace : Context manager alternative (always executes, even on cache hit)

run_with_config_overrides(*, adapter, base_run_id=None, base_config_dirs=None, base_primary_config=None, overrides, output_dir, fn, name, model=None, config=None, outputs=None, execution_options=None, strict=True, identity_inputs=None, resolved_config_identity='auto', identity_label='activitysim_config', override_runtime_kwargs=None, **run_kwargs)

Delegate config-override execution to an adapter-specific implementation.

The tracker remains adapter-agnostic by forwarding to adapter.run_with_config_overrides(...) when available.

Exactly one base selector is required: base_run_id or base_config_dirs. base_primary_config is optional and only applies to base_config_dirs flows.

trace(name, *, run_id=None, model=None, description=None, config=None, adapter=None, config_plan_ingest=True, config_plan_profile_schema=False, inputs=None, input_keys=None, optional_input_keys=None, depends_on=None, tags=None, facet=None, facet_from=None, facet_schema_version=None, facet_index=None, identity_inputs=None, year=None, iteration=None, parent_run_id=None, outputs=None, output_paths=None, capture_dir=None, capture_pattern='*', cache_mode='reuse', cache_hydration=None, cache_version=None, cache_epoch=None, validate_cached_outputs='lazy', code_identity=None, code_identity_extra_deps=None, output_mismatch='warn', output_missing='warn')

Context manager for tracing a run with inline execution.

This context manager allows you to define a run directly within a with block, with the Python code inside executing every time (even on cache hits). This differs from tracker.run(), which skips execution on cache hits.

Use trace() when you need inline control: for data loading, file I/O, or integrations that require code execution regardless of cache state.

Parameters:

Name Type Description Default
name str

Human-readable name for the run. Also defaults the model name if not provided.

required
run_id Optional[str]

Unique identifier for this run. Auto-generated if not provided.

None
model Optional[str]

Model/component name for categorizing runs. Defaults to the run name.

None
description Optional[str]

Human-readable description of the run.

None
config Optional[Dict[str, Any]]

Configuration parameters. Becomes part of the cache signature. Can be a dict or Pydantic model.

None
adapter Optional[ConfigAdapter]

Config adapter used to derive a config plan before execution.

None
config_plan_ingest bool

Whether to ingest tables from the config plan.

True
config_plan_profile_schema bool

Whether to profile ingested schemas for the config plan.

False
inputs Optional[Mapping[str, RunInputRef] | Iterable[RunInputRef]]

Input files or artifacts.

  • Dict: Maps names to paths/Artifacts. Logged as inputs but not auto-loaded.
  • List/Iterable: Hashed for cache key but not auto-loaded.

None
input_keys Optional[Iterable[str] | str]

Deprecated. Use inputs mapping instead.

None
optional_input_keys Optional[Iterable[str] | str]

Deprecated. Use inputs mapping instead.

None
depends_on Optional[List[RunInputRef]]

Additional file paths or artifacts to hash for the cache signature (e.g., config files).

None
tags Optional[List[str]]

Labels for filtering and organizing runs (e.g., ["production", "baseline"]).

None
facet Optional[FacetLike]

Queryable metadata facets (small config values) logged to the run.

None
facet_from Optional[List[str]]

List of config keys to extract and log as facets.

None
facet_schema_version Optional[Union[str, int]]

Schema version for facet compatibility tracking.

None
facet_index Optional[bool]

Whether to index facets for faster queries.

None
identity_inputs Optional[IdentityInputs]

Additional hash-only identity inputs (for example config files or directories) that should affect cache keys without being logged as run inputs.

None
year Optional[int]

Year metadata (for multi-year simulations). Included in provenance.

None
iteration Optional[int]

Iteration count (for iterative workflows). Included in provenance.

None
parent_run_id Optional[str]

Parent run ID (for nested runs in scenarios).

None
outputs Optional[List[str]]

Names of output artifacts to log. Each item is a key name for logged outputs.

None
output_paths Optional[Mapping[str, ArtifactRef]]

Output file paths to log. Dict maps artifact keys to host paths or Artifact refs.

None
capture_dir Optional[Path]

Directory to scan for outputs. New/modified files are auto-logged.

None
capture_pattern str

Glob pattern for capturing outputs (used with capture_dir).

"*"
cache_mode str

Cache behavior: "reuse" (return cache hit), "overwrite" (always re-execute), or "skip_check".

"reuse"
cache_hydration Optional[str]

Materialization strategy for cache hits:

  • "outputs-requested": Copy only output_paths to disk
  • "outputs-all": Copy all cached outputs to run_artifact_dir
  • "inputs-missing": Backfill missing inputs from prior runs before executing

None
cache_version Optional[int]

Optional cache-version discriminator folded into run identity.

None
cache_epoch Optional[int]

Optional cache-epoch discriminator folded into run identity.

None
validate_cached_outputs str

Validation for cached outputs: "lazy" (check if files exist), "strict", or "none".

"lazy"
code_identity Optional[CodeIdentityMode]

Strategy for hashing code identity in cache keys.

None
code_identity_extra_deps Optional[List[str]]

Extra dependency file paths folded into code identity hashing.

None
output_mismatch str

Behavior when output count doesn't match: "warn", "error", or "ignore".

"warn"
output_missing str

Behavior when expected outputs are missing: "warn", "error", or "ignore".

"warn"

Yields:

Type Description
Tracker

The current Tracker instance for use within the with block.

Raises:

Type Description
ValueError

If output_mismatch or output_missing are invalid values.

RuntimeError

If output validation fails based on validation settings.

Notes

Unlike tracker.run(), the Python code inside a trace() block ALWAYS executes, even on cache hits. This is useful for side effects, data loading, or code that should run regardless of cache state.

If you want to skip execution on cache hits (like tracker.run()), consider using tracker.run() with a callable instead.

Examples:

Simple inline tracing with file capture:

with tracker.trace(
    "my_analysis",
    output_paths={"results": "./results.csv"}
):
    df = pd.read_csv("raw.csv")
    df["value"] = df["value"] * 2
    df.to_csv("./results.csv", index=False)

Multi-year simulation loop:

with tracker.scenario("baseline") as sc:
    for year in [2020, 2030, 2040]:
        with sc.trace(name="simulate", year=year):
            results = run_model(year)
            tracker.log_artifact(results, key="output")
See Also

run : Function-shaped alternative (skips on cache hit)
scenario : Multi-step workflow grouping
start_run : Imperative alternative for run lifecycle management

scenario(name, config=None, tags=None, model='scenario', step_cache_hydration=None, name_template=None, cache_epoch=None, coupler=None, require_outputs=None, **kwargs)

Create a ScenarioContext to manage a grouped workflow of steps.

This method initializes a scenario context manager that acts as a "header" run. It allows defining multiple steps (runs) that are automatically linked to this header run via parent_run_id, without manual threading.

The scenario run is started, then immediately suspended (allowing steps to run), and finally restored and completed when the context exits.

Parameters:

Name Type Description Default
name str

The name of the scenario. This will become the Run ID.

required
config Optional[Dict[str, Any]]

Scenario-level configuration. Stored on the header run but NOT automatically inherited by steps.

None
tags Optional[List[str]]

Tags for the scenario. "scenario_header" is automatically appended.

None
model str

The model name for the header run.

"scenario"
step_cache_hydration Optional[str]

Default cache hydration policy for all scenario steps unless overridden in a specific scenario.trace(...) or scenario.run(...).

None
name_template Optional[str]

Optional step name template applied when scenario.run() is called without an explicit name and no step-level template is provided.

None
cache_epoch Optional[int]

Scenario-level cache epoch override for all steps in this scenario.

None
coupler Optional[Coupler]

Optional Coupler instance to use for the scenario.

None
require_outputs Optional[Iterable[str]]

Declare required outputs at scenario creation time.

None
**kwargs Any

Additional metadata or arguments for the header run (including facet_from).

{}

Returns:

Type Description
ScenarioContext

A context manager object that provides .trace() and .add_input() methods.

Example
with tracker.scenario("baseline", config={"mode": "test"}) as sc:
    sc.add_input("data.csv", key="data")
    with sc.trace("init"):
        ...

end_run(status='completed', error=None)

End the current run started with begin_run().

This method finalizes the run, persists the final state to JSON and database, and emits lifecycle hooks. It is idempotent: calling it multiple times on an already-ended run will log a warning but not raise an error.

Parameters:

Name Type Description Default
status str

The final status of the run. Typically "completed" or "failed".

"completed"
error Optional[Exception]

The exception that caused the failure, if status is "failed". The error message will be stored in the run's metadata.

None

Returns:

Type Description
Run

The completed Run object.

Raises:

Type Description
RuntimeError

If there is no active run to end.

Example
run = tracker.begin_run("run_001", "urbansim")
try:
    # ... do work ...
    tracker.end_run("completed")
except Exception as e:
    tracker.end_run("failed", error=e)
    raise

define_step(**kwargs)

Attach metadata to a function without changing execution behavior.

This decorator lets you attach defaults such as outputs, tags, or cache_mode to a function. Tracker.run and ScenarioContext.run read this metadata when executing the function.

Parameters:

Name Type Description Default
**kwargs Any

Step metadata (e.g., outputs, tags, cache_mode, inject_context) to attach to the function.

{}

Returns:

Type Description
Callable

A decorator that returns the original function with attached metadata.
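The attach-metadata-without-changing-behavior contract can be sketched with a minimal decorator. This is a toy in the spirit of define_step; the attribute name _consist_step_meta is an assumption, not Consist's actual storage location.

```python
def define_step(**meta):
    # Hypothetical sketch: stash the step defaults on the function object
    # so a runner (like Tracker.run) can read them later; the function
    # itself executes exactly as before.
    def decorate(fn):
        fn._consist_step_meta = dict(meta)
        return fn
    return decorate

@define_step(outputs=["cleaned"], tags=["etl"], cache_mode="reuse")
def clean_data():
    return "done"
```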

cached_artifacts(direction='output')

Returns hydrated artifacts for the active run when it is a cache hit.

Parameters:

Name Type Description Default
direction str

"output" or "input" to filter hydrated artifacts.

"output"

Returns:

Type Description
Dict[str, Artifact]

Mapping of artifact key to Artifact for the specified direction. Returns an empty dict if no cache hit or no artifacts.

cached_output(key=None)

Convenience to fetch a hydrated cached output artifact for the current run.

Parameters:

Name Type Description Default
key Optional[str]

If provided, returns the artifact with this key; otherwise returns the first available cached output.

None

Returns:

Type Description
Optional[Artifact]

The cached output artifact, or None if not cached / not found.

suspend_cache_options()

Suspend active-run cache options and reset them to defaults.

This is useful for helper functions that want default cache behavior without mutating the caller's options.

Returns:

Type Description
ActiveRunCacheOptions

The previously active cache options, for later restoration.

restore_cache_options(options)

Restore previously suspended active-run cache options.

This should typically be paired with a prior suspend_cache_options call to restore the caller's cache behavior.

Parameters:

Name Type Description Default
options ActiveRunCacheOptions

Cache options to restore (usually returned by suspend_cache_options).

required
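The suspend/restore pairing is easiest to see with a toy stand-in. MiniTracker and ActiveRunCacheOptions below are illustrative assumptions, not Consist's classes; the point is the try/finally shape that keeps the caller's options intact.

```python
class ActiveRunCacheOptions:
    # Toy options object; the field name cache_mode is an assumption.
    def __init__(self, cache_mode="reuse"):
        self.cache_mode = cache_mode

class MiniTracker:
    # Toy sketch of the documented contract: suspend resets options to
    # defaults and returns the previous ones; restore puts them back.
    def __init__(self):
        self._cache_options = ActiveRunCacheOptions()

    def suspend_cache_options(self):
        previous = self._cache_options
        self._cache_options = ActiveRunCacheOptions()  # back to defaults
        return previous

    def restore_cache_options(self, options):
        self._cache_options = options

tracker = MiniTracker()
tracker._cache_options.cache_mode = "overwrite"
saved = tracker.suspend_cache_options()
try:
    pass  # helper work runs here with default cache behavior
finally:
    tracker.restore_cache_options(saved)
```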

capture_outputs(directory, pattern='*', recursive=False)

A context manager to automatically capture and log new or modified files in a directory.

This context manager is used within a tracker.run/tracker.trace call or start_run block to monitor a specified directory. Any files created or modified within this directory during the execution of the with block will be automatically logged as output artifacts of the current run.

Parameters:

Name Type Description Default
directory Union[str, Path]

The path to the directory to monitor for new or modified files.

required
pattern str

A glob pattern (e.g., "*.csv", "data_*.parquet") to filter which files are captured within the specified directory. Defaults to all files.

"*"
recursive bool

If True, the capture will recursively scan subdirectories within directory.

False

Yields:

Type Description
OutputCapture

An OutputCapture object containing a list of Artifact objects that were captured and logged after the with block finishes.

Raises:

Type Description
RuntimeError

If capture_outputs is used outside of an active start_run context.

log_meta(**kwargs)

Updates the metadata for the current run.

This method allows logging additional key-value pairs to the meta field of the currently active Run object. This is particularly useful for recording runtime metrics (e.g., accuracy, loss, F1-score), tags, or any other arbitrary information generated during the run's execution. The metadata is immediately flushed to both the JSON log and the database.

Parameters:

Name Type Description Default
**kwargs Any

Arbitrary key-value pairs to merge into the meta dictionary of the current run. Existing keys will be updated, and new keys will be added.

{}
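The merge semantics described above (existing keys updated, new keys added) amount to a dict update; merge_meta below is a hypothetical helper illustrating that, not Consist's code.

```python
def merge_meta(meta, **kwargs):
    # Existing keys are overwritten, new keys are added, and the same
    # meta dict is returned for convenience.
    meta.update(kwargs)
    return meta
```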

log_artifact(path, key=None, direction='output', schema=None, driver=None, table_path=None, array_path=None, content_hash=None, force_hash_override=False, validate_content_hash=False, reuse_if_unchanged=False, reuse_scope='same_uri', profile_file_schema=None, file_schema_sample_rows=None, facet=None, facet_schema_version=None, facet_index=False, **meta)

Logs an artifact (file or data reference) within the current run context.

This method supports:

  • Automatic Input Discovery: If an input path matches a previously logged output artifact, Consist automatically links them, building the provenance graph. This is a key part of "Auto-Forking".

  • Path Virtualization: Converts absolute file system paths to portable URIs (e.g., inputs://data.csv) using configured mounts, adhering to "Path Resolution & Mounts".

  • Schema Metadata Injection: Embeds schema information (if provided) into the artifact's metadata, useful for later "Strict Mode" validation or introspection.

  • Immediate Persistence: This single-artifact method flushes JSON state and syncs artifact links to the database immediately for this call.

Parameters:

Name Type Description Default
path ArtifactRef

A file path (str/Path) or an existing Artifact reference to be logged. Passing an Artifact is useful for explicitly linking an already-logged artifact as an input or output in the current run.

required
key Optional[str]

A semantic, human-readable name for the artifact (e.g., "households"). Required if path is a path-like (str/Path).

None
direction str

Specifies whether the artifact is an "input" or "output" for the current run. Defaults to "output".

"output"
schema Optional[Type[SQLModel]]

An optional SQLModel class that defines the expected schema for the artifact's data. Its name will be stored in artifact metadata.

None
driver Optional[str]

Explicitly specify the driver (e.g., 'h5_table'). If None, the driver is inferred from the file extension.

None
table_path Optional[str]

Optional table path inside a container (e.g., HDF5).

None
array_path Optional[str]

Optional array path inside a container (e.g., Zarr group).

None
content_hash Optional[str]

Precomputed content hash to use for the artifact instead of hashing the path on disk.

None
force_hash_override bool

If True, overwrite an existing artifact hash when it differs from content_hash. By default, mismatched overrides are ignored with a warning.

False
validate_content_hash bool

If True, verify content_hash against the on-disk data and raise on mismatch.

False
reuse_if_unchanged bool

Deprecated for outputs. Consist now always creates a fresh output artifact row; identical bytes are deduplicated via artifact.content_id. Setting this on outputs emits a warning and does not reuse prior rows. Input-side behavior is unaffected.

False
reuse_scope (same_uri, any_uri)

Deprecated for outputs. any_uri is ignored for outputs; deduplication is governed by content_id. Input-side behavior is unaffected.

"same_uri"
profile_file_schema bool

If True, profile a lightweight schema for file-based tabular artifacts. Use "if_changed" to skip profiling when matching content identity already has a stored schema (prefers content_id; falls back to hash for legacy rows).

False
file_schema_sample_rows Optional[int]

Maximum rows to sample when profiling file-based schemas.

None
facet Optional[FacetLike]

Optional artifact-level facet payload (dict or Pydantic model).

None
facet_schema_version Optional[Union[str, int]]

Optional schema version for artifact facet compatibility.

None
facet_index bool

If True, flatten scalar facet fields into artifact_kv for fast queries.

False
**meta Any

Additional key-value pairs to store in the artifact's flexible meta field.

{}

Returns:

Type Description
Artifact

The created or updated Artifact object.

Raises:

Type Description
RuntimeError

If called outside an active run context.

ValueError

If key is not provided when path is a path-like (str/Path).

log_artifacts(outputs, direction='output', driver=None, metadata_by_key=None, facets_by_key=None, facet_schema_versions_by_key=None, facet_index=False, reuse_if_unchanged=False, reuse_scope='same_uri', **shared_meta)

Log multiple artifacts in a single call for efficiency.

This is a convenience method for bulk artifact logging, particularly useful when a model produces many output files or when registering multiple inputs. This requires an explicit mapping so artifact keys are always deliberate. For efficiency, persistence is batched: JSON flush and DB artifact sync occur once at the end of the call (not once per artifact).

Parameters:

Name Type Description Default
outputs mapping

Mapping of key -> path/Artifact to log.

required
direction str

Specifies whether the artifacts are "input" or "output" for the current run.

"output"
driver Optional[str]

Explicitly specify the driver for all artifacts. If None, driver is inferred from each file's extension individually.

None
metadata_by_key Optional[Mapping[str, Dict[str, Any]]]

Per-key metadata overrides applied on top of shared metadata.

None
facets_by_key Optional[Mapping[str, FacetLike]]

Per-key artifact facet payloads.

None
facet_schema_versions_by_key Optional[Mapping[str, Union[str, int]]]

Optional per-key schema versions for artifact facet payloads.

None
facet_index bool

Whether to index scalar artifact facet values in artifact_kv.

False
reuse_if_unchanged bool

Deprecated for outputs. Batch output logging still creates a fresh artifact row per call; identical bytes are deduplicated via artifact.content_id. Setting this on outputs emits a warning and does not reuse prior rows. Input-side behavior is unaffected.

False
reuse_scope (same_uri, any_uri)

Deprecated for outputs. any_uri is ignored for outputs; deduplication is governed by content_id instead. Input-side behavior is unaffected.

"same_uri"
**shared_meta Any

Metadata key-value pairs to apply to ALL logged artifacts. Useful for tagging a batch of related files.

{}

Returns:

Type Description
Dict[str, Artifact]

Mapping of key -> logged Artifact.

Raises:

Type Description
RuntimeError

If called outside an active run context.

ValueError

If metadata_by_key contains keys not present in outputs.

TypeError

If mapping keys are not strings.

Example
# Log explicit outputs
outputs = tracker.log_artifacts(
    {"persons": "output/persons.parquet", "households": "output/households.parquet"},
    metadata_by_key={"households": {"role": "primary"}},
    year=2030,
)

log_input(path, key=None, content_hash=None, force_hash_override=False, validate_content_hash=False, facet=None, facet_schema_version=None, facet_index=False, **meta)

Log an input artifact. Convenience wrapper for log_artifact(direction='input').

Parameters:

Name Type Description Default
path ArtifactRef

A file path (str/Path) or an existing Artifact reference to be logged.

required
key Optional[str]

A semantic, human-readable name for the artifact.

None
content_hash Optional[str]

Precomputed content hash to use for the artifact instead of hashing the path on disk.

None
force_hash_override bool

If True, overwrite an existing artifact hash when it differs from content_hash. By default, mismatched overrides are ignored with a warning.

False
validate_content_hash bool

If True, verify content_hash against the on-disk data and raise on mismatch.

False
facet Optional[FacetLike]

Optional artifact-level facet payload for this input artifact.

None
facet_schema_version Optional[Union[str, int]]

Optional facet schema version.

None
facet_index bool

Whether to index scalar facet fields for querying.

False
**meta Any

Additional key-value pairs to store in the artifact's meta field.

{}

Returns:

Type Description
Artifact

The created or updated Artifact object.

log_output(path, key=None, content_hash=None, force_hash_override=False, validate_content_hash=False, reuse_if_unchanged=False, reuse_scope='same_uri', facet=None, facet_schema_version=None, facet_index=False, **meta)

Log an output artifact. Convenience wrapper for log_artifact(direction='output').

Parameters:

Name Type Description Default
path ArtifactRef

A file path (str/Path) or an existing Artifact reference to be logged.

required
key Optional[str]

A semantic, human-readable name for the artifact.

None
content_hash Optional[str]

Precomputed content hash to use for the artifact instead of hashing the path on disk.

None
force_hash_override bool

If True, overwrite an existing artifact hash when it differs from content_hash. By default, mismatched overrides are ignored with a warning.

False
validate_content_hash bool

If True, verify content_hash against the on-disk data and raise on mismatch.

False
reuse_if_unchanged bool

Deprecated for outputs. A fresh output artifact row is always created; identical bytes share content_id. Setting this emits a warning and does not reuse prior rows.

False
reuse_scope (same_uri, any_uri)

Deprecated for outputs. any_uri is ignored; deduplication is by content_id.

"same_uri"
facet Optional[FacetLike]

Optional artifact-level facet payload for this output artifact.

None
facet_schema_version Optional[Union[str, int]]

Optional facet schema version.

None
facet_index bool

Whether to index scalar facet fields for querying.

False
**meta Any

Additional key-value pairs to store in the artifact's meta field.

{}

Returns:

Type Description
Artifact

The created or updated Artifact object.

log_dataframe(df, key, schema=None, direction='output', path=None, driver=None, meta=None, profile_file_schema=False, file_schema_sample_rows=1000, **to_file_kwargs)

Serialize a DataFrame, log it as an artifact, and trigger optional ingestion.

Parameters:

Name Type Description Default
df DataFrame

Data to persist.

required
key str

Logical artifact key.

required
schema Optional[Type[SQLModel]]

Schema used for ingestion, if provided.

None
direction str

Artifact direction relative to the run.

"output"
path Optional[Union[str, Path]]

Output path; defaults to <run_dir>/outputs/<run_subdir>/<key>.<driver> where run_subdir is derived from run_subdir_fn (or the default pattern).

None
driver Optional[str]

File format driver (e.g., "parquet" or "csv").

None
meta Optional[Dict[str, Any]]

Additional metadata for the artifact.

None
profile_file_schema bool

If True, profile a lightweight schema for file-based tabular artifacts.

False
file_schema_sample_rows Optional[int]

Maximum rows to sample when profiling file-based schemas.

1000
**to_file_kwargs Any

Keyword arguments forwarded to pd.DataFrame.to_parquet or to_csv.

{}

Returns:

Type Description
Artifact

The artifact logged for the written dataset.

Raises:

Type Description
ValueError

If the requested driver is unsupported.

load(artifact, **kwargs)

Load an artifact using the public API while binding this tracker context.

This is equivalent to consist.load(artifact, tracker=self, ...) and uses the artifact driver to select the appropriate loader.

Parameters:

Name Type Description Default
artifact Artifact

The artifact to load.

required
**kwargs Any

Loader-specific options forwarded to consist.load.

{}

Returns:

Type Description
Any

The loaded data object (e.g., a DuckDB relation or an xarray.Dataset).

materialize(artifact, destination_path, *, on_missing='warn')

Materialize a cached artifact onto the filesystem.

This copies bytes from the resolved artifact URI to destination_path. It does not perform database-backed reconstruction.

Returns:

Type Description
Optional[str]

The destination path for the materialized artifact, or None if missing and on_missing="warn".

ingest(artifact, data=None, schema=None, run=None, profile_schema=True)

Ingests data associated with an Artifact into the Consist DuckDB database.

This method is central to Consist's "Hot Data Strategy", where data is materialized into the database for faster query performance and easier sharing. It leverages the dlt (Data Load Tool) integration for efficient and robust data loading, including support for schema inference and evolution.

Parameters:

Name Type Description Default
artifact Artifact

The artifact object representing the data being ingested. If the artifact was logged with a schema (e.g., log_artifact(path, schema=MySchema)) and that schema was registered with the Tracker at initialization (e.g., Tracker(..., schemas=[MySchema])), it will be automatically looked up and used for ingestion.

required
data Optional[Union[Iterable[Dict[str, Any]], Any]]

An iterable (e.g., list of dicts, generator) where each item represents a row of data to be ingested. If data is omitted, Consist attempts to stream it directly from the artifact's file URI, resolving the path. Can also be other data types that dlt can handle directly (e.g., Pandas DataFrame).

None
schema Optional[Type[SQLModel]]

An optional SQLModel class that defines the expected schema for the ingested data. If provided, dlt will use this for strict validation and this parameter takes precedence over any auto-detected schema. If not provided, Consist will automatically look up the schema by name from schemas registered in Tracker.init (using artifact.meta["schema_name"]).

None
run Optional[Run]

If provided, tags data with this run's ID (Offline Mode). If None, uses the currently active run (Online Mode).

None
profile_schema bool

If True, profile and persist a deduped schema record for the ingested table, writing schema_id/schema_summary (and optionally schema_profile) into Artifact.meta.

True

Returns:

Type Description
Any

The result information from the dlt ingestion process.

Raises:

Type Description
RuntimeError

If no database is configured (db_path was not provided during Tracker initialization) or if ingest is called outside of an active run context.

Exception

Any exception raised by the underlying dlt ingestion process.

Examples:

Register a schema and associate it with a logged artifact:

tracker = Tracker(..., schemas=[MyDataSchema])
art = tracker.log_artifact("file.csv", key="mydata", schema=MyDataSchema)

# Automatically looks up and uses MyDataSchema for ingestion
tracker.ingest(art, data=df)

Schemas are persisted by name, allowing lookup across different Python sessions:

# Session 1:
tracker = Tracker(..., schemas=[MyDataSchema])
art = tracker.log_artifact("file.csv", key="mydata", schema=MyDataSchema)

# Session 2:
tracker2 = Tracker(..., schemas=[MyDataSchema])
art2 = tracker2.get_artifact("mydata")
# Looks up MyDataSchema by artifact's schema_name ("MyDataSchema")
tracker2.ingest(art2, data=df)

Explicitly override the default schema during ingestion:

tracker.ingest(art, data=df, schema=DifferentSchema)

find_runs(tags=None, year=None, iteration=None, stage=None, phase=None, model=None, status=None, parent_id=None, metadata=None, limit=100, index_by=None, name=None)

Retrieve runs matching the specified criteria.

Parameters:

Name Type Description Default
tags Optional[List[str]]

Filter runs that contain all provided tags.

None
year Optional[int]

Filter by run year.

None
iteration Optional[int]

Filter by run iteration.

None
stage Optional[str]

Filter by run stage.

None
phase Optional[str]

Filter by run phase.

None
model Optional[str]

Filter by run model name.

None
status Optional[str]

Filter by run status (e.g., "completed", "failed").

None
parent_id Optional[str]

Filter by scenario/header parent id.

None
metadata Optional[Dict[str, Any]]

Filter by exact matches in Run.meta (client-side filter).

None
limit int

Maximum number of runs to return.

100
index_by Optional[Union[str, IndexBySpec]]

If provided, returns a dict keyed by a run attribute or facet value. Supported forms:

  • "year" / "iteration" / any Run attribute name

  • "facet.<key>" or "facet:<key>" to key by a persisted facet value

  • IndexBySpec helpers like index_by_field(...) / index_by_facet(...)

Note: if multiple runs share the same key, the last one wins.

None
name Optional[str]

Filter by Run.model_name/name alias used by DatabaseManager.

None

Returns:

Type Description
Union[List[Run], Dict[Hashable, Run]]

List of runs, or a dict keyed by index_by when requested.

Raises:

Type Description
TypeError

If index_by is an unsupported type.

run_set(label=None, **filters)

Build a RunSet from find_runs filters.

Parameters:

Name Type Description Default
label Optional[str]

Optional label attached to the returned RunSet.

None
**filters Any

Filters forwarded to find_runs.

{}

Returns:

Type Description
RunSet

A tracker-backed RunSet for fluent grouping/alignment analysis.

Notes

This is equivalent to RunSet.from_query(self, label=label, **filters).

find_run(**kwargs)

Find exactly one run matching the criteria.

This is a convenience wrapper around find_runs(...) that enforces uniqueness.

Parameters:

Name Type Description Default
**kwargs Any

Filters forwarded to find_runs(...). Special cases: - id or run_id: if provided, performs a direct primary-key lookup.

{}

Returns:

Type Description
Run

The matching run.

Raises:

Type Description
ValueError

If no runs match, or more than one run matches.

find_latest_run(*, parent_id=None, model=None, status=None, year=None, stage=None, phase=None, tags=None, metadata=None, limit=10000)

Return the most recent run matching the filters.

Selection priority:

  1. Highest iteration (when present)

  2. Newest created_at (fallback when no iteration is set)

Parameters:

Name Type Description Default
parent_id Optional[str]

Filter by scenario/parent run ID.

None
model Optional[str]

Filter by model name.

None
status Optional[str]

Filter by run status.

None
year Optional[int]

Filter by run year.

None
stage Optional[str]

Filter by run stage.

None
phase Optional[str]

Filter by run phase.

None
tags Optional[List[str]]

Filter runs that contain all provided tags.

None
metadata Optional[Dict[str, Any]]

Filter by exact matches in Run.meta (client-side filter).

None
limit int

Maximum number of runs to consider.

10_000

get_latest_run_id(**kwargs)

Convenience wrapper to return the latest run ID for the given filters.

Parameters:

Name Type Description Default
**kwargs Any

Filters forwarded to find_latest_run.

{}

Returns:

Type Description
str

The run ID of the latest matching run.

Raises:

Type Description
ValueError

If no runs match the provided filters.

find_artifacts(*, creator=None, consumer=None, key=None, limit=100)

Find artifacts by producing/consuming runs and key.

Parameters:

Name Type Description Default
creator Optional[Union[str, Run]]

Run ID (or Run) that logged the artifact as an output.

None
consumer Optional[Union[str, Run]]

Run ID (or Run) that logged the artifact as an input.

None
key Optional[str]

Exact artifact key to match.

None
limit int

Maximum number of artifacts to return.

100

Returns:

Type Description
list

Matching artifact records (empty if DB is not configured).

get_artifact(key_or_id, *, run_id=None)

Retrieves an Artifact by semantic key or UUID, optionally scoped to run_id.

Parameters:

Name Type Description Default
key_or_id Union[str, UUID]

The artifact key (e.g., "households") or artifact UUID.

required
run_id Optional[str]

If provided, limits results to artifacts linked to this run (as either input or output) via run_artifact_link.

None

Returns:

Type Description
Optional[Artifact]

The found artifact, or None if not found.

get_artifacts_for_run(run_id)

Retrieve inputs and outputs for a specific run, organized by key.

Parameters:

Name Type Description Default
run_id str

Run identifier.

required

Returns:

Type Description
RunArtifacts

Container with inputs and outputs dicts. Returns empty collections if the database is not configured.

get_run(run_id)

Retrieve a single Run by its ID from the database.

Parameters:

Name Type Description Default
run_id str

The unique identifier of the run to retrieve.

required

Returns:

Type Description
Optional[Run]

The Run object if found, or None if missing or no database is configured.

get_run_config(run_id, *, allow_missing=False)

Load the full config snapshot for a historical run.

Parameters:

Name Type Description Default
run_id str

Run identifier.

required
allow_missing bool

Return None if the snapshot is missing instead of raising.

False

Returns:

Type Description
Optional[Dict[str, Any]]

The stored config payload, or None if missing and allow_missing.

get_run_inputs(run_id)

Return input artifacts for a run, keyed by artifact key.

Parameters:

Name Type Description Default
run_id str

Run identifier.

required

Returns:

Type Description
Dict[str, Artifact]

Input artifacts keyed by artifact key. Returns an empty dict if the database is not configured or the run is unknown.

get_run_outputs(run_id)

Return output artifacts for a run, keyed by artifact key.

Parameters:

Name Type Description Default
run_id str

Run identifier.

required

Returns:

Type Description
Dict[str, Artifact]

Output artifacts keyed by artifact key. Returns an empty dict if the database is not configured or the run is unknown.

get_config_bundle(run_id, *, adapter=None, role='bundle', allow_missing=False)

Resolve a config artifact path for a run by role.

This helper scans run-linked artifacts and selects those with artifact.meta["config_role"] == role. When adapter is provided, matching uses existing adapter identity conventions: run.meta["config_adapter"] and/or artifact metadata (artifact.meta["config_adapter"] or artifact.meta["adapter"]).

If multiple artifacts match, selection is deterministic: sort by (artifact.key, artifact.created_at, artifact.id) and return the first.

get_artifact_lineage(artifact_key_or_id, *, max_depth=None)

Recursively builds a lineage tree for a given artifact.

Parameters:

Name Type Description Default
artifact_key_or_id Union[str, UUID]

Artifact key or UUID.

required
max_depth Optional[int]

Maximum depth to traverse (0 returns only the artifact). Useful for large graphs or iterative workflows.

None

print_lineage(artifact_key_or_id, *, max_depth=None, show_run_ids=False)

Print a formatted lineage tree for an artifact.

Parameters:

Name Type Description Default
artifact_key_or_id Union[str, UUID]

Artifact key or UUID to print.

required
max_depth Optional[int]

Maximum depth to traverse (0 prints only the artifact).

None
show_run_ids bool

Include run IDs alongside artifact entries.

False

history(limit=10, tags=None)

Return recent runs as a Pandas DataFrame.

Parameters:

Name Type Description Default
limit int

Maximum number of runs to include.

10
tags Optional[List[str]]

If provided, filter runs to those containing any of the given tags.

None

Returns:

Type Description
DataFrame

A DataFrame of recent runs (empty if DB is not configured).

diff_runs(run_id_a, run_id_b, *, namespace=None, prefix=None, keys=None, limit=10000, include_equal=False)

Compare flattened config facets between two runs.

Parameters:

Name Type Description Default
run_id_a str

Baseline run identifier.

required
run_id_b str

Comparison run identifier.

required
namespace Optional[str]

Namespace for facets. Defaults to each run's model name.

None
prefix Optional[str]

Filter keys by prefix (e.g. "inputs.").

None
keys Optional[Iterable[str]]

Only include specific keys when provided.

None
limit int

Maximum number of entries to inspect per run.

10_000
include_equal bool

If True, include keys whose values are unchanged.

False

Returns:

Type Description
dict

A dict with namespace metadata and changes mapping keys to values.

get_config_facet(facet_id)

Retrieve a single persisted config facet by ID.

Parameters:

Name Type Description Default
facet_id str

The facet identifier.

required

Returns:

Type Description
Any

The facet record if present, otherwise None.

get_config_facets(*, namespace=None, schema_name=None, limit=100)

List persisted config facets, optionally filtered.

Parameters:

Name Type Description Default
namespace Optional[str]

Filter facets by namespace.

None
schema_name Optional[str]

Filter facets by schema name.

None
limit int

Maximum number of facet records to return.

100

Returns:

Type Description
list

A list of facet records (empty if DB is not configured).

get_run_config_kv(run_id, *, namespace=None, prefix=None, limit=10000)

Retrieve flattened key/value config entries for a run.

This is primarily used for querying and debugging indexed config facets.

Parameters:

Name Type Description Default
run_id str

Run identifier.

required
namespace Optional[str]

Filter by namespace.

None
prefix Optional[str]

Filter keys by prefix (e.g. "inputs.").

None
limit int

Maximum number of entries to return.

10_000

Returns:

Type Description
list

A list of key/value rows (empty if DB is not configured).

get_config_values(run_id, *, namespace=None, prefix=None, keys=None, limit=10000)

Return a flattened config facet as a dict of key/value pairs.

Parameters:

Name Type Description Default
run_id str

Run identifier.

required
namespace Optional[str]

Namespace for the facet. Defaults to the run's model name when available.

None
prefix Optional[str]

Filter keys by prefix (e.g. "inputs.").

None
keys Optional[Iterable[str]]

Only include specific keys when provided.

None
limit int

Maximum number of entries to return.

10_000

Returns:

Type Description
dict

Mapping of flattened keys to typed values.

Notes

Keys are stored as flattened dotted paths. If an original key contains a literal dot, it is escaped as "\." in the stored key.

get_config_value(run_id, key, *, namespace=None, default=None)

Retrieve a single config value from a flattened config facet.

Parameters:

Name Type Description Default
run_id str

Run identifier.

required
key str

Flattened key to fetch.

required
namespace Optional[str]

Namespace for the facet. Defaults to the run's model name when available.

None
default Any

Value to return when the key is missing.

None

Returns:

Type Description
Any

The typed value for the key, or default if missing.

get_registered_schema(schema_name, default=None)

Resolve a registered SQLModel schema by its class name.

This is an ergonomic lookup helper for workflows that persist or exchange schema names (for example artifact.meta["schema_name"]) and then need the corresponding SQLModel class at runtime.

Parameters:

Name Type Description Default
schema_name str

Registered schema class name to resolve. Matching is exact and case-sensitive.

required
default Optional[Type[SQLModel]]

Value returned when schema_name is not found in the registry. Defaults to None.

None

Returns:

Type Description
Optional[Type[SQLModel]]

The registered SQLModel class when found, otherwise default.

Raises:

Type Description
TypeError

If schema_name is not a string.

ValueError

If schema_name is an empty or whitespace-only string.

Examples:

tracker = Tracker(..., schemas=[MySchema])
schema_cls = tracker.get_registered_schema("MySchema")
missing = tracker.get_registered_schema("UnknownSchema")

find_runs_by_facet_kv(*, namespace, key, value_type=None, value_str=None, value_num=None, value_bool=None, limit=100)

Find runs by a flattened config facet key/value.

Parameters:

Name Type Description Default
namespace str

Facet namespace.

required
key str

Flattened facet key.

required
value_type Optional[str]

Optional discriminator for the value column (implementation dependent).

None
value_str Optional[str]

String value to match.

None
value_num Optional[float]

Numeric value to match.

None
value_bool Optional[bool]

Boolean value to match.

None
limit int

Maximum number of runs to return.

100

Returns:

Type Description
list

Matching run records (empty if DB is not configured).

view(model, key=None)

Create/register a hybrid view for a given SQLModel schema.

Parameters:

Name Type Description Default
model Type[SQLModel]

SQLModel schema defining the logical columns for the concept.

required
key Optional[str]

Override the concept key (defaults to model.__tablename__).

None

Returns:

Type Description
Type[SQLModel]

The dynamic SQLModel view class exposed via tracker.views.

Raises:

Type Description
RuntimeError

If the tracker has no database configured.

create_view(view_name, concept_key)

Create a named hybrid view over a registered concept.

This is a lower-level helper than Tracker.view(...). It is useful when you want to create multiple named views over the same concept key, or when you want explicit control over the view name.

Parameters:

Name Type Description Default
view_name str

The SQL view name to create in the database (e.g., "v_persons").

required
concept_key str

The registered concept key to materialize (typically a table/artifact key).

required

Returns:

Type Description
Any

Backend-specific result from ViewFactory.create_hybrid_view.

create_grouped_view(view_name, *, schema_id=None, schema=None, namespace=None, params=None, drivers=None, attach_facets=None, include_system_columns=True, mode='hybrid', if_exists='replace', missing_files='warn', run_id=None, parent_run_id=None, model=None, status=None, year=None, iteration=None, schema_compatible=False)

Create one analysis view across many artifacts selected by schema/facets.

Unlike create_view(view_name, concept_key), which targets one key, this method selects artifacts by schema_id plus optional facet/run filters and materializes a single view over hot and/or cold data.

Parameters:

Name Type Description Default
view_name str

Name of the SQL view to create.

required
schema_id Optional[str]

Schema identity used as the primary artifact selector.

None
schema Optional[Type[SQLModel]]

SQLModel class selector convenience. When provided, Consist resolves matching stored schema ids from this model definition, first by exact field names and then by compatible subset/superset field-name matching.

None
namespace Optional[str]

Default ArtifactKV namespace applied to facet predicates that do not include an explicit namespace.

None
params Optional[Iterable[str]]

Facet predicate expressions, each in one of: <key>=<value>, <key>>=<value>, <key><=<value>. A leading namespace is supported, for example beam.phys_sim_iteration=2.

None
drivers Optional[List[str]]

Optional artifact-driver filter, e.g. ["parquet"].

None
attach_facets Optional[List[str]]

Facet key paths to project into the view as typed facet_<key> columns.

None
include_system_columns bool

Whether to include Consist system columns in the view.

True
mode (hybrid, hot_only, cold_only)

Which storage tier(s) to include in the view.

"hybrid"
if_exists (replace, error)

Behavior when view_name already exists.

"replace"
missing_files (warn, error, skip_silent)

Behavior when a selected cold file is missing.

"warn"
run_id Optional[str]

Optional exact run-id filter.

None
parent_run_id Optional[str]

Optional parent/scenario run-id filter.

None
model Optional[str]

Optional run model-name filter.

None
status Optional[str]

Optional run status filter.

None
year Optional[int]

Optional run year filter.

None
iteration Optional[int]

Optional run iteration filter.

None
schema_compatible bool

If True, allow schema-compatible subset/superset variants by field names in addition to exact schema_id matches.

False

Returns:

Type Description
Any

Backend-specific result from ViewFactory.create_grouped_hybrid_view.

Raises:

Type Description
RuntimeError

If no database is configured.

ValueError

If selector or facet predicates are invalid, or view policies are invalid.

Examples:

tracker.create_grouped_view(
    "v_linkstats_all",
    schema_id="abc123...",
    namespace="beam",
    params=["artifact_family=linkstats", "year=2018"],
    attach_facets=["artifact_family", "phys_sim_iteration"],
    drivers=["parquet"],
    mode="hybrid",
)

load_matrix(concept_key, variables=None, *, run_ids=None, parent_id=None, model=None, status=None)

Convenience wrapper for loading a matrix view from tracked artifacts.

Parameters:

Name Type Description Default
concept_key str

Semantic key for the matrix artifacts.

required
variables Optional[List[str]]

Variables to load from each Zarr store; defaults to all variables.

None
run_ids Optional[List[str]]

Restrict to specific run IDs.

None
parent_id Optional[str]

Filter by scenario/parent run ID.

None
model Optional[str]

Filter by model name.

None
status Optional[str]

Filter by run status.

None

Returns:

Type Description
Any

An xarray.Dataset containing the combined matrix data.

export_schema_sqlmodel(*, schema_id=None, artifact_id=None, out_path=None, table_name=None, class_name=None, abstract=True, include_system_cols=False, include_stats_comments=True, prefer_source=None)

Export a captured artifact schema as a SQLModel stub for manual editing.

Exactly one of schema_id or artifact_id must be provided. The generated Python source is returned and can optionally be written to out_path.

Parameters:

Name Type Description Default
schema_id Optional[str]

Schema identifier to export (from the schema registry). If provided, prefer_source is ignored and this specific schema is used.

None
artifact_id Optional[Union[str, UUID]]

Artifact ID to export the associated schema. When used, the schema selection respects the prefer_source parameter.

None
out_path Optional[Path]

If provided, write the stub to this path and return its contents.

None
table_name Optional[str]

Override the SQL table name in the generated class.

None
class_name Optional[str]

Override the Python class name in the generated class.

None
abstract bool

Whether to mark the generated class as abstract.

True
include_system_cols bool

Whether to include Consist system columns in the stub.

False
include_stats_comments bool

Whether to include column-level stats as comments.

True
prefer_source (file, duckdb, user_provided)

Preference hint applied when no user_provided schema exists. This is useful when an artifact has both a file profile (pandas dtypes) and a duckdb profile (post-ingestion types). Ignored if schema_id is provided directly.

IMPORTANT: User-provided schemas (manually curated with FK constraints, indexes, etc.) are ALWAYS preferred if they exist. This parameter does not override user_provided schemas.

  • "file": Prefer the original file schema (CSV/Parquet with pandas dtypes)
  • "duckdb": Prefer the post-ingestion schema from the DuckDB table
  • "user_provided": Prefer manually curated schema observations explicitly
  • None (default): Prefer file, as it preserves richer type information (e.g., pandas category)
"file"

Returns:

Type Description
str

The rendered SQLModel stub source.

Raises:

Type Description
ValueError

If the tracker has no database configured or if the selector is invalid.

KeyError

If no schema is found for the provided selector.

Examples:

Export file schema (original raw file dtypes):

tracker.export_schema_sqlmodel(artifact_id=art.id)

Export ingested table schema (after dlt normalization):

tracker.export_schema_sqlmodel(artifact_id=art.id, prefer_source="duckdb")

Export a specific schema directly by ID:

tracker.export_schema_sqlmodel(schema_id="abc123xyz")

netcdf_metadata(concept_key)

Access NetCDF metadata views for a given artifact key.

This provides convenient access to query and explore NetCDF file structures stored in Consist's metadata catalog.

Parameters:

Name Type Description Default
concept_key str

The semantic key identifying the NetCDF artifact.

required

Returns:

Type Description
NetCdfMetadataView

A view object with methods to explore variables, dimensions, and attributes.

Example
view = tracker.netcdf_metadata("climate")
variables = view.get_variables(year=2024)
print(view.summary("climate"))

openmatrix_metadata(concept_key)

Access OpenMatrix metadata views for a given artifact key.

This provides convenient access to query and explore OpenMatrix file structures stored in Consist's metadata catalog.

Parameters:

Name Type Description Default
concept_key str

The semantic key identifying the OpenMatrix artifact.

required

Returns:

Type Description
OpenMatrixMetadataView

A view object with methods to explore matrices, zones, and attributes.

Example
view = tracker.openmatrix_metadata("demand")
matrices = view.get_matrices(year=2024)
zones = view.get_zone_counts()
print(view.summary("demand"))

spatial_metadata(concept_key)

Access spatial metadata views for a given artifact key.

Parameters:

Name Type Description Default
concept_key str

The semantic key identifying the spatial artifact.

required

Returns:

Type Description
SpatialMetadataView

A view object with methods to explore spatial metadata.

Example
view = tracker.spatial_metadata("parcels")
bounds = view.get_bounds("parcels")

canonicalize_config(adapter, config_dirs, *, run=None, run_id=None, strict=False, ingest=True, profile_schema=False, options=None)

Canonicalize a model-specific config directory and ingest queryable slices.

Parameters:

Name Type Description Default
adapter ConfigAdapter

Adapter implementation for the model (e.g., ActivitySim).

required
config_dirs Iterable[Union[str, Path]]

Ordered config directories to canonicalize.

required
run Optional[Run]

Run context to attach to; defaults to the active run.

None
run_id Optional[str]

Run identifier; must match the active run when provided.

None
strict bool

If True, adapter should error on missing references.

False
ingest bool

Whether to ingest any queryable tables produced by the adapter.

True
profile_schema bool

Whether to profile ingested schemas.

False
options Optional[ConfigAdapterOptions]

Shared adapter options that override strict/ingest defaults.

None

Returns:

Type Description
ConfigContribution

Structured summary of logged artifacts and ingestables.
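A sketch of canonicalizing configs inside an active run; adapter stands in for a concrete ConfigAdapter (e.g., an ActivitySim adapter), and the start_run usage and directory names are illustrative:

```python
with tracker.start_run("asim_run") as run:
    contribution = tracker.canonicalize_config(
        adapter,
        ["./configs", "./configs_mp"],  # ordered; later dirs layer on earlier ones
        strict=True,          # error on missing references
        profile_schema=True,  # also profile ingested schemas
    )
```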

prepare_config(adapter, config_dirs, *, strict=False, options=None, validate_only=False, facet_spec=None, facet_schema_name=None, facet_schema_version=None, facet_index=None)

Prepare a config plan without logging artifacts or ingesting data.

Parameters:

Name Type Description Default
adapter ConfigAdapter

Adapter implementation for the model (e.g., ActivitySim).

required
config_dirs Iterable[Union[str, Path]]

Ordered config directories to canonicalize.

required
strict bool

If True, adapter should error on missing references.

False
options Optional[ConfigAdapterOptions]

Shared adapter options that override strict defaults.

None
validate_only bool

If True, validate ingestables without logging or ingesting.

False
facet_spec Optional[Dict[str, Any]]

Adapter-specific facet extraction spec.

None
facet_schema_name Optional[str]

Optional facet schema name for persistence.

None
facet_schema_version Optional[Union[str, int]]

Optional facet schema version for persistence.

None
facet_index Optional[bool]

Optional flag controlling KV facet indexing.

None

Returns:

Type Description
ConfigPlan

Pre-run config plan containing artifacts and ingestables.

apply_config_plan(plan, *, run=None, ingest=True, profile_schema=False, adapter=None, options=None)

Apply a pre-run config plan to the active run.

Parameters:

Name Type Description Default
plan ConfigPlan

Plan produced by prepare_config.

required
run Optional[Run]

Run context to attach to; defaults to the active run.

None
ingest bool

Whether to ingest any queryable tables produced by the adapter.

True
profile_schema bool

Whether to profile ingested schemas.

False
adapter Optional[ConfigAdapter]

Adapter instance used to create run-scoped artifacts, if needed.

None
options Optional[ConfigAdapterOptions]

Shared adapter options that override ingest defaults.

None

Returns:

Type Description
ConfigContribution

Structured summary of logged artifacts and ingestables.
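prepare_config and apply_config_plan split planning from application, so a plan can be built (and hashed) before any run exists. A sketch, with adapter standing in for a concrete ConfigAdapter and the start_run usage assumed:

```python
# Phase 1: plan outside any run -- nothing is logged or ingested yet.
plan = tracker.prepare_config(adapter, ["./configs"], strict=True)

# Phase 2: apply the plan inside an active run.
with tracker.start_run("model_run") as run:
    contribution = tracker.apply_config_plan(plan, ingest=True)
```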

identity_from_config_plan(plan)

Return the identity hash derived from a config plan.

Parameters:

Name Type Description Default
plan ConfigPlan

Config plan produced by prepare_config.

required

Returns:

Type Description
str

Stable hash representing the canonical config content.
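Because the hash depends only on canonical config content, it can be computed before launching any work. A sketch:

```python
plan = tracker.prepare_config(adapter, ["./configs"])
config_hash = tracker.identity_from_config_plan(plan)
# Stable across machines whenever the canonical config content is identical.
print(config_hash)
```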

log_h5_container(path, key=None, direction='output', discover_tables=True, table_filter=None, hash_tables='if_unchanged', table_hash_chunk_rows=None, **meta)

Log an HDF5 file and optionally discover its internal tables.

This method provides first-class HDF5 container support, automatically discovering and logging internal tables as child artifacts. This is particularly useful for model pipelines that use HDF5 files containing multiple datasets or tables.

Parameters:

Name Type Description Default
path Union[str, Path]

Path to the HDF5 file.

required
key Optional[str]

Semantic name for the container. If not provided, uses the file stem.

None
direction str

Whether this is an "input" or "output" artifact.

"output"
discover_tables bool

If True, scan the file and create child artifacts for each table/dataset.

True
table_filter Optional[Union[Callable[[str], bool], List[str]]]

Filter which tables to log. Can be:

  • A callable that takes a table name and returns True to include
  • A list of table names to include (exact match)

If None, all tables are included.

None
hash_tables Literal['always', 'if_unchanged', 'never']

Whether to compute content hashes for discovered tables. "if_unchanged" skips hashing when a table appears unchanged based on lightweight checks.

"if_unchanged"
table_hash_chunk_rows Optional[int]

Row chunk size to use when hashing large tables.

None
**meta Any

Additional metadata for the container artifact.

{}

Returns:

Type Description
Tuple[Artifact, List[Artifact]]

A tuple of (container_artifact, list_of_table_artifacts).

Raises:

Type Description
RuntimeError

If called outside an active run context.

ImportError

If h5py is not installed and discover_tables is True.

Example
# Log HDF5 file with auto-discovery of all tables
container, tables = tracker.log_h5_container("data.h5", key="urbansim_data")
print(f"Logged {len(tables)} tables from container")

# Filter tables by callable
container, tables = tracker.log_h5_container(
    "data.h5",
    key="urbansim_data",
    table_filter=lambda name: name.startswith("/2025/")
)

# Filter tables by list of names
container, tables = tracker.log_h5_container(
    "data.h5",
    key="urbansim_data",
    table_filter=["households", "persons", "buildings"]
)

log_h5_table(path, *, table_path, key=None, direction='output', parent=None, hash_table=True, table_hash_chunk_rows=None, profile_file_schema=False, file_schema_sample_rows=None, **meta)

Log a single HDF5 table as an artifact without scanning the container.

Parameters:

Name Type Description Default
path Union[str, Path]

Path to the HDF5 file on disk.

required
table_path str

Internal table/dataset path inside the HDF5 container.

required
key Optional[str]

Semantic key for the table artifact. Defaults to the dataset name.

None
direction str

Whether the table is an "input" or "output".

"output"
parent Optional[Artifact]

Optional parent container artifact to link this table to.

None
hash_table bool

Whether to compute a content hash for the table.

True
table_hash_chunk_rows Optional[int]

Chunk size for hashing large tables.

None
profile_file_schema bool | Literal['if_changed']

Whether to profile table schema and store it as metadata. Use "if_changed" to skip profiling when matching content identity already has a schema (prefers content_id and falls back to hash for legacy rows).

False
file_schema_sample_rows Optional[int]

Number of rows to sample when profiling schema.

None
**meta Any

Additional metadata to store on the artifact.

{}

Returns:

Type Description
Artifact

The created table artifact.
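A sketch of logging a single table from a larger store (the file and dataset paths are illustrative):

```python
art = tracker.log_h5_table(
    "data.h5",
    table_path="/2025/households",
    key="households_2025",
    profile_file_schema="if_changed",  # skip re-profiling unchanged content
)
```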

log_netcdf_file(path, key=None, direction='output', **meta)

Log a NetCDF file as an artifact with metadata extraction.

This method provides convenient logging for NetCDF files, automatically detecting the driver and storing structural metadata about variables, dimensions, and coordinates.

Parameters:

Name Type Description Default
path Union[str, Path]

Path to the NetCDF file.

required
key Optional[str]

Semantic name for the artifact. If not provided, uses the file stem.

None
direction str

Whether this is an "input" or "output" artifact.

"output"
**meta Any

Additional metadata for the artifact.

{}

Returns:

Type Description
Artifact

The logged artifact with metadata extracted from the NetCDF structure.

Raises:

Type Description
RuntimeError

If called outside an active run context.

ImportError

If xarray is not installed.

Example
# Log NetCDF file
art = tracker.log_netcdf_file("climate_data.nc", key="temperature")
# Optionally ingest metadata
tracker.ingest(art)

log_openmatrix_file(path, key=None, direction='output', **meta)

Log an OpenMatrix (OMX) file as an artifact with metadata extraction.

This method provides convenient logging for OpenMatrix files, automatically detecting the driver and storing structural metadata about matrices, dimensions, and attributes.

Parameters:

Name Type Description Default
path Union[str, Path]

Path to the OpenMatrix file.

required
key Optional[str]

Semantic name for the artifact. If not provided, uses the file stem.

None
direction str

Whether this is an "input" or "output" artifact.

"output"
**meta Any

Additional metadata for the artifact.

{}

Returns:

Type Description
Artifact

The logged artifact with metadata extracted from the OpenMatrix structure.

Raises:

Type Description
RuntimeError

If called outside an active run context.

ImportError

If neither h5py nor openmatrix is installed.

Example
# Log OpenMatrix file (e.g., ActivitySim travel demand)
art = tracker.log_openmatrix_file("demand.omx", key="travel_demand")
# Optionally ingest metadata
tracker.ingest(art)

set_run_subdir_fn(fn)

Set a callable that returns the per-run artifact subdirectory name.

Parameters:

Name Type Description Default
fn Optional[Callable[[Run], str]]

Callable that accepts a Run and returns a relative directory name. Set to None to disable the custom resolver.

required
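A sketch of a custom layout, assuming model_name and year are populated on the Run:

```python
def subdir_for(run) -> str:
    # Illustrative layout: <run_dir>/<model>/<year>/<run_id>
    return f"{run.model_name}/{run.year}/{run.id}"

tracker.set_run_subdir_fn(subdir_for)
tracker.set_run_subdir_fn(None)  # restore the default layout
```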

run_artifact_dir(run=None)

Resolve the run-specific artifact directory for the active run.

Parameters:

Name Type Description Default
run Optional[Run]

Run to resolve the directory for. Defaults to the current run if active.

None

Returns:

Type Description
Path

Directory under run_dir where run artifacts should be written by default. Absolute artifact_dir values outside run_dir are only allowed when allow_external_paths is enabled.
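For example, writing an extra file into the active run's workspace:

```python
out_dir = tracker.run_artifact_dir()
(out_dir / "report.txt").write_text("done\n")
```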

resolve_uri(uri)

**Delegates to FileSystemManager.**

Converts a portable Consist URI back into an absolute file system path.

This is the inverse operation of _virtualize_path and is central to "Path Resolution & Mounts". It uses the configured mounts and the run_dir to reconstruct the local absolute path to an artifact, making runs portable across different environments.

Parameters:

Name Type Description Default
uri str

The portable URI (e.g., "inputs://file.csv", "./output/data.parquet") to resolve.

required

Returns:

Type Description
str

The absolute file system path corresponding to the given URI. If the URI cannot be fully resolved (e.g., scheme not mounted), it returns the most resolved path or the original URI after attempting to make it absolute. Mounted URIs are validated to prevent path traversal outside the mount root.
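For example (the URI and resolved path are illustrative):

```python
local_path = tracker.resolve_uri("inputs://households.csv")
print(local_path)  # e.g. /data/project/inputs/households.csv
```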

run_query(query)

Execute a SQLModel/SQLAlchemy query via the metadata store.

Parameters:

Name Type Description Default
query Executable

Query object (select, text, etc.).

required

Returns:

Type Description
list

Results of the executed query.

Raises:

Type Description
RuntimeError

If no database is configured for this tracker.
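A sketch using a SQLModel select; the Run model import path shown is an assumption:

```python
from sqlmodel import select
from consist.models import Run  # import path is illustrative

rows = tracker.run_query(
    select(Run).where(Run.status == "completed").limit(10)
)
```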

get_run_record(run_id, *, allow_missing=False)

Load the full run record snapshot from disk.

This reads the JSON snapshot produced at run time (consist_runs/<id>.json) and returns the parsed ConsistRecord.

Parameters:

Name Type Description Default
run_id str

Run identifier.

required
allow_missing bool

Return None if the snapshot file is missing or unreadable instead of raising.

False

Returns:

Type Description
Optional[ConsistRecord]

The parsed run record, or None if missing and allow_missing.
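For example, tolerating a missing snapshot:

```python
record = tracker.get_run_record(run_id, allow_missing=True)
if record is None:
    print("no snapshot on disk for this run")
```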

resolve_historical_path(artifact, run)

Resolve the on-disk path for an artifact from a prior run.

Parameters:

Name Type Description Default
artifact Artifact

The artifact whose historical location should be resolved.

required
run Run

The run that originally produced/consumed the artifact.

required

Returns:

Type Description
Path

The resolved filesystem path for the artifact in its original run workspace.
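A sketch combining this with run and artifact lookups (the model and key names are illustrative):

```python
run = tracker.find_latest_run(model="beam")
art = tracker.get_run_artifact(run.id, key="linkstats")
path = tracker.resolve_historical_path(art, run)
```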

load_input_bundle(run_id)

Load a set of input artifacts from a prior "bundle" run by run_id.

This is a convenience helper for shared DuckDB bundles where a dedicated run logs all required inputs as outputs. The returned dict can be passed directly to inputs=[...] on a new run.

Parameters:

Name Type Description Default
run_id str

The run id that logged the bundle outputs.

required

Returns:

Type Description
dict[str, Artifact]

Mapping of artifact key -> Artifact from the bundle run.

Raises:

Type Description
ValueError

If the run does not exist or has no output artifacts.
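A sketch of reusing a bundle in a new run (bundle_run_id and my_step are illustrative):

```python
bundle = tracker.load_input_bundle(bundle_run_id)
result = tracker.run(fn=my_step, inputs=list(bundle.values()))
```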

get_artifact_by_uri(uri, *, table_path=None, array_path=None)

Find an artifact by its URI.

Useful for checking if a specific file has been logged, or for retrieving artifact metadata by path.

Parameters:

Name Type Description Default
uri str

The portable URI to search for (e.g., "inputs://households.csv").

required
table_path Optional[str]

Optional table path to match.

None
array_path Optional[str]

Optional array path to match.

None

Returns:

Type Description
Optional[Artifact]

The found Artifact object, or None if no matching artifact is found.
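For example, checking whether a file has already been logged:

```python
art = tracker.get_artifact_by_uri("inputs://households.csv")
if art is None:
    print("not logged yet")
```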

get_run_artifact(run_id, key=None, key_contains=None, direction='output')

Convenience helper to fetch a single artifact for a specific run.

Parameters:

Name Type Description Default
run_id str

Run identifier.

required
key Optional[str]

Exact key to match (if present in logged artifacts).

None
key_contains Optional[str]

Optional substring to match when the exact key is unknown.

None
direction str

"output" (default) or "input".

"output"
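For example (key names are illustrative):

```python
# Exact key match on outputs (the default direction):
art = tracker.get_run_artifact(run_id, key="summary")

# Substring match on inputs when the exact key is unknown:
art = tracker.get_run_artifact(run_id, key_contains="linkstats", direction="input")
```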

load_run_output(run_id, key, **kwargs)

Load a specific output artifact from a run by key.

Parameters:

Name Type Description Default
run_id str

Run identifier.

required
key str

Output artifact key to load.

required
**kwargs Any

Forwarded to Tracker.load(...).

{}

Returns:

Type Description
Any

Loaded artifact data.
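For example, loading a tabular output by key (extra kwargs pass through to Tracker.load):

```python
df = tracker.load_run_output(run_id, "summary")
```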

find_matching_run(config_hash, input_hash, git_hash)

Find a previously completed run that matches the identity hashes.

Parameters:

Name Type Description Default
config_hash str

Hash of the canonicalized config for the run.

required
input_hash str

Hash of the run inputs.

required
git_hash str

Git commit hash captured with the run.

required

Returns:

Type Description
Optional[Run]

The matching run, or None if not found or if no database is configured.
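A sketch of a cache lookup, assuming the three hashes were computed elsewhere:

```python
prior = tracker.find_matching_run(config_hash, input_hash, git_hash)
if prior is not None:
    print(f"reusing cached run {prior.id}")
```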

on_run_start(callback)

Register a callback to be invoked when a run starts.

The callback receives the Run object after it has been initialized but before any user code executes. This is useful for external integrations like OpenLineage event emission, logging, or notifications.

Parameters:

Name Type Description Default
callback Callable[[Run], None]

A function that takes a Run object as its only argument.

required

Returns:

Type Description
Callable[[Run], None]

The same callback, allowing use as a decorator.

Example
@tracker.on_run_start
def log_start(run):
    print(f"Starting run: {run.id}")

# Or without decorator:
tracker.on_run_start(my_callback_function)

on_run_complete(callback)

Register a callback to be invoked when a run completes successfully.

Parameters:

Name Type Description Default
callback Callable[[Run, List[Artifact]], None]

Called with the completed Run and its output artifacts.

required

Returns:

Type Description
Callable[[Run, List[Artifact]], None]

The same callback, allowing use as a decorator.
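For example, mirroring the decorator pattern shown for on_run_start:

```python
@tracker.on_run_complete
def announce(run, artifacts):
    print(f"run {run.id} produced {len(artifacts)} artifacts")
```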

on_run_failed(callback)

Register a callback to be invoked when a run fails.

Parameters:

Name Type Description Default
callback Callable[[Run, Exception], None]

Called with the failed Run and the raised exception.

required

Returns:

Type Description
Callable[[Run, Exception], None]

The same callback, allowing use as a decorator.
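For example:

```python
@tracker.on_run_failed
def report_failure(run, exc):
    print(f"run {run.id} failed: {exc!r}")
```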