API Helpers¶
consist.api provides top-level helper functions that forward to the active
tracker. Use these when you want concise scripts and notebooks without passing
a tracker object to every call.
When to use helpers vs class methods¶
| Prefer | When |
|---|---|
| `consist.*` helpers | Notebook/script workflows where `use_tracker(...)` is already set |
| `Tracker.*` methods | Library/application code where explicit object wiring is preferred |
High-traffic helpers¶
- Run execution: `consist.run`, `consist.scenario`, `consist.trace`, `consist.start_run`
- Input wiring: `consist.ref`, `consist.refs`
- Context and output paths: `consist.use_tracker`, `consist.output_dir`, `consist.output_path`
- Artifact logging/loading: `consist.log_artifact`, `consist.log_dataframe`, `consist.load`, `consist.load_df`
- Querying: `consist.find_run`, `consist.find_runs`, `consist.find_latest_run`, `consist.run_query`, `consist.run_set`, `consist.get_run_result`, `consist.config_run_query`, `consist.config_run_rows`

`consist.find_runs(...)`, `consist.find_run(...)`, and `consist.find_latest_run(...)` forward standard run filters, including `stage=` and `phase=` for workflow-level queries.
Minimal runnable helper workflow¶
```python
from pathlib import Path

import consist
from consist import Tracker

tracker = Tracker(run_dir="./runs", db_path="./provenance.duckdb")

def step() -> Path:
    out = consist.output_path("step", ext="txt")
    out.write_text("done\n")
    return out

with consist.use_tracker(tracker):
    result = consist.run(fn=step, outputs=["step"])
    latest = consist.find_run(run_id=result.run.id)
    print(result.outputs["step"].path)
    print(latest.id if latest else None)
```
For class-level equivalents, see Tracker and Workflow Contexts.
RelationConnectionLeakWarning¶

Bases: `RuntimeWarning`

Warning emitted when relation connections appear to accumulate.

view(model, name=None)¶
Create a SQLModel class backed by a Consist hybrid view.
This is a convenience wrapper around the view factory that lets you define
a SQLModel schema for a concept (e.g., a canonicalized config table, an
ingested dataset, or a computed artifact view) and then query it as a normal
SQLModel table. The returned class is a dynamic subclass with table=True
that points at a database view, so you can use it in sqlmodel.Session
queries without creating a physical table.
If you need explicit control over view naming or want to create multiple
named views for the same concept, use Tracker.create_view(...) or
Tracker.view(...) directly.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `model` | `Type[T]` | Base SQLModel describing the schema (columns and types). | required |
| `name` | `Optional[str]` | Optional override for the generated view name (defaults to the model's table name). | `None` |

Returns:

| Type | Description |
|---|---|
| `Type[T]` | SQLModel subclass with `table=True` backed by a database view. |
Examples:

```python
from sqlmodel import Session, select
from consist import view
from consist.models.activitysim import ActivitySimConstantsCache

# Create a dynamic view class for querying constants
ConstantsView = view(ActivitySimConstantsCache)

with Session(tracker.engine) as session:
    rows = session.exec(
        select(ConstantsView).where(ConstantsView.key == "sample_rate")
    ).all()
```

use_tracker(tracker)¶
Set a fallback (default) tracker for Consist API entrypoints.
This configures which tracker is used by consist.run(), consist.start_run(), etc. when called outside an active run context (i.e., when the tracker stack is empty). Once inside a run, the tracker becomes "active" via push_tracker() and is accessed by logging functions like consist.log_artifact().
run(fn=None, name=None, *, tracker=None, adapter=None, identity_inputs=None, cache_options=None, output_policy=None, execution_options=None, **kwargs)¶
Execute a computational task as a tracked and cached Consist run.
This high-level entrypoint encapsulates the execution of a callable within a managed provenance context. It automates the lifecycle of the run, including signature computation for cache identity, artifact capture, and result persistence.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `fn` | `Optional[Callable]` | The computational function to execute. | `None` |
| `name` | `Optional[str]` | A semantic identifier for the run. If omitted, the function's name is used. | `None` |
| `tracker` | `Optional[Tracker]` | The Tracker instance responsible for provenance and caching. If `None`, the active global tracker is resolved. | `None` |
| `adapter` | `Optional[ConfigAdapter]` | Config adapter used to resolve adapter-driven identity for the run. | `None` |
| `identity_inputs` | `IdentityInputs` | Additional hash-only identity inputs that contribute to cache keys. | `None` |
| `cache_options` | `Optional[CacheOptions]` | Grouped cache controls for run execution. | `None` |
| `output_policy` | `Optional[OutputPolicyOptions]` | Grouped output mismatch/missing policy controls. | `None` |
| `execution_options` | `Optional[ExecutionOptions]` | Grouped runtime execution controls. | `None` |
| `**kwargs` | `Any` | Arguments forwarded to the wrapped function. Legacy direct policy kwargs are rejected; pass policies via the grouped options objects instead. | `{}` |

Returns:

| Type | Description |
|---|---|
| `RunResult` | A container holding the function's return value and the immutable `Run` record. |

Raises:

| Type | Description |
|---|---|
| `TypeError` | Raised when legacy run-policy kwargs are provided directly instead of via options objects. |

ref(run_result, key=None)¶
Select a specific output artifact from a RunResult.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `run_result` | `RunResult` | Result object returned by `consist.run`. | required |
| `key` | `Optional[str]` | Output key to select. If omitted, the result's default output is selected. | `None` |

Returns:

| Type | Description |
|---|---|
| `Artifact` | Selected artifact reference, suitable for wiring into a downstream run's inputs. |

refs(*selectors, **aliases)¶
Build an inputs={...} mapping from one or more RunResult objects.
Supported forms:

- Single result shorthand: `consist.refs(prep, "households", "persons")`
- Single result alias mapping: `consist.refs(prep, {"hh": "households", "pp": "persons"})`
- Multi-result positional groups: `consist.refs((prep, "households"), (skim, "skims"))`
- Keyword aliases: `consist.refs(hh=(prep, "households"), skims=(skim, "skims"))`

Notes

Bare string selectors (for example `consist.refs("households")`) are not supported. Provide a RunResult context via positional or keyword forms.

Returns:

| Type | Description |
|---|---|
| `Dict[str, Artifact]` | Mapping suitable for passing as `inputs={...}` to a downstream run. |
trace(name, *, tracker=None, adapter=None, identity_inputs=None, **kwargs)¶
Create a nested tracing context within an active run.
Use trace to break down a large run into smaller, logical steps. Each
traced step is recorded as a sub-run with its own inputs and outputs.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Semantic name for this step/trace. | required |
| `tracker` | `Optional[Tracker]` | The tracker instance to use. | `None` |
| `adapter` | `Optional[ConfigAdapter]` | Config adapter used to resolve adapter-driven identity for the trace. | `None` |
| `identity_inputs` | `IdentityInputs` | Additional hash-only identity inputs that contribute to cache keys. | `None` |
| `**kwargs` | `Any` | Additional metadata or parameters for this trace. | `{}` |

Yields:

| Type | Description |
|---|---|
| `Tracker` | The active tracker instance. |
start_run(run_id, model, tracker=None, **kwargs)¶
Initiate and manage a Consist run.
This context manager marks the beginning of a discrete unit of work (a "run"). All artifacts logged within this context will be associated with this run.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `run_id` | `str` | A unique identifier for this run. | required |
| `model` | `str` | The name of the model or system being run. | required |
| `tracker` | `Optional[Tracker]` | The tracker instance to use. Defaults to the active global tracker. | `None` |
| `**kwargs` | `Any` | Additional parameters for the run, such as tags or config metadata. | `{}` |

Yields:

| Type | Description |
|---|---|
| `Tracker` | The active tracker instance. |
define_step(*, model=None, name_template=None, outputs=None, schema_outputs=None, output_paths=None, inputs=None, input_keys=None, optional_input_keys=None, config=None, adapter=None, identity_inputs=None, facet=None, facet_index=None, cache_mode=None, cache_hydration=None, cache_version=None, validate_cached_outputs=None, input_binding=None, load_inputs=None, tags=None, facet_from=None, facet_schema_version=None, description=None, **extra)¶
Attach metadata to a function without changing execution behavior.
This decorator lets you attach defaults such as outputs or tags to a
function. Tracker.run and ScenarioContext.run read this metadata.
Callable values are resolved at runtime with a StepContext.
require_runtime_kwargs(*names)¶
Enforce the presence of required runtime_kwargs when a step is executed.
Use this decorator to catch missing runtime-only inputs early, before
executing a run. It validates that each required name is present in the
runtime_kwargs passed to Tracker.run or ScenarioContext.run.
scenario(name, tracker=None, *, enabled=True, **kwargs)¶
Context manager for grouping multiple execution steps into a semantic scenario.
A scenario creates a parent "header" run that aggregates metadata, lineage, and artifacts from all nested steps. This enables multi-stage simulations to be tracked as a single unit while maintaining fine-grained provenance for each internal component.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Unique identifier or name for the scenario header. | required |
| `tracker` | `Optional[Tracker]` | The Tracker instance to use for persistence. Defaults to the active global tracker. | `None` |
| `enabled` | `bool` | If False, returns a no-op context that executes functions without recording provenance, preserving the Scenario API ergonomics for debugging. | `True` |
| `**kwargs` | `Any` | Additional metadata (e.g., tags, config) forwarded to the header run. | `{}` |

Yields:

| Type | Description |
|---|---|
| `ScenarioContext` | An active scenario context with an internal Coupler for artifact chaining. |
single_step_scenario(name, step_name=None, tracker=None, **kwargs)¶
Convenience wrapper that exposes a single step scenario.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name of the scenario header. | required |
| `step_name` | `Optional[str]` | Name for the single step; defaults to the scenario name. | `None` |
| `tracker` | `Optional[Tracker]` | Tracker to execute the scenario; defaults to the active tracker. | `None` |
| `**kwargs` | `Any` | Arguments forwarded to `scenario(...)`. | `{}` |

Yields:

| Type | Description |
|---|---|
| `ScenarioContext` | Scenario context manager for the single step. |
current_tracker()¶
Retrieves the active Tracker instance from the global context.
If no run is active, this function falls back to the default tracker (if set
via consist.use_tracker or consist.set_current_tracker).
Returns:

| Type | Description |
|---|---|
| `Tracker` | The active `Tracker` instance. |

Raises:

| Type | Description |
|---|---|
| `RuntimeError` | If no tracker is active and no default tracker has been set. |
current_run()¶
Return the active Run record if one is in progress, otherwise None.
A Run record contains metadata about the current execution, such as its
unique ID, model name, and start time.
current_consist()¶
Return the active ConsistRecord if one is in progress, otherwise None.
The ConsistRecord is the internal state object that tracks the active
run's inputs, outputs, and metadata during execution.
output_dir(namespace=None)¶
Resolve the managed output directory for the active run context.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `namespace` | `Optional[str]` | Optional relative subdirectory under the active run's managed output directory. | `None` |

Returns:

| Type | Description |
|---|---|
| `Path` | Absolute path to the managed output directory. |

Raises:

| Type | Description |
|---|---|
| `RuntimeError` | If called outside an active run context. |
output_path(key, ext='parquet')¶
Resolve a deterministic managed output path for the active run context.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `key` | `str` | Artifact key used as the output filename stem. | required |
| `ext` | `str` | File extension to append to the output filename. | `"parquet"` |

Returns:

| Type | Description |
|---|---|
| `Path` | Absolute managed output path for the current run. |

Raises:

| Type | Description |
|---|---|
| `RuntimeError` | If called outside an active run context. |
cached_artifacts(direction='output')¶

cached_output(key=None)¶
Fetch a hydrated cached output artifact for the active run.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `key` | `Optional[str]` | Specific artifact key to look up; defaults to the first available artifact. | `None` |

Returns:

| Type | Description |
|---|---|
| `Optional[Artifact]` | Cached artifact instance, or `None` if none is available. |
get_run_result(run_id, *, keys=None, validate='lazy', tracker=None)¶
Retrieve a historical run as a RunResult with selected outputs.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `run_id` | `str` | Identifier of the historical run. | required |
| `keys` | `Optional[Iterable[str]]` | Optional output keys to include. If omitted, includes all outputs. | `None` |
| `validate` | `'lazy'`, `'strict'`, or `'none'` | Output validation policy for the selected outputs. | `"lazy"` |
| `tracker` | `Optional[Tracker]` | Tracker instance to query. If omitted, resolves the active/default tracker. | `None` |

Returns:

| Type | Description |
|---|---|
| `RunResult` | Historical run metadata plus selected outputs. |
get_artifact(run_id, key=None, key_contains=None, direction='output')¶
Retrieve a single artifact from a historical run.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `run_id` | `str` | Identifier of the run that produced the artifact. | required |
| `key` | `Optional[str]` | Exact artifact key to match. | `None` |
| `key_contains` | `Optional[str]` | Substring filter for artifact keys. | `None` |
| `direction` | `str` | Either "input" or "output". | `"output"` |

Returns:

| Type | Description |
|---|---|
| `Optional[Artifact]` | Matching artifact, or `None` if no match is found. |
register_artifact_facet_parser(prefix, parser_fn, *, tracker=None)¶
Register a key-prefix parser for deriving artifact facets at log time.
The parser is invoked when logging an artifact without an explicit facet=.
log_artifact(path, key=None, direction='output', schema=None, driver=None, content_hash=None, force_hash_override=False, validate_content_hash=False, reuse_if_unchanged=False, reuse_scope='same_uri', facet=None, facet_schema_version=None, facet_index=False, *, enabled=True, **meta)¶
Logs an artifact (file or data reference) to the currently active run.
This function is a convenient proxy to consist.core.tracker.Tracker.log_artifact.
It automatically links the artifact to the current run context, handles path
virtualization, and performs lineage discovery.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `ArtifactRef` | A file path (str/Path) or an existing `Artifact`. | required |
| `key` | `Optional[str]` | A semantic, human-readable name for the artifact (e.g., "households"). Required if `path` is a raw path. | `None` |
| `direction` | `str` | Specifies whether the artifact is an "input" or "output" for the current run. Defaults to "output". | `"output"` |
| `schema` | `Optional[Type[SQLModel]]` | An optional SQLModel class that defines the expected schema for the artifact's data. Its name will be stored in artifact metadata. | `None` |
| `driver` | `Optional[str]` | Explicitly specify the driver (e.g., 'h5_table'). If None, the driver is inferred from the file extension. | `None` |
| `content_hash` | `Optional[str]` | Precomputed content hash to use for the artifact instead of hashing the path on disk. | `None` |
| `force_hash_override` | `bool` | If True, overwrite an existing artifact hash when it differs from `content_hash`. | `False` |
| `validate_content_hash` | `bool` | If True, verify the provided `content_hash` against the file on disk. | `False` |
| `reuse_if_unchanged` | `bool` | Deprecated for outputs. Consist always creates a fresh output artifact row; identical bytes are deduplicated via content hash. | `False` |
| `reuse_scope` | `'same_uri'` or `'any_uri'` | Deprecated for outputs. | `"same_uri"` |
| `facet` | `Optional[Any]` | Optional artifact-level facet payload (dict or Pydantic model). | `None` |
| `facet_schema_version` | `Optional[Union[str, int]]` | Optional schema version for artifact facet compatibility. | `None` |
| `facet_index` | `bool` | If True, flatten scalar facet values for fast artifact querying. | `False` |
| `enabled` | `bool` | If False, returns a noop artifact object without requiring an active run. | `True` |
| `**meta` | `Any` | Additional key-value pairs to store in the artifact's flexible metadata. | `{}` |

Returns:

| Type | Description |
|---|---|
| `Artifact` | The created or updated `Artifact` record. |

Raises:

| Type | Description |
|---|---|
| `RuntimeError` | If called outside an active run context. |
| `ValueError` | If `key` is required but not provided. |
log_artifacts(outputs, *, direction='output', driver=None, metadata_by_key=None, facets_by_key=None, facet_schema_versions_by_key=None, facet_index=False, reuse_if_unchanged=False, reuse_scope='same_uri', enabled=True, **shared_meta)¶
Log multiple artifacts in a single call for efficiency.
This function is a convenient proxy to consist.core.tracker.Tracker.log_artifacts.
It logs a batch of related artifacts with optional per-key metadata customization.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `outputs` | `Mapping[str, ArtifactRef]` | Mapping of key -> path/Artifact to log. Keys must be strings and explicitly chosen by the caller (not inferred from filenames). | required |
| `direction` | `str` | "input" or "output" for the current run context. | `"output"` |
| `driver` | `Optional[str]` | Explicitly specify driver for all artifacts. If None, inferred from file extension. | `None` |
| `metadata_by_key` | `Optional[Mapping[str, Dict[str, Any]]]` | Per-key metadata overrides applied on top of shared metadata. | `None` |
| `facets_by_key` | `Optional[Mapping[str, Any]]` | Per-key artifact facet payloads. | `None` |
| `facet_schema_versions_by_key` | `Optional[Mapping[str, Union[str, int]]]` | Optional per-key schema versions for artifact facets. | `None` |
| `facet_index` | `bool` | Whether to index scalar artifact facet fields for fast querying. | `False` |
| `reuse_if_unchanged` | `bool` | Deprecated for outputs. Batch output logging still creates a fresh artifact row per call; identical bytes are deduplicated via content hash. | `False` |
| `reuse_scope` | `'same_uri'` or `'any_uri'` | Deprecated for outputs. | `"same_uri"` |
| `enabled` | `bool` | If False, returns noop artifact objects without requiring an active run. | `True` |
| `**shared_meta` | `Any` | Metadata key-value pairs applied to ALL logged artifacts. | `{}` |

Returns:

| Type | Description |
|---|---|
| `Mapping[str, ArtifactLike]` | Mapping of key -> logged Artifact-like objects. |

Raises:

| Type | Description |
|---|---|
| `RuntimeError` | If called outside an active run context. |
| `ValueError` | If metadata_by_key contains keys not in outputs, or if any value is None. |
| `TypeError` | If mapping keys are not strings. |
Examples:

Log multiple outputs with shared metadata:

```python
artifacts = consist.log_artifacts(
    {
        "persons": "results/persons.parquet",
        "households": "results/households.parquet",
        "jobs": "results/jobs.parquet",
    },
    year=2030,
    scenario="base",
)
# All three artifacts get year=2030 and scenario="base"
```

Mix shared and per-key metadata:

```python
artifacts = consist.log_artifacts(
    {
        "persons": "output/persons.parquet",
        "households": "output/households.parquet",
    },
    metadata_by_key={
        "households": {"role": "primary_unit", "weight": 1.0},
    },
    dataset_version="v2",
    simulation_id="run_001",
)
# persons gets: dataset_version="v2", simulation_id="run_001"
# households gets: dataset_version="v2", simulation_id="run_001",
#   role="primary_unit", weight=1.0
```
log_input(path, key=None, *, schema=None, driver=None, content_hash=None, force_hash_override=False, validate_content_hash=False, facet=None, facet_schema_version=None, facet_index=False, enabled=True, **meta)¶
Log an input artifact to the active run.
An input artifact represents a data source that the current run reads from. Logging it creates a lineage link, allowing Consist to track which versions of data produced which results.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `ArtifactRef` | File path (str/Path) or an existing `Artifact`. | required |
| `key` | `Optional[str]` | Semantic name for the artifact (e.g. "raw_households"). Required if `path` is a raw path. | `None` |
| `schema` | `Optional[Type[SQLModel]]` | Optional SQLModel class defining the data structure. | `None` |
| `driver` | `Optional[str]` | Explicit format driver (e.g. "parquet"). Inferred from extension if None. | `None` |
| `content_hash` | `Optional[str]` | Precomputed hash to avoid re-hashing large files. | `None` |
| `force_hash_override` | `bool` | Overwrite existing hash in the database if different from `content_hash`. | `False` |
| `validate_content_hash` | `bool` | Re-hash the file on disk to ensure it matches the provided `content_hash`. | `False` |
| `facet` | `Optional[Any]` | Optional artifact-level facet payload. | `None` |
| `facet_schema_version` | `Optional[Union[str, int]]` | Optional facet schema version. | `None` |
| `facet_index` | `bool` | Whether to index scalar facet fields for querying. | `False` |
| `enabled` | `bool` | If False, returns a dummy artifact object for disconnected execution. | `True` |
| `**meta` | `Any` | Additional metadata fields to store with the artifact. | `{}` |

Returns:

| Type | Description |
|---|---|
| `ArtifactLike` | The logged artifact reference. |
log_output(path, key=None, *, schema=None, driver=None, content_hash=None, force_hash_override=False, validate_content_hash=False, reuse_if_unchanged=False, reuse_scope='same_uri', facet=None, facet_schema_version=None, facet_index=False, enabled=True, **meta)¶
Log an output artifact produced by the current run.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `ArtifactRef` | File path (str/Path) or an existing `Artifact`. | required |
| `key` | `Optional[str]` | Semantic name for the artifact (e.g. "processed_results"). Required if `path` is a raw path. | `None` |
| `schema` | `Optional[Type[SQLModel]]` | Optional SQLModel class defining the data structure. | `None` |
| `driver` | `Optional[str]` | Explicit format driver (e.g. "parquet"). Inferred from extension if None. | `None` |
| `content_hash` | `Optional[str]` | Precomputed hash to avoid re-hashing large files. | `None` |
| `force_hash_override` | `bool` | Overwrite existing hash in the database if different from `content_hash`. | `False` |
| `validate_content_hash` | `bool` | Re-hash the file on disk to ensure it matches the provided `content_hash`. | `False` |
| `reuse_if_unchanged` | `bool` | Deprecated for outputs. A fresh output artifact row is always created; identical bytes are deduplicated via content hash. | `False` |
| `reuse_scope` | `'same_uri'` or `'any_uri'` | Deprecated for outputs. | `"same_uri"` |
| `facet` | `Optional[Any]` | Optional artifact-level facet payload. | `None` |
| `facet_schema_version` | `Optional[Union[str, int]]` | Optional facet schema version. | `None` |
| `facet_index` | `bool` | Whether to index scalar facet fields for querying. | `False` |
| `enabled` | `bool` | If False, returns a dummy artifact object. | `True` |
| `**meta` | `Any` | Additional metadata fields to store. | `{}` |

Returns:

| Type | Description |
|---|---|
| `ArtifactLike` | The logged artifact reference. |
log_dataframe(df, key, schema=None, direction='output', tracker=None, path=None, driver=None, meta=None, **to_file_kwargs)¶
Serialize a DataFrame, log it as an artifact, and trigger optional ingestion.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | Data to persist. | required |
| `key` | `str` | Logical artifact key. | required |
| `schema` | `Optional[Type[SQLModel]]` | Schema used for ingestion, if provided. | `None` |
| `direction` | `str` | Artifact direction relative to the run. | `"output"` |
| `tracker` | `Optional[Tracker]` | Tracker instance to use; defaults to the active tracker. If None and no active run context exists, raises RuntimeError. | `None` |
| `path` | `Optional[Union[str, Path]]` | Output path; defaults to a managed output path derived from `key`. | `None` |
| `driver` | `Optional[str]` | File format driver (e.g., "parquet" or "csv"). | `None` |
| `meta` | `Optional[Dict[str, Any]]` | Additional metadata for the artifact. | `None` |
| `**to_file_kwargs` | `Any` | Keyword arguments forwarded to the underlying DataFrame writer. | `{}` |

Returns:

| Type | Description |
|---|---|
| `Artifact` | The artifact logged for the written dataset. |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If the requested driver is unsupported. |
log_meta(**kwargs)¶
Updates the active run's metadata with the provided key-value pairs.
This function is a convenient proxy to consist.core.tracker.Tracker.log_meta.
It allows users to log additional information about the current run, such as
performance metrics, experimental parameters, or tags, directly to the run's
metadata. This information is then persisted to both the JSON log and the
DuckDB database.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `**kwargs` | `Any` | Arbitrary key-value pairs to merge into the run's metadata. | `{}` |

Raises:

| Type | Description |
|---|---|
| `RuntimeError` | If called when no run is active. |
ingest(artifact, data=None, schema=None, run=None)¶
Ingests data associated with an Artifact into the active run's database.
This function is a convenient proxy to consist.core.tracker.Tracker.ingest.
It materializes data into the DuckDB, leveraging dlt for efficient loading
and injecting provenance information. This is part of the "Hot Data Strategy".
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `artifact` | `Artifact` | The artifact object representing the data being ingested. | required |
| `data` | `Optional[Union[Iterable[Dict[str, Any]], Any]]` | The data to ingest. Can be an iterable of dictionaries (rows), a file path (str or Path), a Pandas DataFrame, or any other data type that dlt can load. | `None` |
| `schema` | `Optional[Type[SQLModel]]` | An optional SQLModel class that defines the expected schema for the ingested data. If provided, it determines the target table's columns and types. | `None` |
| `run` | `Optional[Run]` | The run to associate the ingested data with; defaults to the active run. | `None` |

Returns:

| Type | Description |
|---|---|
| `Any` | The result information from the underlying ingestion. |

Raises:

| Type | Description |
|---|---|
| `RuntimeError` | If no database is configured for the active tracker. |
| `Exception` | Any exception raised by the underlying ingestion pipeline. |
register_views(*models)¶

find_run(tracker=None, **filters)¶
Convenience proxy for Tracker.find_run.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `tracker` | `Optional[Tracker]` | Tracker instance to query; defaults to the active tracker. | `None` |
| `**filters` | `Any` | Filter values forwarded to `Tracker.find_run`. | `{}` |

Returns:

| Type | Description |
|---|---|
| `Optional[Run]` | Matching run, or `None` if no run matches. |
find_runs(tracker=None, **filters)¶
Convenience proxy for Tracker.find_runs.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `tracker` | `Optional[Tracker]` | Tracker instance to query; defaults to the active tracker. | `None` |
| `**filters` | `Any` | Filter values forwarded to `Tracker.find_runs`. | `{}` |

Returns:

| Type | Description |
|---|---|
| `Union[list[Run], Dict[Hashable, Run]]` | Results returned by `Tracker.find_runs`. |
find_latest_run(tracker=None, **filters)¶
Convenience proxy for Tracker.find_latest_run.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `tracker` | `Optional[Tracker]` | Tracker instance to query; defaults to the active tracker. | `None` |
| `**filters` | `Any` | Filter values forwarded to `Tracker.find_latest_run`. | `{}` |

Returns:

| Type | Description |
|---|---|
| `Run` | The latest matching run. |
run_set(tracker=None, label=None, **filters)¶
Convenience proxy for Tracker.run_set.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `tracker` | `Optional[Tracker]` | Tracker instance to query; defaults to the active tracker. | `None` |
| `label` | `Optional[str]` | Optional label attached to the returned RunSet. | `None` |
| `**filters` | `Any` | Filter values forwarded to `Tracker.run_set`. | `{}` |

Returns:

| Type | Description |
|---|---|
| `RunSet` | A tracker-backed RunSet for grouping/alignment workflows. |

Notes

This convenience helper resolves a tracker from context and forwards to Tracker.run_set(...).
db_session(tracker=None)¶
Provide a SQLModel Session connected to the tracker's database.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `tracker` | `Optional[Tracker]` | Tracker instance supplying the engine; defaults to the active tracker. | `None` |

Yields:

| Type | Description |
|---|---|
| `Session` | SQLModel session bound to the tracker engine. |
run_query(query, tracker=None)¶
Execute a SQLModel/SQLAlchemy query via the tracker engine.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `query` | `Executable` | Query object (e.g., a SQLModel `select(...)`). | required |
| `tracker` | `Optional[Tracker]` | Tracker instance supplying the engine; defaults to the active tracker. | `None` |

Returns:

| Type | Description |
|---|---|
| `list` | Results of the executed query. |
config_run_query(table, *, link_table, table_name=None, columns=None, where=None, join_on=None)¶
Build a query joining a config cache table to its run-link table.
This is a convenience helper for config adapter tables that are deduplicated by content hash and linked to runs via a separate link table.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `table` | `Type[SQLModel]` | Config cache table to query (e.g., ActivitySimCoefficientsCache). | required |
| `link_table` | `Type[SQLModel]` | Link table storing run_id/content_hash (e.g., ActivitySimConfigIngestRunLink). | required |
| `table_name` | `Optional[str]` | Optional filter for link_table.table_name. Defaults to `table.__tablename__` if link_table has a table_name column. | `None` |
| `columns` | `Optional[Iterable[Any]]` | Columns to select. Defaults to (link_table.run_id, table). | `None` |
| `where` | `Optional[Union[Iterable[Any], Any]]` | Optional filter clauses to apply to the query. | `None` |
| `join_on` | `Optional[Iterable[str]]` | Column names to join on. Defaults to content_hash and file_name when available. | `None` |

Returns:

| Type | Description |
|---|---|
| `Executable` | SQL query joining the config table to runs. |
config_run_rows(table, *, link_table, table_name=None, columns=None, where=None, join_on=None, tracker=None)¶
Execute a config-to-run join query and return the results as a list of rows.
This is a high-level wrapper around config_run_query and run_query. It is
frequently used by integration adapters (like ActivitySim or BEAM) to retrieve
configuration parameters that were active during a specific run.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `table` | `Type[SQLModel]` | The configuration cache table to query. | required |
| `link_table` | `Type[SQLModel]` | The table that links configuration entries to specific run IDs. | required |
| `table_name` | `Optional[str]` | Optional filter for the link table's `table_name` column. | `None` |
| `columns` | `Optional[Iterable]` | Specific columns to select. Defaults to the run ID and the config record. | `None` |
| `where` | `Optional[Clause]` | Additional SQLAlchemy filter clauses. | `None` |
| `join_on` | `Optional[Iterable[str]]` | Column names to join on (e.g. `["content_hash", "file_name"]`). | `None` |
| `tracker` | `Optional[Tracker]` | The tracker to use for database access. | `None` |

Returns:

| Type | Description |
|---|---|
| `list` | The results of the query. |
pivot_facets(*, namespace, keys, value_column='value_num', value_columns=None, label_prefix='', label_map=None, run_id_label='run_id', table=RunConfigKV)¶
Build a pivoted facet subquery keyed by run_id.
This is a convenience helper for turning flattened config facets
(run_config_kv) into a wide table suitable for joins.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `namespace` | `Optional[str]` | Facet namespace to filter by (typically the model name). | required |
| `keys` | `Iterable[str]` | Facet keys to pivot into columns. | required |
| `value_column` | `str` | Default value column to read from (e.g., "value_num"). | `"value_num"` |
| `value_columns` | `Optional[Mapping[str, str]]` | Optional per-key override of `value_column`. | `None` |
| `label_prefix` | `str` | Optional prefix for pivoted column labels. | `""` |
| `label_map` | `Optional[Mapping[str, str]]` | Optional per-key label overrides. | `None` |
| `run_id_label` | `str` | Label to use for the run id column in the returned subquery. | `"run_id"` |
| `table` | `Type[RunConfigKV]` | Table/model providing the facet KV rows. | `RunConfigKV` |

Returns:

| Type | Description |
|---|---|
| `Any` | A SQLAlchemy subquery with the run id column plus one column per pivoted key. |
capture_outputs(directory, pattern='*', recursive=False)¶
Context manager to automatically capture and log new or modified files in a directory within the current active run context.
This function is a convenient proxy to consist.core.tracker.Tracker.capture_outputs.
It watches the specified directory for file changes (creations or modifications) that occur within its `with` block; these changes are then automatically logged as output artifacts of the current Consist run.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `directory` | `Union[str, Path]` | The path to the directory to monitor for new or modified files. | *required* |
| `pattern` | `str` | A glob pattern (e.g., `*.csv`, `data_*.parquet`) to filter which files are captured within the specified directory. Defaults to all files. | `"*"` |
| `recursive` | `bool` | If True, the capture will recursively scan subdirectories within … | `False` |
Yields:

| Type | Description |
|---|---|
| `OutputCapture` | An … |

Raises:

| Type | Description |
|---|---|
| `RuntimeError` | If … |
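The effect of `pattern` and `recursive` on which files are considered can be sketched with `pathlib` glob semantics. The `matching_files` helper below is hypothetical: it illustrates the selection rules only, not how `capture_outputs` actually watches the directory or logs artifacts:

```python
from pathlib import Path
import tempfile

def matching_files(directory, pattern="*", recursive=False):
    """Collect files under `directory` matching `pattern`; descend into
    subdirectories only when `recursive` is True."""
    root = Path(directory)
    globber = root.rglob if recursive else root.glob
    return sorted(p for p in globber(pattern) if p.is_file())

with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "a.csv").write_text("x")
    (Path(tmp) / "notes.txt").write_text("x")
    (Path(tmp) / "sub").mkdir()
    (Path(tmp) / "sub" / "b.csv").write_text("y")
    print([p.name for p in matching_files(tmp, "*.csv")])
    # ['a.csv']
    print([p.name for p in matching_files(tmp, "*.csv", recursive=True)])
    # ['a.csv', 'b.csv']
```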
load(artifact, tracker=None, *, db_fallback='inputs-only', **kwargs)
¶
```python
load(
    artifact: ZarrArtifact,
    tracker: Optional[Tracker] = None,
    *,
    db_fallback: str = "inputs-only",
    **kwargs: Any,
) -> XarrayDatasetType
load(
    artifact: NetCdfArtifact,
    tracker: Optional[Tracker] = None,
    *,
    db_fallback: str = "inputs-only",
    **kwargs: Any,
) -> XarrayDatasetType
load(
    artifact: OpenMatrixArtifact,
    tracker: Optional[Tracker] = None,
    *,
    db_fallback: str = "inputs-only",
    **kwargs: Any,
) -> XarrayDatasetType
load(
    artifact: SpatialArtifact,
    tracker: Optional[Tracker] = None,
    *,
    db_fallback: str = "inputs-only",
    **kwargs: Any,
) -> GeoDataFrameType
load(
    artifact: HdfStoreArtifact,
    tracker: Optional[Tracker] = None,
    *,
    db_fallback: str = "inputs-only",
    **kwargs: Any,
) -> pd.HDFStore
load(
    artifact: DataFrameArtifact,
    tracker: Optional[Tracker] = None,
    *,
    db_fallback: str = "inputs-only",
    **kwargs: Any,
) -> duckdb.DuckDBPyRelation
load(
    artifact: JsonArtifact,
    tracker: Optional[Tracker] = None,
    *,
    db_fallback: str = "inputs-only",
    **kwargs: Any,
) -> duckdb.DuckDBPyRelation
load(
    artifact: TabularArtifact,
    tracker: Optional[Tracker] = None,
    *,
    db_fallback: str = "inputs-only",
    **kwargs: Any,
) -> duckdb.DuckDBPyRelation
```
Smart loader that retrieves data for an artifact from the best available source.
This function attempts to load the data associated with an Artifact object.
It prioritizes loading from disk (raw format) if the file exists. If the file
is missing but the artifact is marked as ingested, it can optionally recover the
data from the Consist DuckDB database ("Ghost Mode"). By default, DB recovery is
only allowed when the artifact is an input to an active, non-cached run.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `artifact` | `Artifact` | The Consist … | *required* |
| `tracker` | `Optional[Tracker]` | The … | `None` |
| `db_fallback` | `str` | Controls when the loader is allowed to fall back to DuckDB ("Ghost Mode") when the file is missing but the artifact is marked as ingested. | `"inputs-only"` |
| `**kwargs` | `Any` | Additional keyword arguments to pass to the underlying data loader function (e.g., …). | `{}` |
Returns:

| Type | Description |
|---|---|
| `LoadResult` | The loaded data, typically a DuckDB Relation for tabular data, an xarray Dataset for array formats, or another data object depending on the artifact's … |

Raises:

| Type | Description |
|---|---|
| `RuntimeError` | If no … |
| `FileNotFoundError` | If the artifact's data cannot be found on disk or recovered from the database. |
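The documented precedence (disk first, then optional DuckDB recovery for ingested artifacts) amounts to a small decision table. The sketch below is hypothetical: it models the state with plain booleans and assumes an `"always"` policy value purely for contrast with the documented `"inputs-only"` default:

```python
def resolve_source(file_exists, ingested, db_fallback, is_active_input):
    """Pick a data source for an artifact, mirroring the documented order:
    prefer the raw file on disk; fall back to the DuckDB copy ("Ghost Mode")
    only for ingested artifacts when the policy permits it."""
    if file_exists:
        return "disk"
    if ingested:
        if db_fallback == "always":  # assumed policy value, for illustration
            return "db"
        if db_fallback == "inputs-only" and is_active_input:
            return "db"
    raise FileNotFoundError("not on disk and DB recovery not permitted")

print(resolve_source(True, False, "inputs-only", False))   # disk
print(resolve_source(False, True, "inputs-only", True))    # db
```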
load_df(artifact, tracker=None, *, db_fallback='inputs-only', close=True, **kwargs)
¶
Load a tabular artifact and return a pandas DataFrame.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `artifact` | `ArtifactLike` | Artifact to load. | *required* |
| `tracker` | `Optional[Tracker]` | Tracker to use for resolving paths or DB fallback. | `None` |
| `db_fallback` | `str` | Controls when DB recovery is allowed for ingested artifacts. | `"inputs-only"` |
| `close` | `bool` | Whether to close the underlying DuckDB connection after materialization when the load returns a Relation. | `True` |
| `**kwargs` | `Any` | Additional loader options. | `{}` |
load_relation(artifact, tracker=None, *, db_fallback='inputs-only', **kwargs)
¶
Context manager that yields a DuckDB Relation and ensures the underlying connection is closed on exit.
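The close-on-exit guarantee that `load_relation` provides is the standard context-manager pattern; a generic sketch with a dummy connection (not the real implementation):

```python
from contextlib import contextmanager

class DummyConnection:
    """Stand-in for a DuckDB connection, tracking whether close() ran."""
    def __init__(self):
        self.closed = False
    def close(self):
        self.closed = True

@contextmanager
def open_relation(conn):
    # Yield the resource; the finally block guarantees cleanup even if
    # the with-body raises.
    try:
        yield conn
    finally:
        conn.close()

conn = DummyConnection()
with open_relation(conn) as rel:
    assert not conn.closed  # still open inside the block
print(conn.closed)  # True
```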
to_df(relation, *, close=True)
¶
Convert a DuckDB Relation to a pandas DataFrame.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `relation` | `DuckDBPyRelation` | Relation to materialize into a DataFrame. | *required* |
| `close` | `bool` | Whether to close the underlying DuckDB connection after materialization. Use … | `True` |
active_relation_count()
¶
Return the number of active DuckDB relations tracked by Consist.
set_current_tracker(tracker)
¶
Set the default (fallback) tracker used by Consist entrypoints.
Entrypoints like `consist.run()`, `consist.start_run()`, and `consist.scenario()` use this tracker if they are called outside of an active run context and no explicit `tracker=` argument is provided.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `tracker` | `Optional[Tracker]` | The tracker instance to set as the default, or … | *required* |

Returns:

| Type | Description |
|---|---|
| `Optional[Tracker]` | The previously configured default tracker, if any. |
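The contract here (install a new fallback, hand back the previous one) follows a common set-default pattern; a generic sketch, not Consist's internals:

```python
# Module-level default, mirroring the documented contract:
# setting a new fallback returns whatever was configured before.
_current_default = None

def set_default(value):
    global _current_default
    previous = _current_default
    _current_default = value
    return previous

print(set_default("tracker_a"))  # None (nothing was set before)
print(set_default("tracker_b"))  # tracker_a
```

Returning the previous value makes it easy to restore the old default after a temporary override.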
noop_scenario(name, **kwargs)
¶
Create a scenario context that executes without provenance tracking.
This is useful for debugging or running simulations where you want the ergonomics of the Consist scenario API (like the Coupler and RunResult) but do not want to record any metadata or artifacts to the database.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name of the scenario (for display/logging purposes). | *required* |
| `**kwargs` | `Any` | Additional arguments forwarded to the noop context. | `{}` |
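A no-op context of this kind gives the `with` ergonomics while recording nothing, much like `contextlib.nullcontext` with a named handle. A rough sketch with a hypothetical dict payload (the real helper yields Consist's scenario context object):

```python
from contextlib import contextmanager

@contextmanager
def noop(name, **kwargs):
    # Same entry/exit shape as a tracked scenario, but nothing is persisted.
    yield {"name": name, "tracked": False}

with noop("debug-run") as ctx:
    print(ctx["name"], ctx["tracked"])  # debug-run False
```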
is_dataframe_artifact(artifact)
¶
Type guard: narrow artifact to tabular types (parquet, csv, h5_table).
Use this to enable type-safe loading and IDE autocomplete:

```python
if is_dataframe_artifact(artifact):
    rel = load(artifact)  # type checker knows the return is a Relation
    rel.df().head()       # materialize to pandas; IDE autocomplete works!
```
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `artifact` | `ArtifactLike` | Artifact to check. | *required* |

Returns:

| Type | Description |
|---|---|
| `bool` | True if artifact driver is parquet, csv, or h5_table. |
is_tabular_artifact(artifact)
¶
Type guard: narrow artifact to any tabular format (parquet, csv, h5_table, json).
Note: this is broader than is_dataframe_artifact(); load() still returns
a Relation for tabular artifacts. Use load_df() for a pandas escape hatch.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `artifact` | `ArtifactLike` | Artifact to check. | *required* |

Returns:

| Type | Description |
|---|---|
| `bool` | True if artifact driver produces tabular data. |
is_json_artifact(artifact)
¶
Type guard: narrow artifact to JSON format.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `artifact` | `ArtifactLike` | Artifact to check. | *required* |

Returns:

| Type | Description |
|---|---|
| `bool` | True if artifact driver is json. |
is_zarr_artifact(artifact)
¶
Type guard: narrow artifact to Zarr format.
Use this when you know an artifact should be Zarr and want type-safe loading:

```python
if is_zarr_artifact(artifact):
    ds = load(artifact)  # type checker knows the return is xarray.Dataset
    ds.dims              # IDE autocomplete works!
```
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `artifact` | `ArtifactLike` | Artifact to check. | *required* |

Returns:

| Type | Description |
|---|---|
| `bool` | True if artifact driver is zarr. |