Usage Guide¶
Consist provides flexible patterns for tracking provenance in scientific workflows. This guide walks you through the main usage patterns, from simple single-step runs to complex multi-year simulations. Each section is written to help:
- Developers integrating Consist into a simulation tool
- Practitioners running tools and wanting clearer inputs/outputs
- Researchers managing multi-stage pipelines and reproducibility
New to Consist? Start with the quickstart tutorial, then work through the examples below in order.
If you are building a reusable domain integration, see Building a Domain Tracker for the recommended wrapper architecture.
Contents¶
- Choosing Your Pattern
- Pattern 1: Single-Step Runs (`run()`)
- Pattern 2: Multi-Step Workflows (`scenario()`)
- Understanding the Coupler
- Simple Example: Two-Step Workflow
- Declaring Inputs: Disk Paths vs Explicit Step Links
- Decorator Defaults, Templates, and Schema Introspection
- Passing Data Between Steps with the Coupler
- Output Validation
- Selective Output Collection with `collect_by_keys()`
- Example: Parameter Sweep in a Scenario
- Pattern 3: Container Integration
- Advanced Patterns
- When Does Code Execute? Understanding `sc.run()` vs `sc.trace()`
- Motivation: When Caching Saves Time
- Querying Results
Choosing Your Pattern¶
Choose based on what you're building and how much structure you need:
| Your Workflow | Pattern | Why |
|---|---|---|
| Single data processing step (clean, transform, aggregate) | `run` | Simple: caches the entire function call with low overhead. Use for self-contained operations. |
| Multi-step workflow (preprocessing → simulation → analysis) | `scenario` | Groups related steps, shares state via the coupler, per-step caching. Use when steps have dependencies or shared configuration. |
| Existing tool/model (subprocess, legacy code, container) | `container` or `depends_on` | Wraps external executables, tracks the container digest as a cache key. Use for black-box tools. |
| Parameter sweep / sensitivity analysis | `scenario` + loop | Run the same step with different configs, compare results. |
| Multi-year simulation | `scenario` + loop | Runs in years; each year caches independently, all years share scenario context. |
Pattern 1: Single-Step Runs (run())¶
Use run() when you have a self-contained operation: data cleaning, transformation, aggregation, or any callable that takes inputs and produces outputs. Consist caches the entire function call based on code version + config + input data.
Recommended default: use ExecutionOptions(input_binding="paths"). That
keeps the function boundary honest: inputs={...} declares identity and
lineage, and the callable receives those named inputs as local Path objects.
Use input_binding="loaded" when you want Consist to hydrate tables or objects
before calling the function. Use input_binding="none" when inputs are
identity-only and you do not want automatic binding.
Choosing an Input Binding Mode¶
| Mode | Function receives | Best for | Main tradeoff |
|---|---|---|---|
| `"paths"` | local `Path` objects | explicit filesystem workflows, subprocesses, existing code that already reads files | requires a usable local path |
| `"loaded"` | loaded DataFrame / Python objects | tabular analysis, ingested artifacts, DB-backed hydration | hides the I/O boundary |
| `"none"` | no automatic binding | advanced wrappers, identity-only dependencies | you must supply args some other way |
runtime_kwargs still exists, but it should usually mean "runtime-only values
that are not the declared named inputs." It is no longer the recommended way to
pass paths into a path-based step.
When to use:
- Simple data transformations (filter, aggregate, merge)
- Expensive computations that don't depend on other runs
- One-off analyses
- You want the simplest mental model
When NOT to use:
- Multi-step workflows with dependencies between steps (use `scenario()` instead)
- Existing tools that write to directories (use `depends_on` or `container`)
Simple Example: Data Cleaning¶
from pathlib import Path
import pandas as pd
RAW = Path("raw.parquet")
CLEANED = Path("cleaned.parquet")
def clean_data(raw_path: Path, threshold: float = 0.5) -> None:
df = pd.read_parquet(raw_path)
df[df["value"] > threshold].to_parquet(CLEANED)
clean_data(RAW, threshold=0.5)
cleaned_df = pd.read_parquet(CLEANED)
print(cleaned_df.head())
This is plain Python and it works, but there is no cache identity, no run record, and no answer to "which exact code/config/input produced this file?"
from pathlib import Path
import pandas as pd
from consist import ExecutionOptions, Tracker
tracker = Tracker(run_dir="./runs", db_path="./provenance.duckdb")
def clean_data(raw_path: Path, threshold: float = 0.5) -> dict[str, Path]:
df = pd.read_parquet(raw_path)
out_path = Path("./cleaned.parquet")
df[df["value"] > threshold].to_parquet(out_path)
return {"cleaned": out_path}
result = tracker.run(
fn=clean_data,
inputs={"raw_path": Path("raw.parquet")},
config={"threshold": 0.5},
outputs=["cleaned"],
execution_options=ExecutionOptions(input_binding="paths"),
)
cleaned_artifact = result.outputs["cleaned"]
cleaned_df = pd.read_parquet(cleaned_artifact.path)
print(cleaned_artifact.path)
The function stays ordinary and testable. Consist adds the cache key,
lineage, and queryable artifact record around the call, while
input_binding="paths" keeps the file boundary explicit.
from pathlib import Path
import pandas as pd
from consist import ExecutionOptions, Tracker
tracker = Tracker(run_dir="./runs", db_path="./provenance.duckdb")
def clean_data(raw: pd.DataFrame, threshold: float = 0.5) -> dict[str, Path]:
out_path = Path("./cleaned.parquet")
raw[raw["value"] > threshold].to_parquet(out_path)
return {"cleaned": out_path}
result = tracker.run(
fn=clean_data,
inputs={"raw": Path("raw.parquet")},
config={"threshold": 0.5},
outputs=["cleaned"],
execution_options=ExecutionOptions(input_binding="loaded"),
)
cleaned_artifact = result.outputs["cleaned"]
cleaned_df = pd.read_parquet(cleaned_artifact.path)
print(cleaned_artifact.path)
This is convenient for tabular work: Consist loads the named input before the function executes. Prefer it when you want hydrated objects rather than explicit file boundaries.
If you run it again with the same inputs and config, you should get a cache hit and no re-execution.
Preferred run configuration style (options objects):
import consist
from consist import CacheOptions, ExecutionOptions, OutputPolicyOptions
result = consist.run(
fn=clean_data,
inputs={"raw": Path("raw.csv")},
config={"threshold": 0.5},
outputs=["cleaned"],
cache_options=CacheOptions(cache_mode="reuse", cache_hydration="inputs-missing"),
output_policy=OutputPolicyOptions(output_missing="error"),
execution_options=ExecutionOptions(input_binding="loaded"),
)
To scope code hashing to the callable instead of full-repo Git state:
import consist
from consist import CacheOptions
result = consist.run(
fn=clean_data,
inputs={"raw": Path("raw.csv")},
outputs=["cleaned"],
cache_options=CacheOptions(
code_identity="callable_module",
code_identity_extra_deps=["shared/helpers.py"],
),
)
This reduces cache misses when unrelated files elsewhere in the repository change.
Legacy run-policy kwargs are no longer supported on run(...) APIs. Use
cache_options=..., output_policy=..., and execution_options=....
For file outputs, prefer returning dict[str, Path] and declaring
outputs=[...] when you control the function body. Use managed helpers like
consist.output_path(...),
_consist_ctx.output_path(...), or _consist_ctx.output_dir(...) when you need
Consist to allocate paths for you or when wrapping tools that do not return
their outputs directly.
Alternative: use injected context and managed output paths
Use this when Consist should allocate the output location for you, or when you are wrapping a file-writing tool that returns `None`.

import consist
from consist import ExecutionOptions, use_tracker
from pathlib import Path
import pandas as pd
def clean_data(raw_file: Path, _consist_ctx) -> None:
df = pd.read_csv(raw_file)
out_path = _consist_ctx.output_path("cleaned")
df.to_parquet(out_path)
_consist_ctx.log_output(out_path, key="cleaned")
with use_tracker(tracker):
result = consist.run(
fn=clean_data,
inputs={"raw_file": Path("raw.csv")}, # (1)!
execution_options=ExecutionOptions(
input_binding="paths",
inject_context=True,
),
)
Example with Config¶
Use Pydantic models for structured configs:
import consist
from consist import Tracker, use_tracker
from pydantic import BaseModel
from pathlib import Path
import pandas as pd
class CleaningConfig(BaseModel):
threshold: float = 0.5
remove_outliers: bool = True
tracker = Tracker(run_dir="./runs", db_path="./provenance.duckdb")
def clean_data(raw: pd.DataFrame, config: CleaningConfig) -> pd.DataFrame:
df = raw
df = df[df["value"] >= config.threshold]
if config.remove_outliers:
df = df[df["value"] < df["value"].quantile(0.95)]
return df
with use_tracker(tracker):
result = consist.run(
fn=clean_data,
inputs={"raw": Path("raw.csv")},
config=CleaningConfig(threshold=0.5, remove_outliers=True),
outputs=["cleaned"],
)
Each distinct config → separate cache entries. Change the threshold? Only that run re-executes.
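To see why each distinct config maps to its own cache entry, here is a minimal conceptual sketch of content-addressed keying. This illustrates the idea only; it is not Consist's actual hashing scheme, and all names here are hypothetical:

```python
import hashlib
import json

def cache_key(code_version: str, config: dict, input_digest: str) -> str:
    """Illustrative cache key: any change to code, config, or inputs
    yields a different key, so only the affected run re-executes."""
    payload = json.dumps(
        {"code": code_version, "config": config, "inputs": input_digest},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode()).hexdigest()

k1 = cache_key("v1", {"threshold": 0.5, "remove_outliers": True}, "abc123")
k2 = cache_key("v1", {"threshold": 0.7, "remove_outliers": True}, "abc123")
k3 = cache_key("v1", {"threshold": 0.5, "remove_outliers": True}, "abc123")
assert k1 != k2  # changed threshold -> new cache entry
assert k1 == k3  # identical code/config/inputs -> cache hit
```

Changing only `threshold` produces a new key, so only that run re-executes; an identical call reproduces the same key and hits the cache.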
Wrapping Legacy or Black-Box Tools¶
If you have existing code that writes files to a directory, use the injected run context to capture outputs. _consist_ctx.output_dir(...) avoids manual mkdir/path joining and keeps files in the managed artifact location.
from pathlib import Path
import consist
from consist import ExecutionOptions, use_tracker
def run_legacy_model(upstream, _consist_ctx) -> None:
import legacy_model
output_dir = _consist_ctx.output_dir("legacy_outputs")
with _consist_ctx.capture_outputs(output_dir, pattern="*.csv"):
legacy_model.run(upstream, output_dir=output_dir)
with use_tracker(tracker):
result = consist.run(
fn=run_legacy_model,
inputs={"upstream": Path("input.csv")},
depends_on=[Path("config.yaml"), Path("parameters.json")], # (1)!
execution_options=ExecutionOptions(
input_binding="paths",
inject_context=True,
),
)
- Hash these files too so config changes invalidate the cache.
`input_binding="paths"` passes raw paths to the legacy tool while still keeping those inputs in cache identity and lineage.
Captured outputs are keyed by filename stem (for example, `results.csv` -> `results`).
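The stem-based keying can be sketched in plain Python; this illustrates the naming rule only, not Consist internals:

```python
from pathlib import Path

# Captured files become artifact keys via their filename stem,
# mirroring the rule above (results.csv -> "results").
captured = [Path("outputs/results.csv"), Path("outputs/summary_2030.csv")]
keyed = {p.stem: p for p in captured}

assert set(keyed) == {"results", "summary_2030"}
```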
Alternative: capture a fixed output directory
If the tool always writes to a known folder and returns `None`, you can capture it directly:

import consist
from consist import ExecutionOptions, use_tracker
with use_tracker(tracker):
def run_legacy_model(upstream) -> None:
import legacy_model
legacy_model.run(upstream, output_dir="outputs")
result = consist.run(
fn=run_legacy_model,
inputs={"upstream": Path("input.csv")}, # (1)!
depends_on=[Path("config.yaml")],
execution_options=ExecutionOptions(
input_binding="paths",
),
capture_dir=Path("outputs"),
capture_pattern="*.csv",
)
Alternative: auto-load tabular inputs into function arguments
Auto-loading is convenient for short scripts and notebook-scale analysis: when `inputs` is a mapping and `input_binding="loaded"`, Consist hydrates those artifacts into function arguments for you. This is shorter, but it also hides the file boundary. Prefer the explicit `Path`-based pattern above when you want the workflow wiring to stay obvious.

The `depends_on` files are hashed as part of the cache key, so changing `config.yaml` invalidates the cache.
Pattern 2: Multi-Step Workflows (scenario())¶
Use scenario() when you have multiple interdependent steps that share state or configuration. Scenarios group steps into a coherent unit (a "run scenario"), while each step caches independently.
Recommended default: keep step links explicit. Have each step return named
artifact paths, then pass those outputs downstream with consist.ref(...) or
consist.refs(...). Reach for the coupler when you need scenario-scoped runtime
state, optional branching, or trace-style orchestration.
Compiled bindings for orchestrators¶
When a planner or external orchestrator has already resolved a step's binding
decision, pass that result to sc.run(...) as BindingResult(...). Treat the
binding object as an execution envelope, not a planning API: the planner lives
outside Consist, and Consist executes the already-resolved binding.
Keep the explicit consist.ref(...) / consist.refs(...) path as the
recommended default for direct step-to-step workflow code. Use BindingResult
when the binding plan is coming from a separate layer and you want to hand the
resolved step contract to Consist in one object.
from pathlib import Path
import consist
from consist import BindingResult, ExecutionOptions, Tracker
tracker = Tracker(run_dir="./runs", db_path="./provenance.duckdb")
def prepare() -> dict[str, Path]:
out = consist.output_path("prepared", ext="txt")
out.write_text("ready\n")
return {"prepared": out}
def consume(raw: Path, prepared: Path, maybe_aux: Path | None = None) -> None:
print(raw, prepared, maybe_aux)
with tracker.scenario("orchestrated") as sc:
sc.run(fn=prepare, outputs=["prepared"])
binding = BindingResult(
inputs={"raw": Path("raw.csv")},
input_keys=["prepared"],
optional_input_keys=["maybe_aux"],
)
sc.run(
fn=consume,
binding=binding,
execution_options=ExecutionOptions(input_binding="paths"),
)
When to use:
- Multi-step pipelines (preprocess → simulate → analyze)
- Steps have data dependencies (output of step 1 is input to step 2)
- Multi-year simulations (year 2020 → 2030 → 2040)
- Parameter sweeps where you want to compare across variants
- Shared configuration across multiple steps
Benefits over run():
- Steps cache independently—skip re-executing steps whose inputs haven't changed
- Keep explicit step links with `consist.ref(...)`/`consist.refs(...)` for clear lineage
- Use the coupler for scenario-scoped runtime state and trace-oriented flows
- Group runs into scenarios for easy cross-scenario queries
Best practice: For step-to-step coupling, pass explicit artifact links with
consist.ref(previous_result, key="...") (single artifact) or
consist.refs(previous_result, ...) (multiple artifacts). This avoids ambiguous
direct RunResult inputs when an upstream step has multiple outputs.
Understanding the Coupler¶
The coupler is your scenario-scoped artifact registry. When you log an artifact with a key, it's automatically stored in the coupler, making data flow between steps explicit and traceable. This is especially useful when:

- You need clean handoffs between tools (developer workflows)
- You want a clear list of outputs by name (practitioner workflows)
- You want auditable step-to-step lineage (research workflows)
Etymology: A coupler (like the library name "consist" from railroad terminology) is the mechanism that links train cars together. In Consist, the coupler links your workflow steps by storing and threading their outputs.
Key behaviors:
- When you log an artifact with a `key`, it's automatically synced to the coupler
- You retrieve artifacts with `coupler.require(key)` or via `inputs=` declarations
- The coupler persists across all steps in a scenario
- Each scenario has its own coupler; they don't share data
- On cache hits, cached outputs are pre-synced to the coupler before your step runs
You interact with the coupler when:
- Accessing inputs in trace blocks: `coupler.require("population")`
- Declaring linked inputs to `sc.run()`: `inputs=consist.refs(init_result, "population")`
- Validating that outputs were produced: `sc.require_outputs(...)`
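Conceptually, the coupler behaves like a scenario-scoped registry with a strict getter. A toy sketch of those semantics (`ToyCoupler` is hypothetical, not the real class):

```python
class ToyCoupler:
    """Toy illustration of coupler semantics: keyed artifact storage
    with a strict require() that fails fast on missing keys."""

    def __init__(self):
        self._artifacts = {}

    def set(self, key, artifact):
        self._artifacts[key] = artifact

    def require(self, key):
        if key not in self._artifacts:
            raise KeyError(f"No artifact logged under key {key!r}")
        return self._artifacts[key]

coupler = ToyCoupler()
coupler.set("population", "runs/population.parquet")
assert coupler.require("population") == "runs/population.parquet"
try:
    coupler.require("persons")
except KeyError:
    pass  # missing keys fail fast instead of silently returning None
```

The fail-fast `require` is the important property: a typo in a key surfaces immediately at the step that consumes it, not later as a confusing downstream error.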
Optional: artifact key registries help keep keys consistent across large workflows.
from consist import ExecutionOptions
from consist.utils import ArtifactKeyRegistry
from pathlib import Path
class Keys(ArtifactKeyRegistry):
RAW = "raw"
PREPROCESSED = "preprocessed"
ANALYSIS = "analysis"
# Use keys in calls
preprocess_result = sc.run(
fn=preprocess,
inputs={Keys.RAW: Path("raw.csv")},
outputs=[Keys.PREPROCESSED],
)
sc.run(
fn=analyze,
inputs=consist.refs(preprocess_result, Keys.PREPROCESSED),
execution_options=ExecutionOptions(input_binding="loaded"),
outputs=[Keys.ANALYSIS],
)
# Validate ad-hoc key lists when needed
Keys.validate([Keys.RAW, Keys.PREPROCESSED])
Live-sync (automatic): When you log an artifact, it's immediately available in the coupler—you don't need to manually call coupler.set().
Optional: Namespace keys with a scoped view: For larger workflows, you can scope reads/writes while keeping fully-qualified keys globally accessible.
beam = sc.coupler.view("beam")
beam.set("plans_in", artifact) # writes key "beam/plans_in"
beam.require("plans_in") # namespace-local access
sc.coupler.require("beam/plans_in") # global access still works
For optional-Consist workflows: If you're using Consist in optional mode (with fallback to Path objects or artifact-like objects), use coupler.set_from_artifact(key, value) instead of coupler.set(). It handles both real Artifacts and artifact-like objects (Paths, strings, noop artifacts) transparently.
Simple Example: Two-Step Workflow¶
from pathlib import Path
import pandas as pd
PREPROCESSED = Path("./preprocessed.parquet")
def preprocess_data(raw_path: Path) -> None:
df = pd.read_csv(raw_path)
df[df["value"] > 0.5].to_parquet(PREPROCESSED)
def analyze_data(preprocessed_path: Path) -> None:
df = pd.read_parquet(preprocessed_path)
summary = df.groupby("category", as_index=False)["value"].mean()
summary.to_parquet("./analysis.parquet")
preprocess_data(Path("raw.csv"))
analyze_data(PREPROCESSED)
The dependency is real, but it is implicit. The second step only works because the first one wrote to a shared path that both functions know about.
from pathlib import Path
import pandas as pd
import consist
from consist import ExecutionOptions, Tracker
tracker = Tracker(run_dir="./runs", db_path="./provenance.duckdb")
def preprocess_data(raw_path: Path) -> dict[str, Path]:
df = pd.read_csv(raw_path)
out_path = Path("./preprocessed.parquet")
df[df["value"] > 0.5].to_parquet(out_path)
return {"preprocessed": out_path}
def analyze_data(preprocessed_path: Path) -> dict[str, Path]:
df = pd.read_parquet(preprocessed_path)
out_path = Path("./analysis.parquet")
df.groupby("category", as_index=False)["value"].mean().to_parquet(out_path)
return {"analysis": out_path}
with tracker.scenario("my_analysis") as sc:
preprocess_result = sc.run(
name="preprocess",
fn=preprocess_data,
inputs={"raw_path": Path("raw.csv")},
outputs=["preprocessed"],
execution_options=ExecutionOptions(input_binding="paths"),
)
sc.run(
name="analyze",
fn=analyze_data,
inputs={
"preprocessed_path": consist.ref(
preprocess_result, key="preprocessed"
)
},
outputs=["analysis"],
execution_options=ExecutionOptions(input_binding="paths"),
)
Each step stays plain Python, but the link between them is now a named artifact reference instead of an implicit shared path.
All steps have scenario_id="my_analysis", making it easy to query together. Change preprocess logic? The preprocess step re-runs; the analyze step re-runs only if the preprocessed artifact changes. Change raw.csv? Both re-execute.
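The selective re-execution described above falls out of chained content hashes: a downstream step's cache key includes the digest of the upstream artifact it consumes, not the upstream code. A minimal sketch of that idea (illustrative, not Consist's implementation):

```python
import hashlib

def digest(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()

raw = b"raw.csv contents"
preprocessed = b"filtered rows"  # produced by the preprocess step

# Each step's key combines its own code identity with its inputs' digests.
preprocess_key = digest(b"preprocess-code" + raw)
analyze_key = digest(b"analyze-code" + digest(preprocessed).encode())

# Changing raw.csv changes preprocess_key, so preprocess re-runs.
new_preprocess_key = digest(b"preprocess-code" + b"raw.csv v2")
assert new_preprocess_key != preprocess_key

# analyze re-runs only if the preprocessed artifact it consumes changes:
# byte-identical preprocess output means analyze still hits its cache.
same_analyze_key = digest(b"analyze-code" + digest(preprocessed).encode())
assert same_analyze_key == analyze_key
```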
Declaring Inputs: Disk Paths vs Explicit Step Links¶
The inputs= parameter supports two common forms:
Mapping form (declare hashed dependencies):
sc.run(
name="preprocess",
fn=preprocess_data,
inputs={"raw": Path("raw.csv")}, # (1)!
execution_options=ExecutionOptions(input_binding="paths"),
outputs=["preprocessed"],
)
- Hash the file as an input dependency; `input_binding="paths"` passes it to the function as a local `Path` object automatically. Use this when you're loading data from disk for the first time and want the I/O boundary to stay obvious.
Linked artifact form (recommended for step-to-step coupling):
from consist import ExecutionOptions
sc.run(
name="analyze",
fn=analyze_data,
inputs={"preprocessed": consist.ref(preprocess_result, key="preprocessed")}, # (1)!
execution_options=ExecutionOptions(input_binding="loaded"), # (2)!
outputs=["analysis"],
)
- Explicitly link to one output from a prior `RunResult`.
- Auto-load inputs as hydrated objects (DataFrames, etc.) before calling the function.
Use this when an upstream step already produced the artifact. With
input_binding="loaded", each mapping key becomes a function parameter with loaded data. This is concise, but the more explicit path-based variant above (input_binding="paths") is usually the better default for production pipelines.
Rule of thumb:
- Use consist.ref(...) for one-off single links.
- Use consist.refs(...) when wiring multiple outputs or when you plan to reuse the mapping.
Concise and explicit refs(...) forms:
# Assume setup_result has outputs: "zones", "persons", and "jobs".
# Concise: map all outputs by their original keys.
all_inputs = consist.refs(setup_result)
# Explicit: select only the outputs you need.
subset_inputs = consist.refs(setup_result, "persons", "jobs")
# Explicit aliases: rename keys for downstream function parameters.
aliased_inputs = consist.refs(
setup_result,
{"population_df": "persons", "employment_df": "jobs"},
)
Alias-map form (when downstream parameter names differ):
sc.run(
name="analyze",
fn=analyze_data,
inputs=consist.refs(preprocess_result, {"cleaned_artifact": "preprocessed"}),
execution_options=ExecutionOptions(input_binding="loaded"),
outputs=["analysis"],
)
Compatibility note: Legacy key-indirection patterns (for example,
inputs=["preprocessed"] and input_keys=) still work. Prefer explicit links
with consist.ref(...)/consist.refs(...) in new code; reserve
BindingResult(...) for complex or externally orchestrated workflows that
already have a binding plan.
Alternative: inline steps with sc.trace
Use this when you want inline code blocks (they always execute, even on cache hits). This can be useful for lightweight validation or logging.

with use_tracker(tracker):
with consist.scenario("my_analysis") as sc:
preprocess_result = sc.run(
name="preprocess",
fn=preprocess_data,
inputs={"raw": Path("raw.csv")},
outputs=["preprocessed"],
)
analyze_inputs = consist.refs(preprocess_result, "preprocessed")
with sc.trace(
name="analyze",
inputs=analyze_inputs,
):
df = consist.load_df(analyze_inputs["preprocessed"])
summary = df.groupby("category", as_index=False)["value"].mean()
consist.log_dataframe(summary, key="analysis")
Decorator Defaults, Templates, and Schema Introspection¶
For the full guide to @define_step defaults, callable metadata, name templates,
schema introspection, and cache invalidation helpers, see
Decorators & Metadata.
Passing Data Between Steps with the Coupler¶
with use_tracker(tracker):
with consist.scenario("baseline", model="travel_demand") as sc:
initialize_result = sc.run( # (1)!
name="initialize",
fn=load_population,
outputs=["population"],
)
population_inputs = consist.refs(initialize_result, "population")
for year in [2020, 2030, 2040]: # (2)!
with sc.trace(
name="simulate",
run_id=f"baseline_{year}",
year=year,
inputs=population_inputs, # (3)!
):
df_pop = consist.load_df(population_inputs["population"]) # (4)!
df_result = run_model(year, df_pop)
consist.log_dataframe(df_result, key="persons") # (5)!
- Step 1: load and prepare the population artifact.
- Step 2: simulate for each year.
- Declare the dependency on the population artifact.
- Load the explicitly linked artifact.
- Log output and store it for downstream steps.
What explicit linked inputs do:
- Declares that this step depends on the selected `population` artifact
- Consist tracks this as part of the cache key
- If the population artifact hasn't changed, this step's cache is still valid
Simpler alternative (auto-load inputs with sc.run)
If your step can be expressed as a function, sc.run can auto-load inputs into
function parameters so you don't need to call coupler.require(...) manually:
from consist import ExecutionOptions
def simulate_year(population: pd.DataFrame, config: dict) -> pd.DataFrame:
year = config["year"]
return run_model(year, population)
with use_tracker(tracker):
with consist.scenario("baseline", model="travel_demand") as sc:
initialize_result = sc.run(
name="initialize",
fn=load_population,
outputs=["population"],
)
simulation_inputs = consist.refs(initialize_result, "population")
for year in [2020, 2030, 2040]:
sc.run(
name=f"simulate_{year}",
fn=simulate_year,
inputs=simulation_inputs,
outputs=["persons"],
execution_options=ExecutionOptions(input_binding="loaded"),
config={"year": year},
)
Output Validation¶
Consist can validate that your workflow produces expected outputs, catching typos or missing data early. You have three patterns to choose from based on your workflow. Most users just need Pattern A.
Pattern A: Declare required outputs (Static workflows) — START HERE
Use this when you know all your workflow outputs upfront.
with use_tracker(tracker):
with consist.scenario("workflow") as sc:
sc.require_outputs(
"zarr_skims",
"synthetic_population",
)
compile_result = sc.run("compile", fn=asim_compile_runner.run) # (1)!
- Run steps; at scenario exit, missing outputs raise `RuntimeError`.
When to use: Static workflows where steps always produce the same outputs (most common).
Benefits: Simple, clear contract. Missing outputs are caught at scenario exit. Typos are caught immediately.
Optional: Add guardrails for typos
sc.require_outputs(
"zarr_skims",
"synthetic_population",
warn_undefined=True, # (1)!
description={
"zarr_skims": "Zone-to-zone travel times in Zarr format",
"synthetic_population": "Synthetic population with activity schedules",
}
)
- Warn if you set other keys by mistake.
Shortcut: Pass required outputs directly to scenario():
with consist.scenario(
"workflow",
require_outputs=["zarr_skims", "synthetic_population"],
) as sc:
...
Pattern B: Runtime-declared validation (Dynamic/optional outputs)
Use this when outputs are dynamic or optional (e.g., optional debug outputs, or conditional branching).
with use_tracker(tracker):
with consist.scenario("workflow") as sc:
sc.declare_outputs(
"zarr_skims", "synthetic_population",
required={"zarr_skims": True, "synthetic_population": True}
)
if debugging: # (1)!
sc.declare_outputs("debug_report", required={"debug_report": False}) # (2)!
compile_result = sc.run("compile", fn=asim_compile_runner.run) # (3)!
- Add optional outputs later if needed.
- Declare an optional debug output.
- Missing required outputs raise `RuntimeError` at exit.
When to use: Workflows with per-key control over required vs optional, or conditional outputs.
Benefits: Granular per-key control. Mix required and optional outputs. Add outputs dynamically.
Selective Output Collection with collect_by_keys()¶
By default, when a step produces multiple outputs, all are automatically synced to the coupler. Use collect_by_keys() when you need to:
- Select only specific outputs from many (ignore others)
- Namespace outputs by year or scenario (prefix them)
result = sc.run("step", fn=some_func) # (1)!
sc.collect_by_keys(result.outputs, "persons", "households") # (2)!
for year in [2020, 2030, 2040]:
result = sc.run(f"forecast_{year}", fn=forecast_fn)
sc.collect_by_keys(result.outputs, "population", "skims", prefix=f"{year}_") # (3)!
- Produces `persons`, `households`, `jobs`.
- Keep only selected outputs; others are ignored.
- Namespace outputs by year (e.g., `2020_population`).
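Selecting and prefixing keys amounts to a small dict transformation over the step's outputs, roughly like this (illustrative, with made-up file names):

```python
# A step produced three outputs, but we only want two, namespaced by year.
outputs = {
    "population": "pop_2020.parquet",
    "skims": "skims_2020.zarr",
    "debug": "log.txt",
}
selected = ("population", "skims")
prefix = "2020_"

# Keep only the selected keys, rewritten with the year prefix.
collected = {f"{prefix}{key}": outputs[key] for key in selected if key in outputs}
assert collected == {
    "2020_population": "pop_2020.parquet",
    "2020_skims": "skims_2020.zarr",
}
```

The unselected `debug` output is simply not collected, and the prefixed keys avoid collisions when the same step runs once per year.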
Bulk logging with metadata:
with consist.scenario("outputs") as sc:
with sc.trace(name="export_outputs"):
outputs = consist.log_artifacts( # (1)!
{
"persons": "results/persons.parquet",
"households": "results/households.parquet",
"jobs": "results/jobs.parquet"
},
metadata_by_key={
"households": {"role": "primary_unit"},
"jobs": {"role": "employment_proxy"}
},
year=2030,
scenario_name="base" # (2)!
)
- Log multiple files at once with explicit keys.
- All outputs get `year=2030`, `scenario_name="base"`; `households` also gets `role="primary_unit"`.
Example: Parameter Sweep in a Scenario¶
from pathlib import Path
import pandas as pd
with use_tracker(tracker):
with consist.scenario("sensitivity_analysis") as sc:
def load_data() -> pd.DataFrame:
return pd.read_csv("data.csv")
setup_result = sc.run( # (1)!
name="setup",
fn=load_data,
inputs=[Path("data.csv")],
outputs=["data"],
)
for threshold in [0.3, 0.5, 0.7]: # (2)!
data_inputs = consist.refs(setup_result, "data")
with sc.trace(
name="analyze",
run_id=f"threshold_{threshold}",
threshold=threshold,
inputs=data_inputs,
):
df = consist.load_df(data_inputs["data"])
filtered = df[df["value"] > threshold]
consist.log_dataframe(filtered, key="filtered")
- Load once, reuse for all variants.
- Test different thresholds as separate runs.
Each threshold creates a separate run with its own cache entry. Re-run later? Consist returns cached results for matching thresholds, skips setup (since input unchanged).
Pattern 3: Container Integration¶
Use containers when you have existing tools, models, or legacy code that runs as a subprocess or Docker container. The image digest becomes part of the cache key. For the full reference, see the Container Integration Guide.
When to use:
- Running ActivitySim, SUMO, BEAM, or other external models
- Legacy code you don't want to refactor
- Python/R/Java executables that you invoke as subprocesses
- Tools that expect specific file paths or output directories
from consist.integrations.containers import run_container
from pathlib import Path
host_inputs = Path("data/inputs").resolve()
host_outputs = Path("data/outputs").resolve()
result = run_container(
tracker=tracker,
run_id="model_2030",
image="travel-model:v2.1", # (1)!
command=["python", "run.py", "--year", "2030"],
volumes={
str(host_inputs): "/inputs",
str(host_outputs): "/outputs",
},
inputs=[host_inputs / "input.csv"],
outputs={"results": host_outputs / "results.parquet"},
)
output_artifact = result.artifacts["results"]
- The image digest becomes part of the cache key.
Change the image version? Cache invalidates. Same image + inputs? Returns cached results.
Alternative: use consist.run with ExecutionOptions(executor="container")
If you prefer the same API as `consist.run`, you can use the container executor:

import consist
from consist import ExecutionOptions
with consist.use_tracker(tracker):
result = consist.run(
name="model_2030",
inputs=[host_inputs / "input.csv"],
output_paths={"results": host_outputs / "results.parquet"},
execution_options=ExecutionOptions(
executor="container",
container={
"image": "travel-model:v2.1",
"command": ["python", "run.py", "--year", "2030"],
"volumes": {str(host_inputs): "/inputs", str(host_outputs): "/outputs"},
},
),
)
Advanced Patterns¶
Cache Hits and the Coupler¶
When Consist detects a cache hit (same step, same code version, same config and inputs):
- In `sc.run()`: your function is skipped entirely. Cached outputs are returned and automatically synced to the coupler.
- In `sc.trace()`: cached outputs are pre-synced to the coupler BEFORE your trace body runs, so you can access them with `coupler.require()` immediately. Your code still executes (unlike `sc.run()`).
This means your code doesn't need to handle cache hits differently—the coupler is populated automatically in both cases.
Cache Hydration¶
By default, Consist returns metadata-only cache hits (no file copies). You can opt in to materializing cached files when needed:
- `outputs-requested`: Copy only specific cached outputs to paths you provide
- `outputs-all`: Copy all cached outputs into a target directory while preserving their historical relative layout
- `inputs-missing`: When a cache miss occurs, backfill missing inputs from prior runs before executing
Note: outputs-requested requires output_paths=... so Consist knows where to write the files.
inputs-missing only works for inputs that are tracked artifacts (not raw paths), so Consist can find
the prior run's files or reconstruct ingested tables.
When cache-hit output files are gone, outputs-all and the new run-scoped
output recovery API can reconstruct ingested CSV/Parquet outputs from DuckDB.
For archive-mirror recovery or debug/export workflows, prefer the explicit
tracker.materialize_run_outputs(...) API instead of overloading cache
hydration.
For archive-mirror recovery, cache hydration supports
materialize_cached_outputs_source_root=Path(...) for outputs-requested and
outputs-all. You can pass it through
cache_options=CacheOptions(materialize_cached_outputs_source_root=...) on
consist.run(...), Tracker.run(...), and scenario steps, or via the low-level
tracker.start_run(...) / tracker.begin_run(...) APIs directly.
When you use the explicit tracker.materialize_run_outputs(...) API, target_root
may be either the tracker run_dir or any configured tracker mount root without
needing allow_external_paths=True.
Set per-run via cache_options=CacheOptions(cache_hydration=...) (for run(...))
or for scenario defaults via step_cache_hydration=...:
with use_tracker(tracker):
with consist.scenario("baseline", step_cache_hydration="inputs-missing") as sc:
prepare_result = sc.run(
name="prepare_population",
fn=load_population,
outputs=["population"],
)
population_inputs = consist.refs(prepare_result, "population")
with sc.trace(
name="simulate",
inputs=population_inputs,
): # (1)!
df_pop = consist.load_df(population_inputs["population"])
...
- This step backfills missing inputs from prior runs before executing.
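For the materializing modes, a sketch of outputs-requested might look like the following. It assumes the CacheOptions(cache_hydration=...) and output_paths= parameters described above; the helper name and exact call shape are illustrative, and load_population is a hypothetical step function supplied by the caller:

```python
# Sketch only: assumes the CacheOptions(cache_hydration=...) and
# output_paths= parameters described above; signatures may differ.
from pathlib import Path

def rehydrate_population(tracker, load_population):
    import consist
    from consist import CacheOptions, use_tracker

    with use_tracker(tracker):
        # On a cache hit, copy just the "population" output to the given
        # path instead of returning metadata only.
        return consist.run(
            name="prepare_population",
            fn=load_population,
            outputs=["population"],
            cache_options=CacheOptions(cache_hydration="outputs-requested"),
            output_paths={"population": Path("./hydrated/population.parquet")},
        )
```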
Mixing Runs and Scenarios¶
Call consist.run(...) inside a scenario when a step should cache independently:
from consist import ExecutionOptions
with use_tracker(tracker):
with consist.scenario("baseline") as sc:
preprocess = consist.run(
fn=expensive_preprocessing,
inputs={"network_file": Path("network.geojson")},
outputs=["processed"],
) # (1)!
simulate_inputs = consist.refs(preprocess, "processed")
with sc.trace(
name="simulate",
inputs=simulate_inputs,
): # (2)!
network = consist.load_df(simulate_inputs["processed"])
...
- Run expensive preprocessing independently with its own cache key.
- Later steps consume the preprocessed output via an explicit ref.
When Does Code Execute? Understanding sc.run() vs sc.trace()¶
When building multi-step scientific workflows, a critical question arises: Does my Python code run every time, or only when inputs change? This section clarifies the difference between sc.run() and sc.trace(), two fundamental Consist patterns that differ in execution behavior on cache hits.
The Core Distinction¶
On a cache hit (when Consist finds previously-cached results for this step with the same inputs):
- sc.trace(...): Your Python block always executes. Consist returns cached outputs, but your code still runs. Use this for logging, diagnostics, or steps that must track intermediate state every run.
- sc.run(...): Your Python function only executes on a cache miss. On a cache hit, Consist skips calling your function entirely and returns the cached output. Use this for expensive operations like scientific simulations, data processing, or model fitting.
Why This Matters: Performance & Side Effects¶
Consider an expensive simulation that takes 2 hours. Running it 100 times with the same inputs would normally take 200 hours of compute. With Consist:
- If you use sc.trace(): code runs 100 times (200 hours); caching provides metadata only
- If you use sc.run(): code runs once (2 hours), then 99 cache hits retrieve results instantly
Side effects also differ. If your step writes temporary files, updates external systems, or has other side effects, sc.trace() repeats them on every run, while sc.run() skips them on cache hits.
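As a loose plain-Python analogy (not the Consist API), the two behaviors differ only in whether the body executes on a cache hit:

```python
# Plain-Python analogy of the two cache behaviors (not Consist API).
cache = {}
calls = {"run": 0, "trace": 0}

def run_style(key, fn):
    """Like sc.run(): skip fn entirely on a cache hit."""
    if key not in cache:
        calls["run"] += 1
        cache[key] = fn()
    return cache[key]

def trace_style(key, fn):
    """Like sc.trace(): the body always executes; the cached value still wins."""
    calls["trace"] += 1
    result = fn()
    return cache.setdefault(key, result)

run_style("a", lambda: 1)
run_style("a", lambda: 1)    # hit: fn skipped
trace_style("b", lambda: 2)
trace_style("b", lambda: 2)  # hit: fn still ran
```

Calling each pair twice leaves the run-style counter at 1 and the trace-style counter at 2, which is exactly the 2-hours-versus-200-hours difference above.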
Example: ActivitySim-Style Land Use Simulation¶
Here's a realistic example showing the difference:
Using sc.trace() (Always Runs)¶
with use_tracker(tracker):
with consist.scenario("baseline") as sc:
year = 2030
with sc.trace( # (1)!
name="prepare_land_use",
inputs={"geojson": Path("land_use.geojson")},
year=year
):
print(f"Processing land use for year {year}") # (2)!
zones = load_zones("land_use.geojson")
df_zones = pd.DataFrame(zones) # (3)!
consist.log_dataframe(df_zones, key="zones")
- This block executes every time, even on cache hits.
- Code always runs—useful for logging or status.
- Consist still returns cached outputs if they exist.
Using sc.run() (Skips on Cache Hit)¶
with use_tracker(tracker):
with consist.scenario("baseline") as sc:
def prepare_land_use(geojson_path: Path) -> pd.DataFrame:
print("Processing land use") # (1)!
zones = load_zones(geojson_path)
return pd.DataFrame(zones)
result = sc.run( # (3)!
name="prepare_land_use",
fn=prepare_land_use,
inputs={"geojson_path": Path("land_use.geojson")},
outputs=["zones"],
execution_options=ExecutionOptions(input_binding="paths"), # (2)!
)
- This function only runs on cache miss; it prints on the first run.
- Bind the named input as a local Path object into the function argument.
- Outputs are synced to the coupler automatically.
Which Should You Use?¶
Choose based on your workflow needs:
| Scenario | Use | Why |
|---|---|---|
| Expensive simulation, model fitting, or large data transformation | `sc.run()` | Skip re-execution on cache hits; critical for 2+ hour runtimes or iterative analysis |
| Steps that log, print diagnostics, or validate state on every run | `sc.trace()` | Need to see side effects repeated; cheaper operations that re-run quickly |
| Multi-year simulation where early years are cached, new years execute | `sc.run()` | Each year has an independent cache entry; skip re-running 2020 when computing 2030 |
| Mixed: some expensive, some diagnostic | Both in the same scenario | Use `sc.run()` for expensive steps, `sc.trace()` for cheap validation |
Practical Guidance¶
For most scientific workflows, prefer sc.run() when:
- Your function is deterministic (no randomness unless seeded in config)
- It doesn't have important side effects outside the outputs you log
- You can structure it as a pure function (inputs → outputs)
Use sc.trace() when:
- You need to run initialization or setup code that triggers external systems
- You want explicit control over what happens every run vs. only on cache miss
- Your step is fast enough that re-execution overhead doesn't matter
On large file inputs: If your function receives multi-GB files, use
execution_options=ExecutionOptions(input_binding="paths")
and cache_options=CacheOptions(cache_hydration="inputs-missing") to ensure
input files are available on cache misses without loading them into memory.
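Combining the two options from the note above might look like this sketch (parameter names come from this guide; the helper name is illustrative and run_simulation is a hypothetical step function):

```python
# Sketch: bind large inputs as paths and backfill missing input files
# from prior runs on a cache miss, per the note above.
from pathlib import Path

def simulate_large_inputs(tracker, run_simulation):
    import consist
    from consist import CacheOptions, ExecutionOptions, use_tracker

    with use_tracker(tracker):
        return consist.run(
            name="simulate",
            fn=run_simulation,  # receives a Path, not a loaded DataFrame
            inputs={"trips": Path("trips.parquet")},  # multi-GB file stays on disk
            outputs=["summary"],
            execution_options=ExecutionOptions(input_binding="paths"),
            cache_options=CacheOptions(cache_hydration="inputs-missing"),
        )
```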
Alternative: log file outputs inside the step
If your function writes files, log them with the injected context (and read inputs yourself if needed):
from consist import ExecutionOptions
from pathlib import Path
def beam_preprocess(data_file, _consist_ctx) -> None:
out_path = _consist_ctx.output_path("beam_inputs")
...
_consist_ctx.log_output(out_path, key="beam_inputs")
with use_tracker(tracker):
with consist.scenario("baseline") as sc:
sc.run(
name="beam_preprocess",
fn=beam_preprocess,
inputs={"data_file": Path("data.parquet")},
execution_options=ExecutionOptions(
input_binding="paths",
inject_context=True,
),
)
Query Facets with pivot_facets¶
Log small, queryable config values (facets) and pivot them into a wide table for analysis. This is a simple way to compare many runs side‑by‑side (for example, a sensitivity analysis across parameters):
from sqlmodel import select
import consist
params = consist.pivot_facets(
namespace="simulate",
keys=["alpha", "beta", "mode"],
value_columns={"mode": "value_str"},
) # (1)!
rows = consist.run_query(
select(params.c.run_id, params.c.alpha, params.c.beta, params.c.mode),
tracker=tracker,
)
- Pivot config facets into columns for analysis.
See Concepts for when to use config vs facet.
Motivation: When Caching Saves Time¶
Caching is most valuable in workflows with many runs and expensive computation. Here are realistic scenarios from scientific domains:
Example 1: Land-Use Model Sensitivity Analysis
A sensitivity sweep tests 40 parameter combinations (toll levels, parking costs, transit subsidies).
- Each ActivitySim run: 30 minutes
- Without caching: 40 runs × 30 min = 20 hours
- With caching: base population synthesis (30 min, once) + 39 parameter tweaks with cache hits (5 min each) = 3.75 hours
- Time saved: 81% reduction in modeling time
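The Example 1 savings follow from simple arithmetic:

```python
# Quick check of the Example 1 arithmetic.
runs = 40
minutes_per_run = 30

without_caching_h = runs * minutes_per_run / 60      # all runs execute
with_caching_h = (30 + (runs - 1) * 5) / 60          # base synthesis + 39 cached tweaks
reduction = 1 - with_caching_h / without_caching_h

print(f"{without_caching_h:.2f} h vs {with_caching_h:.2f} h "
      f"({reduction:.0%} reduction)")
```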
Example 2: ActivitySim Calibration Iteration
Mode choice coefficients need iterative calibration against observed transit ridership.
- Without caching: repeat all 3 steps = 75 minutes per iteration × 5 iterations = 375 minutes total
- With caching: steps 1–2 are cache hits, only step 3 re-executes = 115 minutes
- Time saved: 69% reduction; frees analyst time for interpretation
Example 3: Grid Dispatch Multi-Scenario Ensemble
A baseline scenario and 8 future scenarios all share the same network preprocessing pipeline.
- Preprocessing: 3 hours; each scenario dispatch: 20 minutes
- Without caching: 9 × (3 hours + 20 min) = 30 hours
- With caching: preprocessing once (3 hours), then all 9 dispatch runs hit cache on preprocessing = 6 hours
- Time saved: 80% reduction; enables rapid scenario exploration
Querying Results¶
Finding Runs¶
See: Example notebooks.
import consist
run = consist.find_run(
tracker=tracker,
parent_id="baseline", # (1)!
year=2030,
model="simulate"
)
runs_by_year = consist.find_runs(
tracker=tracker,
parent_id="baseline",
model="simulate",
stage="supply_demand_loop",
phase="traffic_assignment",
index_by="year"
)
result_2030 = runs_by_year[2030]
- Filter by scenario ID.
Loading Artifacts¶
See: Example notebooks.
artifacts = tracker.get_artifacts_for_run(run.id)
persons_artifact = artifacts.outputs["persons"]
df = consist.load_df(persons_artifact) # (1)!
- Load the artifact data into a DataFrame.
Cross-Run Queries with Views¶
See: Example notebooks.
Register schemas to enable SQL queries across all runs:
import consist
from sqlmodel import SQLModel, Field, select, func
from consist import Tracker
class Person(SQLModel, table=True):
person_id: int = Field(primary_key=True)
age: int
number_of_trips: int
tracker = Tracker(
run_dir="./runs",
db_path="./provenance.duckdb",
schemas=[Person]
)
VPerson = tracker.views.Person # (1)!
query = (
select(
VPerson.consist_scenario_id,
VPerson.consist_year,
func.avg(VPerson.number_of_trips).label("avg_trips")
)
.where(VPerson.consist_scenario_id.in_(["baseline", "high_gas"]))
.group_by(VPerson.consist_scenario_id, VPerson.consist_year)
)
results = consist.run_query(query, tracker=tracker)
- Views are available after running scenarios and registering schemas.
Views automatically include consist_scenario_id, consist_year, and other metadata columns for filtering and grouping.
For more on ingestion and hybrid views, see Data Materialization Strategy.
Generating Schemas from Captured Data¶
If you ingest tabular data into DuckDB, Consist can capture the observed schema and export an editable SQLModel stub so you can curate PK/FK constraints and then register the model for views. This is useful when you want a stable, documented schema for downstream analysis or audits.
You can also opt into lightweight file schema capture when logging CSV/Parquet artifacts by passing profile_file_schema=True (and optionally file_schema_sample_rows=) to log_artifact. These captured schemas are stored in the provenance DB and remain available even if the original files move or are deleted.
If you already have a content hash (e.g., after copying or moving a file), pass content_hash= to log_artifact to reuse it without re-hashing the file. For safety, Consist will not overwrite an existing, different hash unless you pass force_hash_override=True. To verify the hash against disk, use validate_content_hash=True.
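A sketch combining these log_artifact options (parameter names as given above; the assumption that log_artifact is called on the tracker, plus the file path and helper name, are illustrative):

```python
# Sketch: schema capture and hash reuse when logging an artifact.
# Parameter names come from this guide; the call shape is assumed.
from pathlib import Path

def log_with_schema(tracker, known_hash=None):
    return tracker.log_artifact(
        Path("outputs/persons.parquet"),
        key="persons",
        profile_file_schema=True,      # capture the observed schema in the DB
        file_schema_sample_rows=1000,  # optional sampling bound
        content_hash=known_hash,       # reuse a precomputed hash (no re-hash)
        validate_content_hash=known_hash is not None,  # verify against disk
    )
```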
See Schema Export for the full workflow (CLI + Python) and column-name/__tablename__ guidelines.
See Data Materialization Strategy for ingestion tradeoffs and DB fallback behavior.