API Helpers

consist.api provides top-level helper functions that forward to the active tracker. Use these when you want concise scripts and notebooks without passing a tracker object to every call.

When to use helpers vs class methods

  • consist.* helpers: notebook/script workflows where use_tracker(...) is already set.
  • Tracker.* methods: library/application code where explicit object wiring is preferred.

High-traffic helpers

  • Run execution: consist.run, consist.scenario, consist.trace, consist.start_run
  • Input wiring: consist.ref, consist.refs
  • Context and output paths: consist.use_tracker, consist.output_dir, consist.output_path
  • Artifact logging/loading: consist.log_artifact, consist.log_dataframe, consist.load, consist.load_df
  • Querying: consist.find_run, consist.find_runs, consist.find_latest_run, consist.run_query, consist.run_set, consist.get_run_result, consist.config_run_query, consist.config_run_rows

consist.find_runs(...), consist.find_run(...), and consist.find_latest_run(...) forward standard run filters, including stage= and phase= for workflow-level queries.

Minimal runnable helper workflow

from pathlib import Path
import consist
from consist import Tracker

tracker = Tracker(run_dir="./runs", db_path="./provenance.duckdb")

def step() -> Path:
    out = consist.output_path("step", ext="txt")
    out.write_text("done\n")
    return out

with consist.use_tracker(tracker):
    result = consist.run(fn=step, outputs=["step"])
    latest = consist.find_run(run_id=result.run.id)

print(result.outputs["step"].path)
print(latest.id if latest else None)

For class-level equivalents, see Tracker and Workflow Contexts.

RelationConnectionLeakWarning

Bases: RuntimeWarning

Warning emitted when relation connections appear to accumulate.

view(model, name=None)

Create a SQLModel class backed by a Consist hybrid view.

This is a convenience wrapper around the view factory that lets you define a SQLModel schema for a concept (e.g., a canonicalized config table, an ingested dataset, or a computed artifact view) and then query it as a normal SQLModel table. The returned class is a dynamic subclass with table=True that points at a database view, so you can use it in sqlmodel.Session queries without creating a physical table.

If you need explicit control over view naming or want to create multiple named views for the same concept, use Tracker.create_view(...) or Tracker.view(...) directly.

Parameters:

Name Type Description Default
model Type[T]

Base SQLModel describing the schema (columns and types).

required
name Optional[str]

Optional override for the generated view name (defaults to the model's table name).

None

Returns:

Type Description
Type[T]

SQLModel subclass with table=True pointing at the hybrid view.

Examples:

from sqlmodel import Session, select
from consist import view
from consist.models.activitysim import ActivitySimConstantsCache

# Create a dynamic view class for querying constants
ConstantsView = view(ActivitySimConstantsCache)

with Session(tracker.engine) as session:
    rows = session.exec(
        select(ConstantsView)
        .where(ConstantsView.key == "sample_rate")
    ).all()

use_tracker(tracker)

Set a fallback (default) tracker for Consist API entrypoints.

This configures which tracker is used by consist.run(), consist.start_run(), etc. when called outside an active run context (i.e., when the tracker stack is empty). Once inside a run, the tracker becomes "active" via push_tracker() and is accessed by logging functions like consist.log_artifact().
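A minimal sketch of the fallback pattern (the Tracker constructor arguments mirror the workflow example above):

```python
import consist
from consist import Tracker

# Configure the fallback tracker once, near the top of a script or notebook.
tracker = Tracker(run_dir="./runs", db_path="./provenance.duckdb")

with consist.use_tracker(tracker):
    # Helper entrypoints resolve this tracker automatically; no tracker= needed.
    result = consist.run(fn=lambda: None, name="noop")
```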

run(fn=None, name=None, *, tracker=None, adapter=None, identity_inputs=None, cache_options=None, output_policy=None, execution_options=None, **kwargs)

Execute a computational task as a tracked and cached Consist run.

This high-level entrypoint encapsulates the execution of a callable within a managed provenance context. It automates the lifecycle of the run, including signature computation for cache identity, artifact capture, and result persistence.

Parameters:

Name Type Description Default
fn Optional[Callable]

The computational function to execute.

None
name Optional[str]

A semantic identifier for the run. If omitted, the function's __name__ is used as the default.

None
tracker Optional[Tracker]

The Tracker instance responsible for provenance and caching. If None, the active global tracker is resolved.

None
adapter Optional[ConfigAdapter]

Config adapter used to resolve adapter-driven identity for the run.

None
identity_inputs IdentityInputs

Additional hash-only identity inputs that contribute to cache keys.

None
cache_options Optional[CacheOptions]

Grouped cache controls for run execution.

None
output_policy Optional[OutputPolicyOptions]

Grouped output mismatch/missing policy controls.

None
execution_options Optional[ExecutionOptions]

Grouped runtime execution controls.

None
**kwargs Any

Arguments forwarded to Tracker.run, including inputs, config, tags, and other core run metadata.

Legacy direct policy kwargs are rejected (for example cache_mode, output_missing, inject_context). Use options objects instead: cache_options=CacheOptions(...), output_policy=OutputPolicyOptions(...), and execution_options=ExecutionOptions(...).

{}

Returns:

Type Description
RunResult

A container holding the function's return value and the immutable Run record.

Raises:

Type Description
TypeError

Raised when legacy run-policy kwargs are provided directly instead of via options objects.
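A hedged sketch of a grouped-options call; the import location of CacheOptions and OutputPolicyOptions is an assumption (the legacy-kwargs note above only names the option types):

```python
import consist
from consist import CacheOptions, OutputPolicyOptions  # assumed import path

def build_table():
    out = consist.output_path("table", ext="csv")
    out.write_text("id,value\n1,10\n")
    return out

result = consist.run(
    fn=build_table,
    outputs=["table"],                    # forwarded to Tracker.run
    cache_options=CacheOptions(),         # grouped cache controls
    output_policy=OutputPolicyOptions(),  # mismatch/missing policy
)
print(result.outputs["table"].path)
```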

ref(run_result, key=None)

Select a specific output artifact from a RunResult.

Parameters:

Name Type Description Default
run_result RunResult

Result object returned by consist.run/tracker.run.

required
key Optional[str]

Output key to select. If omitted, run_result must have exactly one output.

None

Returns:

Type Description
Artifact

Selected artifact reference, suitable for inputs=....
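For example, wiring one run's output into the next (prepare and model stand in for user-defined step functions):

```python
import consist

# prepare is a user-defined step that produces a single "households" output.
prep = consist.run(fn=prepare, outputs=["households"])

hh = consist.ref(prep)                 # sole output: key may be omitted
hh = consist.ref(prep, "households")   # equivalent explicit form

# model is a user-defined step consuming that artifact as an input.
result = consist.run(fn=model, inputs={"households": hh})
```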

refs(*selectors, **aliases)

refs(run_result: RunResult) -> Dict[str, Artifact]
refs(
    run_result: RunResult, *keys: str
) -> Dict[str, Artifact]
refs(
    run_result: RunResult, alias_map: Mapping[str, str]
) -> Dict[str, Artifact]
refs(
    *selectors: tuple[Any, ...],
    **aliases: Union[RunResult, tuple[RunResult, str]],
) -> Dict[str, Artifact]

Build an inputs={...} mapping from one or more RunResult objects.

Supported forms
  • Single result shorthand: consist.refs(prep, "households", "persons")
  • Single result alias mapping: consist.refs(prep, {"hh": "households", "pp": "persons"})
  • Multi-result positional groups: consist.refs((prep, "households"), (skim, "skims"))
  • Keyword aliases: consist.refs(hh=(prep, "households"), skims=(skim, "skims"))
Notes

Bare string selectors (for example consist.refs("households")) are not supported. Provide a RunResult context via positional or keyword forms.

Returns:

Type Description
Dict[str, Artifact]

Mapping suitable for inputs=....
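The supported forms above, shown feeding an inputs= mapping (prep, skim, and assign stand in for user-defined objects and functions):

```python
import consist

# prep and skim are RunResult objects from earlier consist.run calls.
inputs = consist.refs(prep, "households", "persons")          # keys kept as-is
inputs = consist.refs(prep, {"hh": "households"})             # renamed key
inputs = consist.refs((prep, "households"), (skim, "skims"))  # multiple results
inputs = consist.refs(hh=(prep, "households"), skims=(skim, "skims"))

result = consist.run(fn=assign, inputs=inputs)
```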

trace(name, *, tracker=None, adapter=None, identity_inputs=None, **kwargs)

Create a nested tracing context within an active run.

Use trace to break down a large run into smaller, logical steps. Each traced step is recorded as a sub-run with its own inputs and outputs.

Parameters:

Name Type Description Default
name str

Semantic name for this step/trace.

required
tracker Optional[Tracker]

The tracker instance to use.

None
adapter Optional[ConfigAdapter]

Config adapter used to resolve adapter-driven identity for the trace.

None
identity_inputs IdentityInputs

Additional hash-only identity inputs that contribute to cache keys.

None
**kwargs Any

Additional metadata or parameters for this trace.

{}

Yields:

Type Description
Tracker

The active tracker instance.
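For instance, splitting one run into traced sub-steps:

```python
import consist

def pipeline():
    with consist.trace("load"):
        raw = consist.output_path("raw", ext="csv")
        raw.write_text("a,b\n1,2\n")
        consist.log_output(raw, key="raw")

    with consist.trace("transform"):
        clean = consist.output_path("clean", ext="csv")
        clean.write_text("a,b\n1,2\n")
        consist.log_output(clean, key="clean")

result = consist.run(fn=pipeline, name="pipeline")
```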

start_run(run_id, model, tracker=None, **kwargs)

Initiate and manage a Consist run.

This context manager marks the beginning of a discrete unit of work (a "run"). All artifacts logged within this context will be associated with this run.

Parameters:

Name Type Description Default
run_id str

A unique identifier for this run.

required
model str

The name of the model or system being run.

required
tracker Optional[Tracker]

The tracker instance to use. Defaults to the active global tracker.

None
**kwargs Any

Additional parameters for the run, such as tags, config, or inputs.

{}

Yields:

Type Description
Tracker

The active tracker instance.

Example
with consist.start_run("run_123", "my_model"):
    consist.log_artifact("data.csv", "input_data")

define_step(*, model=None, name_template=None, outputs=None, schema_outputs=None, output_paths=None, inputs=None, input_keys=None, optional_input_keys=None, config=None, adapter=None, identity_inputs=None, facet=None, facet_index=None, cache_mode=None, cache_hydration=None, cache_version=None, validate_cached_outputs=None, input_binding=None, load_inputs=None, tags=None, facet_from=None, facet_schema_version=None, description=None, **extra)

Attach metadata to a function without changing execution behavior.

This decorator lets you attach defaults such as outputs or tags to a function. Tracker.run and ScenarioContext.run read this metadata. Callable values are resolved at runtime with a StepContext.
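A minimal sketch of declaring defaults once on the function (the package-level export of define_step is an assumption):

```python
import consist
from consist import define_step  # assumed export location

@define_step(outputs=["summary"], tags=["reporting"])
def summarize():
    out = consist.output_path("summary", ext="txt")
    out.write_text("ok\n")
    return out

# Tracker.run / consist.run read the attached metadata, so no outputs= here.
result = consist.run(fn=summarize)
```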

require_runtime_kwargs(*names)

Enforce the presence of required runtime_kwargs when a step is executed.

Use this decorator to catch missing runtime-only inputs early, before executing a run. It validates that each required name is present in the runtime_kwargs passed to Tracker.run or ScenarioContext.run.
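A sketch of stacking it with define_step (export locations are assumptions):

```python
from consist import define_step, require_runtime_kwargs  # assumed exports

@require_runtime_kwargs("year", "region")
@define_step(outputs=["forecast"])
def forecast(year, region):
    ...

# Tracker.run(fn=forecast, runtime_kwargs={"year": 2030}) would fail fast,
# because "region" is missing from runtime_kwargs.
```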

scenario(name, tracker=None, *, enabled=True, **kwargs)

Context manager for grouping multiple execution steps into a semantic scenario.

A scenario creates a parent "header" run that aggregates metadata, lineage, and artifacts from all nested steps. This enables multi-stage simulations to be tracked as a single unit while maintaining fine-grained provenance for each internal component.

Parameters:

Name Type Description Default
name str

Unique identifier or name for the scenario header.

required
tracker Optional[Tracker]

The Tracker instance to use for persistence. Defaults to the active global tracker.

None
enabled bool

If False, returns a no-op context that executes functions without recording provenance, preserving the Scenario API ergonomics for debugging.

True
**kwargs Any

Additional metadata (e.g., tags, config) forwarded to the header run.

{}

Yields:

Type Description
ScenarioContext

An active scenario context with an internal Coupler for artifact chaining.
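A sketch of a two-step scenario (prepare and simulate stand in for user-defined step functions):

```python
import consist

with consist.scenario("baseline_2030", tags=["baseline"]) as sc:
    prep = sc.run(fn=prepare, outputs=["households"])
    sc.run(
        fn=simulate,
        inputs=consist.refs(prep, "households"),
    )
```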

single_step_scenario(name, step_name=None, tracker=None, **kwargs)

Convenience wrapper that exposes a single-step scenario.

Parameters:

Name Type Description Default
name str

Name of the scenario header.

required
step_name Optional[str]

Name for the single step; defaults to name.

None
tracker Optional[Tracker]

Tracker to execute the scenario; defaults to the active tracker.

None
**kwargs Any

Arguments forwarded to Tracker.scenario.

{}

Yields:

Type Description
ScenarioContext

Scenario context manager for the single step.

current_tracker()

Retrieves the active Tracker instance from the global context.

If no run is active, this function falls back to the default tracker (if set via consist.use_tracker or consist.set_current_tracker).

Returns:

Type Description
Tracker

The Tracker instance currently active in the execution context.

Raises:

Type Description
RuntimeError

If no Tracker is active in the current context and no default tracker has been configured.

current_run()

Return the active Run record if one is in progress, otherwise None.

A Run record contains metadata about the current execution, such as its unique ID, model name, and start time.

current_consist()

Return the active ConsistRecord if one is in progress, otherwise None.

The ConsistRecord is the internal state object that tracks the active run's inputs, outputs, and metadata during execution.

output_dir(namespace=None)

Resolve the managed output directory for the active run context.

Parameters:

Name Type Description Default
namespace Optional[str]

Optional relative subdirectory under the active run's managed output directory.

None

Returns:

Type Description
Path

Absolute path to the managed output directory.

Raises:

Type Description
RuntimeError

If called outside an active run context.

output_path(key, ext='parquet')

Resolve a deterministic managed output path for the active run context.

Parameters:

Name Type Description Default
key str

Artifact key used as the output filename stem.

required
ext str

File extension to append to the output filename.

"parquet"

Returns:

Type Description
Path

Absolute managed output path for the current run.

Raises:

Type Description
RuntimeError

If called outside an active run context.

cached_artifacts(direction='output')

Returns hydrated cached artifacts for the active run, if any.

Parameters:

Name Type Description Default
direction str

"output" or "input".

"output"

Returns:

Type Description
Dict[str, Artifact]

Mapping from artifact key to Artifact, or empty dict if no cache hit.

cached_output(key=None)

Fetch a hydrated cached output artifact for the active run.

Parameters:

Name Type Description Default
key Optional[str]

Specific artifact key to look up; defaults to the first available artifact.

None

Returns:

Type Description
Optional[Artifact]

Cached artifact instance or None if no cache hit exists.

get_run_result(run_id, *, keys=None, validate='lazy', tracker=None)

Retrieve a historical run as a RunResult with selected outputs.

Parameters:

Name Type Description Default
run_id str

Identifier of the historical run.

required
keys Optional[Iterable[str]]

Optional output keys to include. If omitted, includes all outputs.

None
validate (lazy, strict, none)

Output validation policy forwarded to Tracker.get_run_result.

"lazy"
tracker Optional[Tracker]

Tracker instance to query. If omitted, resolves the active/default tracker.

None

Returns:

Type Description
RunResult

Historical run metadata plus selected outputs.
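For example, re-opening a historical run and selecting one output ("run_123" is a placeholder ID):

```python
import consist

hist = consist.get_run_result("run_123", keys=["households"], validate="lazy")
households_path = hist.outputs["households"].path
```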

get_artifact(run_id, key=None, key_contains=None, direction='output')

Retrieve a single artifact from a historical run.

Parameters:

Name Type Description Default
run_id str

Identifier of the run that produced the artifact.

required
key Optional[str]

Exact artifact key to match.

None
key_contains Optional[str]

Substring filter for artifact keys.

None
direction str

Either "input" or "output".

"output"

Returns:

Type Description
Optional[Artifact]

Matching artifact or None if not found.

register_artifact_facet_parser(prefix, parser_fn, *, tracker=None)

Register a key-prefix parser for deriving artifact facets at log time.

The parser is invoked when logging an artifact without an explicit facet=.

log_artifact(path, key=None, direction='output', schema=None, driver=None, content_hash=None, force_hash_override=False, validate_content_hash=False, reuse_if_unchanged=False, reuse_scope='same_uri', facet=None, facet_schema_version=None, facet_index=False, *, enabled=True, **meta)

Logs an artifact (file or data reference) to the currently active run.

This function is a convenience proxy to consist.core.tracker.Tracker.log_artifact. It automatically links the artifact to the current run context, handles path virtualization, and performs lineage discovery.

Parameters:

Name Type Description Default
path ArtifactRef

A file path (str/Path) or an existing Artifact reference to be logged.

required
key Optional[str]

A semantic, human-readable name for the artifact (e.g., "households"). Required if path is a path-like (str/Path).

None
direction str

Specifies whether the artifact is an "input" or "output" for the current run. Defaults to "output".

"output"
schema Optional[Type[SQLModel]]

An optional SQLModel class that defines the expected schema for the artifact's data. Its name will be stored in artifact metadata.

None
driver Optional[str]

Explicitly specify the driver (e.g., 'h5_table'). If None, the driver is inferred from the file extension.

None
content_hash Optional[str]

Precomputed content hash to use for the artifact instead of hashing the path on disk.

None
force_hash_override bool

If True, overwrite an existing artifact hash when it differs from content_hash. By default, mismatched overrides are ignored with a warning.

False
validate_content_hash bool

If True, verify content_hash against the on-disk data and raise on mismatch.

False
reuse_if_unchanged bool

Deprecated for outputs. Consist always creates a fresh output artifact row; identical bytes are deduplicated via artifact.content_id. Setting this on outputs emits a warning and does not reuse prior rows. Input-side behavior is unaffected.

False
reuse_scope (same_uri, any_uri)

Deprecated for outputs. any_uri is ignored for outputs; content identity (content_id) governs deduplication. Input-side behavior is unaffected.

"same_uri"
facet Optional[Any]

Optional artifact-level facet payload (dict or Pydantic model).

None
facet_schema_version Optional[Union[str, int]]

Optional schema version for artifact facet compatibility.

None
facet_index bool

If True, flatten scalar facet values for fast artifact querying.

False
enabled bool

If False, returns a noop artifact object without requiring an active run.

True
**meta Any

Additional key-value pairs to store in the artifact's flexible meta field.

{}

Returns:

Type Description
Artifact

The created or updated Artifact object.

Raises:

Type Description
RuntimeError

If called outside an active run context.

ValueError

If key is not provided when path is a path-like (str/Path).

log_artifacts(outputs, *, direction='output', driver=None, metadata_by_key=None, facets_by_key=None, facet_schema_versions_by_key=None, facet_index=False, reuse_if_unchanged=False, reuse_scope='same_uri', enabled=True, **shared_meta)

Log multiple artifacts in a single call for efficiency.

This function is a convenience proxy to consist.core.tracker.Tracker.log_artifacts. It logs a batch of related artifacts with optional per-key metadata customization.

Parameters:

Name Type Description Default
outputs Mapping[str, ArtifactRef]

Mapping of key -> path/Artifact to log. Keys must be strings and explicitly chosen by the caller (not inferred from filenames).

required
direction str

"input" or "output" for the current run context.

"output"
driver Optional[str]

Explicitly specify driver for all artifacts. If None, inferred from file extension.

None
metadata_by_key Optional[Mapping[str, Dict[str, Any]]]

Per-key metadata overrides applied on top of shared metadata.

None
facets_by_key Optional[Mapping[str, Any]]

Per-key artifact facet payloads.

None
facet_schema_versions_by_key Optional[Mapping[str, Union[str, int]]]

Optional per-key schema versions for artifact facets.

None
facet_index bool

Whether to index scalar artifact facet fields in artifact_kv.

False
reuse_if_unchanged bool

Deprecated for outputs. Batch output logging still creates a fresh artifact row per call; identical bytes are deduplicated via artifact.content_id. Setting this on outputs emits a warning and does not reuse prior rows. Input-side behavior is unaffected.

False
reuse_scope (same_uri, any_uri)

Deprecated for outputs. any_uri is ignored for outputs; deduplication is governed by content_id instead. Input-side behavior is unaffected.

"same_uri"
enabled bool

If False, returns noop artifact objects without requiring an active run.

True
**shared_meta Any

Metadata key-value pairs applied to ALL logged artifacts.

{}

Returns:

Type Description
Mapping[str, ArtifactLike]

Mapping of key -> logged Artifact-like objects.

Raises:

Type Description
RuntimeError

If called outside an active run context.

ValueError

If metadata_by_key contains keys not in outputs, or if any value is None.

TypeError

If mapping keys are not strings.

Examples:

Log multiple outputs with shared metadata:

artifacts = consist.log_artifacts(
    {
        "persons": "results/persons.parquet",
        "households": "results/households.parquet",
        "jobs": "results/jobs.parquet"
    },
    year=2030,
    scenario="base"
)
# All three artifacts get year=2030 and scenario="base"

Mix shared and per-key metadata:

artifacts = consist.log_artifacts(
    {
        "persons": "output/persons.parquet",
        "households": "output/households.parquet"
    },
    metadata_by_key={
        "households": {"role": "primary_unit", "weight": 1.0}
    },
    dataset_version="v2",
    simulation_id="run_001"
)
# persons gets: dataset_version="v2", simulation_id="run_001"
# households gets: dataset_version="v2", simulation_id="run_001",
#                  role="primary_unit", weight=1.0

log_input(path, key=None, *, schema=None, driver=None, content_hash=None, force_hash_override=False, validate_content_hash=False, facet=None, facet_schema_version=None, facet_index=False, enabled=True, **meta)

Log an input artifact to the active run.

An input artifact represents a data source that the current run reads from. Logging it creates a lineage link, allowing Consist to track which versions of data produced which results.

Parameters:

Name Type Description Default
path ArtifactRef

File path (str/Path) or an existing Artifact object.

required
key Optional[str]

Semantic name for the artifact (e.g. "raw_households"). Required if path is a file path.

None
schema Optional[Type[SQLModel]]

Optional SQLModel class defining the data structure.

None
driver Optional[str]

Explicit format driver (e.g. "parquet"). Inferred from extension if None.

None
content_hash Optional[str]

Precomputed hash to avoid re-hashing large files.

None
force_hash_override bool

Overwrite existing hash in the database if different from content_hash.

False
validate_content_hash bool

Re-hash the file on disk to ensure it matches the provided content_hash.

False
facet Optional[Any]

Optional artifact-level facet payload.

None
facet_schema_version Optional[Union[str, int]]

Optional facet schema version.

None
facet_index bool

Whether to index scalar facet fields for querying.

False
enabled bool

If False, returns a dummy artifact object for disconnected execution.

True
**meta Any

Additional metadata fields to store with the artifact.

{}

Returns:

Type Description
ArtifactLike

The logged artifact reference.

log_output(path, key=None, *, schema=None, driver=None, content_hash=None, force_hash_override=False, validate_content_hash=False, reuse_if_unchanged=False, reuse_scope='same_uri', facet=None, facet_schema_version=None, facet_index=False, enabled=True, **meta)

Log an output artifact produced by the current run.

Parameters:

Name Type Description Default
path ArtifactRef

File path (str/Path) or an existing Artifact object.

required
key Optional[str]

Semantic name for the artifact (e.g. "processed_results"). Required if path is a file path.

None
schema Optional[Type[SQLModel]]

Optional SQLModel class defining the data structure.

None
driver Optional[str]

Explicit format driver (e.g. "parquet"). Inferred from extension if None.

None
content_hash Optional[str]

Precomputed hash to avoid re-hashing large files.

None
force_hash_override bool

Overwrite existing hash in the database if different from content_hash.

False
validate_content_hash bool

Re-hash the file on disk to ensure it matches the provided content_hash.

False
reuse_if_unchanged bool

Deprecated for outputs. A fresh output artifact row is always created; identical bytes share content_id. Setting this emits a warning and does not reuse prior rows.

False
reuse_scope (same_uri, any_uri)

Deprecated for outputs. any_uri is ignored; deduplication is by content_id.

"same_uri"
facet Optional[Any]

Optional artifact-level facet payload.

None
facet_schema_version Optional[Union[str, int]]

Optional facet schema version.

None
facet_index bool

Whether to index scalar facet fields for querying.

False
enabled bool

If False, returns a dummy artifact object.

True
**meta Any

Additional metadata fields to store.

{}

Returns:

Type Description
ArtifactLike

The logged artifact reference.

log_dataframe(df, key, schema=None, direction='output', tracker=None, path=None, driver=None, meta=None, **to_file_kwargs)

Serialize a DataFrame, log it as an artifact, and trigger optional ingestion.

Parameters:

Name Type Description Default
df DataFrame

Data to persist.

required
key str

Logical artifact key.

required
schema Optional[Type[SQLModel]]

Schema used for ingestion, if provided.

None
direction str

Artifact direction relative to the run.

"output"
tracker Optional[Tracker]

Tracker instance to use; defaults to the active tracker. If None and no active run context exists, raises RuntimeError.

None
path Optional[Union[str, Path]]

Output path; defaults to <run_dir>/outputs/<run_subdir>/<key>.<driver> for the active run as determined by the tracker's run subdir configuration.

None
driver Optional[str]

File format driver (e.g., "parquet" or "csv").

None
meta Optional[Dict[str, Any]]

Additional metadata for the artifact.

None
**to_file_kwargs Any

Keyword arguments forwarded to pd.DataFrame.to_parquet or to_csv.

{}

Returns:

Type Description
Artifact

The artifact logged for the written dataset.

Raises:

Type Description
ValueError

If the requested driver is unsupported.
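A short sketch; index=False is forwarded to pandas' to_csv via **to_file_kwargs:

```python
import pandas as pd
import consist

df = pd.DataFrame({"id": [1, 2], "value": [10.5, 12.0]})

with consist.start_run("run_042", "my_model"):
    artifact = consist.log_dataframe(df, key="metrics", driver="csv", index=False)
```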

log_meta(**kwargs)

Updates the active run's metadata with the provided key-value pairs.

This function is a convenience proxy to consist.core.tracker.Tracker.log_meta. It allows users to log additional information about the current run, such as performance metrics, experimental parameters, or tags, directly to the run's metadata. This information is then persisted to both the JSON log and the DuckDB database.

Parameters:

Name Type Description Default
**kwargs Any

Arbitrary key-value pairs to merge into the meta dictionary of the current run. Existing keys will be updated, and new keys will be added.

{}

Raises:

Type Description
RuntimeError

If called when no Tracker is active in the current context.

ingest(artifact, data=None, schema=None, run=None)

Ingests data associated with an Artifact into the active run's database.

This function is a convenience proxy to consist.core.tracker.Tracker.ingest. It materializes data into DuckDB, leveraging dlt for efficient loading and injecting provenance information. This is part of the "Hot Data Strategy".

Parameters:

Name Type Description Default
artifact Artifact

The artifact object representing the data being ingested.

required
data Optional[Union[Iterable[Dict[str, Any]], Any]]

The data to ingest. Can be an iterable of dictionaries (rows), a file path (str or Path), a Pandas DataFrame, or any other data type that dlt can handle. If None, Consist attempts to read the data directly from the artifact's resolved file path.

None
schema Optional[Type[SQLModel]]

An optional SQLModel class that defines the expected schema for the ingested data. If provided, dlt will use this for strict validation and schema inference.

None
run Optional[Run]

The Run context for ingestion. If provided, the data will be tagged with this run's ID (Offline Mode). If None, it defaults to the currently active run (Online Mode).

None

Returns:

Type Description
Any

The result information from the underlying dlt ingestion process.

Raises:

Type Description
RuntimeError

If no database is configured for the active Tracker or if ingest is called outside of an active run context.

Exception

Any exception raised by the underlying dlt ingestion process.

register_views(*models)

Register hybrid view models for the active tracker.

Parameters:

Name Type Description Default
*models Type[SQLModel]

SQLModel classes describing the schema of hybrid views.

()

Returns:

Type Description
Dict[str, Type[SQLModel]]

Mapping from model class name to the generated view model.

find_run(tracker=None, **filters)

Convenience proxy for Tracker.find_run.

Parameters:

Name Type Description Default
tracker Optional[Tracker]

Tracker instance to query; defaults to the active tracker.

None
**filters Any

Filter values forwarded to Tracker.find_run.

{}

Returns:

Type Description
Optional[Run]

Matching run or None when no match exists.

find_runs(tracker=None, **filters)

Convenience proxy for Tracker.find_runs.

Parameters:

Name Type Description Default
tracker Optional[Tracker]

Tracker instance to query; defaults to the active tracker.

None
**filters Any

Filter values forwarded to Tracker.find_runs. This includes workflow filters such as stage and phase.

{}

Returns:

Type Description
Union[list[Run], Dict[Hashable, Run]]

Results returned by Tracker.find_runs.

find_latest_run(tracker=None, **filters)

Convenience proxy for Tracker.find_latest_run.

Parameters:

Name Type Description Default
tracker Optional[Tracker]

Tracker instance to query; defaults to the active tracker.

None
**filters Any

Filter values forwarded to Tracker.find_latest_run. This includes workflow filters such as stage and phase.

{}

Returns:

Type Description
Run

The latest matching run.

run_set(tracker=None, label=None, **filters)

Convenience proxy for Tracker.run_set.

Parameters:

Name Type Description Default
tracker Optional[Tracker]

Tracker instance to query; defaults to the active tracker.

None
label Optional[str]

Optional label attached to the returned RunSet.

None
**filters Any

Filter values forwarded to Tracker.run_set.

{}

Returns:

Type Description
RunSet

A tracker-backed RunSet for grouping/alignment workflows.

Notes

This convenience helper resolves a tracker from context and forwards to Tracker.run_set(...).

db_session(tracker=None)

Provide a SQLModel Session connected to the tracker's database.

Parameters:

Name Type Description Default
tracker Optional[Tracker]

Tracker instance supplying the engine; defaults to active tracker.

None

Yields:

Type Description
Session

SQLModel session bound to the tracker engine.
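Sketch of an ad-hoc query; the import path of the Run model is an assumption:

```python
import consist
from sqlmodel import select
from consist.models import Run  # assumed location of the Run model

with consist.db_session() as session:
    runs = session.exec(select(Run).where(Run.model == "my_model")).all()
```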

run_query(query, tracker=None)

Execute a SQLModel/SQLAlchemy query via the tracker engine.

Parameters:

Name Type Description Default
query Executable

Query object (select, etc.).

required
tracker Optional[Tracker]

Tracker instance supplying the engine; defaults to the active tracker.

None

Returns:

Type Description
list

Results of the executed query.

config_run_query(table, *, link_table, table_name=None, columns=None, where=None, join_on=None)

Build a query joining a config cache table to its run-link table.

This is a convenience helper for config adapter tables that are deduplicated by content hash and linked to runs via a separate link table.

Parameters:

Name Type Description Default
table Type[SQLModel]

Config cache table to query (e.g., ActivitySimCoefficientsCache).

required
link_table Type[SQLModel]

Link table storing run_id/content_hash (e.g., ActivitySimConfigIngestRunLink).

required
table_name Optional[str]

Optional filter for link_table.table_name. Defaults to table.__tablename__ if link_table has a table_name column.

None
columns Optional[Iterable[Any]]

Columns to select. Defaults to (link_table.run_id, table).

None
where Optional[Union[Iterable[Any], Any]]

Optional filter clauses to apply to the query.

None
join_on Optional[Iterable[str]]

Column names to join on. Defaults to content_hash and file_name when available.

None

Returns:

Type Description
Executable

SQL query joining the config table to runs.

config_run_rows(table, *, link_table, table_name=None, columns=None, where=None, join_on=None, tracker=None)

Execute a config-to-run join query and return the results as a list of rows.

This is a high-level wrapper around config_run_query and run_query. It is frequently used by integration adapters (like ActivitySim or BEAM) to retrieve configuration parameters that were active during a specific run.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| table | Type[SQLModel] | The configuration cache table to query. | required |
| link_table | Type[SQLModel] | The table that links configuration entries to specific run IDs. | required |
| table_name | Optional[str] | Optional filter for the link table's table_name column. | None |
| columns | Optional[Iterable] | Specific columns to select. Defaults to the run ID and the config record. | None |
| where | Optional[Clause] | Additional SQLAlchemy filter clauses. | None |
| join_on | Optional[Iterable[str]] | Column names to join on (e.g. ["content_hash"]). | None |
| tracker | Optional[Tracker] | The tracker to use for database access. | None |

Returns:

| Type | Description |
| --- | --- |
| list | The results of the query. |

pivot_facets(*, namespace, keys, value_column='value_num', value_columns=None, label_prefix='', label_map=None, run_id_label='run_id', table=RunConfigKV)

Build a pivoted facet subquery keyed by run_id.

This is a convenience helper for turning flattened config facets (run_config_kv) into a wide table suitable for joins.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| namespace | Optional[str] | Facet namespace to filter by (typically the model name). If None, the namespace filter is skipped. | required |
| keys | Iterable[str] | Facet keys to pivot into columns. | required |
| value_column | str | Default value column to read from. Must be one of: value_num, value_str, value_bool, value_json. | "value_num" |
| value_columns | Optional[Mapping[str, str]] | Optional per-key override of value_column. | None |
| label_prefix | str | Optional prefix for pivoted column labels. | "" |
| label_map | Optional[Mapping[str, str]] | Optional per-key label overrides. | None |
| run_id_label | str | Label to use for the run id column in the returned subquery. | "run_id" |
| table | Type[RunConfigKV] | Table/model providing the facet KV rows. | RunConfigKV |

Returns:

| Type | Description |
| --- | --- |
| Any | A SQLAlchemy subquery with columns: run_id and one column per key. |

capture_outputs(directory, pattern='*', recursive=False)

Context manager to automatically capture and log new or modified files in a directory within the current active run context.

This function is a convenient proxy to consist.core.tracker.Tracker.capture_outputs. It watches a specified directory for any file changes (creations or modifications) that occur within its with block. These changes are then automatically logged as output artifacts of the current Consist run.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| directory | Union[str, Path] | The path to the directory to monitor for new or modified files. | required |
| pattern | str | A glob pattern (e.g., `*.csv`, `data_*.parquet`) to filter which files are captured within the specified directory. Defaults to all files. | `"*"` |
| recursive | bool | If True, the capture recursively scans subdirectories of directory for changes. | False |

Yields:

| Type | Description |
| --- | --- |
| OutputCapture | An OutputCapture object containing the list of Artifact objects captured and logged when the with block finishes. |

Raises:

| Type | Description |
| --- | --- |
| RuntimeError | If capture_outputs is used outside of an active start_run context. |

load(artifact, tracker=None, *, db_fallback='inputs-only', **kwargs)

load(
    artifact: ZarrArtifact,
    tracker: Optional[Tracker] = None,
    *,
    db_fallback: str = "inputs-only",
    **kwargs: Any,
) -> XarrayDatasetType
load(
    artifact: NetCdfArtifact,
    tracker: Optional[Tracker] = None,
    *,
    db_fallback: str = "inputs-only",
    **kwargs: Any,
) -> XarrayDatasetType
load(
    artifact: OpenMatrixArtifact,
    tracker: Optional[Tracker] = None,
    *,
    db_fallback: str = "inputs-only",
    **kwargs: Any,
) -> XarrayDatasetType
load(
    artifact: SpatialArtifact,
    tracker: Optional[Tracker] = None,
    *,
    db_fallback: str = "inputs-only",
    **kwargs: Any,
) -> GeoDataFrameType
load(
    artifact: HdfStoreArtifact,
    tracker: Optional[Tracker] = None,
    *,
    db_fallback: str = "inputs-only",
    **kwargs: Any,
) -> pd.HDFStore
load(
    artifact: DataFrameArtifact,
    tracker: Optional[Tracker] = None,
    *,
    db_fallback: str = "inputs-only",
    **kwargs: Any,
) -> duckdb.DuckDBPyRelation
load(
    artifact: JsonArtifact,
    tracker: Optional[Tracker] = None,
    *,
    db_fallback: str = "inputs-only",
    **kwargs: Any,
) -> duckdb.DuckDBPyRelation
load(
    artifact: TabularArtifact,
    tracker: Optional[Tracker] = None,
    *,
    db_fallback: str = "inputs-only",
    **kwargs: Any,
) -> duckdb.DuckDBPyRelation
load(
    artifact: Artifact,
    tracker: Optional[Tracker] = None,
    *,
    db_fallback: str = "inputs-only",
    **kwargs: Any,
) -> LoadResult
load(
    artifact: ArtifactLike,
    tracker: Optional[Tracker] = None,
    *,
    db_fallback: str = "inputs-only",
    **kwargs: Any,
) -> LoadResult

Smart loader that retrieves data for an artifact from the best available source.

This function attempts to load the data associated with an Artifact object. It prioritizes loading from disk (raw format) if the file exists. If the file is missing but the artifact is marked as ingested, it can optionally recover the data from the Consist DuckDB database ("Ghost Mode"). By default, DB recovery is only allowed when the artifact is an input to an active, non-cached run.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| artifact | Artifact | The Consist Artifact object whose data is to be loaded. | required |
| tracker | Optional[Tracker] | The Tracker instance to use for path resolution and database access. If None, the function attempts to use the active global tracker context. Explicitly passing a tracker is recommended for clarity or when no global context is available. | None |
| db_fallback | str | Controls when the loader may fall back to DuckDB ("Ghost Mode") when the file is missing but the artifact is marked as ingested. "inputs-only": allow DB fallback only if the artifact is declared as an input to the current active run and the current run is not a cache hit. "always": allow DB fallback whenever artifact.meta["is_ingested"] is true and a tracker with a DB connection is available. "never": disable DB fallback entirely. | "inputs-only" |
| `**kwargs` | Any | Additional keyword arguments to pass to the underlying data loader function (e.g., pd.read_parquet, pd.read_csv, xr.open_zarr, pd.read_sql). | {} |

Returns:

| Type | Description |
| --- | --- |
| LoadResult | The loaded data, typically a DuckDB Relation for tabular data, an xarray Dataset for array formats, or another data object depending on the artifact's driver and the data format. |

Raises:

| Type | Description |
| --- | --- |
| RuntimeError | If no Tracker instance can be resolved (neither provided nor active in context) and the artifact's absolute path is not directly resolvable. Also if the artifact is marked as ingested but no tracker with a DB connection is available. |
| FileNotFoundError | If the artifact's data cannot be found on disk or recovered from the database. |

load_df(artifact, tracker=None, *, db_fallback='inputs-only', close=True, **kwargs)

Load a tabular artifact and return a pandas DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| artifact | ArtifactLike | Artifact to load. | required |
| tracker | Optional[Tracker] | Tracker to use for resolving paths or DB fallback. | None |
| db_fallback | str | Controls when DB recovery is allowed for ingested artifacts. | "inputs-only" |
| close | bool | Whether to close the underlying DuckDB connection after materialization when the load returns a Relation. | True |
| `**kwargs` | Any | Additional loader options. | {} |

load_relation(artifact, tracker=None, *, db_fallback='inputs-only', **kwargs)

Context manager that yields a DuckDB Relation and ensures the underlying connection is closed on exit.

to_df(relation, *, close=True)

Convert a DuckDB Relation to a pandas DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| relation | DuckDBPyRelation | Relation to materialize into a DataFrame. | required |
| close | bool | Whether to close the underlying DuckDB connection after materialization. Use close=False if you plan to continue using the relation. | True |

active_relation_count()

Return the number of active DuckDB relations tracked by Consist.

set_current_tracker(tracker)

Set the default (fallback) tracker used by Consist entrypoints.

Entrypoints like consist.run(), consist.start_run(), and consist.scenario() use this tracker if they are called outside of an active run context and no explicit tracker= argument is provided.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| tracker | Optional[Tracker] | The tracker instance to set as the default, or None to clear. | required |

Returns:

| Type | Description |
| --- | --- |
| Optional[Tracker] | The previously configured default tracker, if any. |

noop_scenario(name, **kwargs)

Creates a scenario context that executes without provenance tracking.

This is useful for debugging or running simulations where you want the ergonomics of the Consist scenario API (like the Coupler and RunResult) but do not want to record any metadata or artifacts to the database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the scenario (for display/logging purposes). | required |
| `**kwargs` | Any | Additional arguments forwarded to the noop context. | {} |

is_dataframe_artifact(artifact)

Type guard: narrow artifact to tabular types (parquet, csv, h5_table).

Use this to enable type-safe loading and IDE autocomplete:

if is_dataframe_artifact(artifact):
    rel = load(artifact)  # Type checker knows the return is a Relation
    rel.df().head()  # Materialize to pandas; IDE autocomplete works!

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| artifact | ArtifactLike | Artifact to check. | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the artifact driver is parquet, csv, or h5_table. |

is_tabular_artifact(artifact)

Type guard: narrow artifact to any tabular format (parquet, csv, h5_table, json).

Note: This is broader than is_dataframe_artifact(), but load() still returns a Relation for any tabular artifact. Use load_df() as a pandas escape hatch.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| artifact | ArtifactLike | Artifact to check. | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the artifact driver produces tabular data. |

is_json_artifact(artifact)

Type guard: narrow artifact to JSON format.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| artifact | ArtifactLike | Artifact to check. | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the artifact driver is json. |

is_zarr_artifact(artifact)

Type guard: narrow artifact to Zarr format.

Use this when you know an artifact should be Zarr and want type-safe loading:

if is_zarr_artifact(artifact):
    ds = load(artifact)  # Type checker knows return is xarray.Dataset
    ds.dims  # IDE autocomplete works!

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| artifact | ArtifactLike | Artifact to check. | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the artifact driver is zarr. |

is_hdf_artifact(artifact)

Type guard: narrow artifact to HDF5 format (h5 or hdf5).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| artifact | ArtifactLike | Artifact to check. | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the artifact driver is h5 or hdf5. |

is_spatial_artifact(artifact)

Type guard: narrow artifact to spatial formats (GeoDataFrame outputs).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| artifact | ArtifactLike | Artifact to check. | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the artifact driver is geojson, shapefile, or geopackage. |