Usage Guide

Consist provides flexible patterns for tracking provenance in scientific workflows. This guide walks you through the main usage patterns, from simple single-step runs to complex multi-year simulations. Each section is written to help:

  • Developers integrating Consist into a simulation tool
  • Practitioners running tools and wanting clearer inputs/outputs
  • Researchers managing multi-stage pipelines and reproducibility

New to Consist? Start with the quickstart tutorial, then work through the examples below in order.

If you are building a reusable domain integration, see Building a Domain Tracker for the recommended wrapper architecture.


Choosing Your Pattern

Choose based on what you're building and how much structure you need:

| Your workflow | Pattern | Why |
| --- | --- | --- |
| Single data processing step (clean, transform, aggregate) | run | Simple: caches the entire function call with low overhead. Use for self-contained operations. |
| Multi-step workflow (preprocessing → simulation → analysis) | scenario | Groups related steps, shares state via the coupler, caches per step. Use when steps have dependencies or shared configuration. |
| Existing tool/model (subprocess, legacy code, container) | container or depends_on | Wraps external executables, tracks the container digest as part of the cache key. Use for black-box tools. |
| Parameter sweep / sensitivity analysis | scenario + loop | Run the same step with different configs, compare results. |
| Multi-year simulation | scenario + loop | Run per year; each year caches independently, all years share scenario context. |

Pattern 1: Single-Step Runs (run())

Use run() when you have a self-contained operation: data cleaning, transformation, aggregation, or any callable that takes inputs and produces outputs. Consist caches the entire function call based on code version + config + input data.

Recommended default: use ExecutionOptions(input_binding="paths"). That keeps the function boundary honest: inputs={...} declares identity and lineage, and the callable receives those named inputs as local Path objects. Use input_binding="loaded" when you want Consist to hydrate tables or objects before calling the function. Use input_binding="none" when inputs are identity-only and you do not want automatic binding.

Choosing an Input Binding Mode

| Mode | Function receives | Best for | Main tradeoff |
| --- | --- | --- | --- |
| "paths" | local Path objects | explicit filesystem workflows, subprocesses, existing code that already reads files | requires a usable local path |
| "loaded" | loaded DataFrame / Python objects | tabular analysis, ingested artifacts, DB-backed hydration | hides the I/O boundary |
| "none" | no automatic binding | advanced wrappers, identity-only dependencies | you must supply arguments some other way |

runtime_kwargs still exists, but it should usually mean "runtime-only values that are not the declared named inputs." It is no longer the recommended way to pass paths into a path-based step.

When to use:

  • Simple data transformations (filter, aggregate, merge)
  • Expensive computations that don't depend on other runs
  • One-off analyses
  • You want the simplest mental model

When NOT to use:

  • Multi-step workflows with dependencies between steps (use scenario() instead)
  • Existing tools that write to directories (use depends_on or container)

Simple Example: Data Cleaning

from pathlib import Path
import pandas as pd

RAW = Path("raw.parquet")
CLEANED = Path("cleaned.parquet")


def clean_data(raw_path: Path, threshold: float = 0.5) -> None:
    df = pd.read_parquet(raw_path)
    df[df["value"] > threshold].to_parquet(CLEANED)


clean_data(RAW, threshold=0.5)
cleaned_df = pd.read_parquet(CLEANED)
print(cleaned_df.head())

This is plain Python and it works, but there is no cache identity, no run record, and no answer to "which exact code/config/input produced this file?"

from pathlib import Path
import pandas as pd
from consist import ExecutionOptions, Tracker

tracker = Tracker(run_dir="./runs", db_path="./provenance.duckdb")


def clean_data(raw_path: Path, threshold: float = 0.5) -> dict[str, Path]:
    df = pd.read_parquet(raw_path)
    out_path = Path("./cleaned.parquet")
    df[df["value"] > threshold].to_parquet(out_path)
    return {"cleaned": out_path}


result = tracker.run(
    fn=clean_data,
    inputs={"raw_path": Path("raw.parquet")},
    config={"threshold": 0.5},
    outputs=["cleaned"],
    execution_options=ExecutionOptions(input_binding="paths"),
)

cleaned_artifact = result.outputs["cleaned"]
cleaned_df = pd.read_parquet(cleaned_artifact.path)
print(cleaned_artifact.path)

The function stays ordinary and testable. Consist adds the cache key, lineage, and queryable artifact record around the call, while input_binding="paths" keeps the file boundary explicit.

from pathlib import Path
import pandas as pd
from consist import ExecutionOptions, Tracker

tracker = Tracker(run_dir="./runs", db_path="./provenance.duckdb")


def clean_data(raw: pd.DataFrame, threshold: float = 0.5) -> dict[str, Path]:
    out_path = Path("./cleaned.parquet")
    raw[raw["value"] > threshold].to_parquet(out_path)
    return {"cleaned": out_path}


result = tracker.run(
    fn=clean_data,
    inputs={"raw": Path("raw.parquet")},
    config={"threshold": 0.5},
    outputs=["cleaned"],
    execution_options=ExecutionOptions(input_binding="loaded"),
)

cleaned_artifact = result.outputs["cleaned"]
cleaned_df = pd.read_parquet(cleaned_artifact.path)
print(cleaned_artifact.path)

This is convenient for tabular work: Consist loads the named input before the function executes. Prefer it when you want hydrated objects rather than explicit file boundaries.

If you run it again with the same inputs and config, you should get a cache hit and no re-execution.

Preferred run configuration style (options objects):

import consist
from consist import CacheOptions, ExecutionOptions, OutputPolicyOptions, use_tracker

with use_tracker(tracker):
    result = consist.run(
        fn=clean_data,
        inputs={"raw": Path("raw.csv")},
        config={"threshold": 0.5},
        outputs=["cleaned"],
        cache_options=CacheOptions(cache_mode="reuse", cache_hydration="inputs-missing"),
        output_policy=OutputPolicyOptions(output_missing="error"),
        execution_options=ExecutionOptions(input_binding="loaded"),
    )

To scope code hashing to the callable instead of full-repo Git state:

import consist
from consist import CacheOptions, use_tracker

with use_tracker(tracker):
    result = consist.run(
        fn=clean_data,
        inputs={"raw": Path("raw.csv")},
        outputs=["cleaned"],
        cache_options=CacheOptions(
            code_identity="callable_module",
            code_identity_extra_deps=["shared/helpers.py"],
        ),
    )

This reduces cache misses when unrelated files elsewhere in the repository change.

Legacy run-policy kwargs are no longer supported on run(...) APIs. Use cache_options=..., output_policy=..., and execution_options=....

For file outputs, prefer returning dict[str, Path] and declaring outputs=[...] when you control the function body. Use managed helpers like consist.output_path(...), _consist_ctx.output_path(...), or _consist_ctx.output_dir(...) when you need Consist to allocate paths for you or when wrapping tools that do not return their outputs directly.

Alternative: use injected context and managed output paths

Use this when Consist should allocate the output location for you, or when you are wrapping a file-writing tool that returns `None`.
import consist
from consist import ExecutionOptions, use_tracker
from pathlib import Path
import pandas as pd

def clean_data(raw_file: Path, _consist_ctx) -> None:
    df = pd.read_csv(raw_file)
    out_path = _consist_ctx.output_path("cleaned")
    df.to_parquet(out_path)
    _consist_ctx.log_output(out_path, key="cleaned")

with use_tracker(tracker):
    result = consist.run(
        fn=clean_data,
        inputs={"raw_file": Path("raw.csv")},  # (1)!
        execution_options=ExecutionOptions(
            input_binding="paths",
            inject_context=True,
        ),
    )
1. Hash the input file as part of the cache key.

Example with Config

Use Pydantic models for structured configs:

import consist
from consist import ExecutionOptions, Tracker, use_tracker
from pydantic import BaseModel
from pathlib import Path
import pandas as pd

class CleaningConfig(BaseModel):
    threshold: float = 0.5
    remove_outliers: bool = True

tracker = Tracker(run_dir="./runs", db_path="./provenance.duckdb")

def clean_data(raw: pd.DataFrame, config: CleaningConfig) -> pd.DataFrame:
    df = raw[raw["value"] >= config.threshold]
    if config.remove_outliers:
        df = df[df["value"] < df["value"].quantile(0.95)]
    return df

with use_tracker(tracker):
    result = consist.run(
        fn=clean_data,
        inputs={"raw": Path("raw.csv")},
        config=CleaningConfig(threshold=0.5, remove_outliers=True),
        outputs=["cleaned"],
        execution_options=ExecutionOptions(input_binding="loaded"),
    )

Each distinct config → separate cache entries. Change the threshold? Only that run re-executes.
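The per-config caching intuition can be sketched with plain hashing. This is a simplified illustration of why distinct configs get distinct cache entries, not Consist's actual key computation:

```python
import hashlib
import json

def toy_config_key(config: dict) -> str:
    # Stable serialization -> digest; a stand-in for how a distinct
    # config contributes a distinct component to the cache key.
    payload = json.dumps(config, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()

k_base = toy_config_key({"threshold": 0.5, "remove_outliers": True})
k_variant = toy_config_key({"threshold": 0.6, "remove_outliers": True})
assert k_base != k_variant  # changed threshold -> separate cache entry
# Key order does not matter because serialization is sorted:
assert k_base == toy_config_key({"remove_outliers": True, "threshold": 0.5})
```

The same logic applies to Pydantic configs: two `CleaningConfig` instances that serialize to different values cache separately.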

Wrapping Legacy or Black-Box Tools

If you have existing code that writes files to a directory, use the injected run context to capture outputs. _consist_ctx.output_dir(...) avoids manual mkdir/path joining and keeps files in the managed artifact location.

from pathlib import Path
import consist
from consist import ExecutionOptions, use_tracker

def run_legacy_model(upstream, _consist_ctx) -> None:
    import legacy_model

    output_dir = _consist_ctx.output_dir("legacy_outputs")
    with _consist_ctx.capture_outputs(output_dir, pattern="*.csv"):
        legacy_model.run(upstream, output_dir=output_dir)

with use_tracker(tracker):
    result = consist.run(
        fn=run_legacy_model,
        inputs={"upstream": Path("input.csv")},
        depends_on=[Path("config.yaml"), Path("parameters.json")],  # (1)!
        execution_options=ExecutionOptions(
            input_binding="paths",
            inject_context=True,
        ),
    )
  1. Hash these files too so config changes invalidate the cache.
  2. input_binding="paths" passes raw paths to the legacy tool while still keeping those inputs in cache identity and lineage.

Captured outputs are keyed by filename stem (for example, results.csv -> results).
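The stem-based keying rule is plain pathlib behavior (file names below are hypothetical):

```python
from pathlib import Path

# Captured files are keyed by filename stem, e.g. results.csv -> "results".
captured = [Path("outputs/results.csv"), Path("outputs/summary.csv")]
keys = {p.stem: p for p in captured}
assert sorted(keys) == ["results", "summary"]
```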

Alternative: capture a fixed output directory

If the tool always writes to a known folder and returns `None`, you can capture it directly:
from pathlib import Path
import consist
from consist import ExecutionOptions, use_tracker

with use_tracker(tracker):
    def run_legacy_model(upstream) -> None:
        import legacy_model

        legacy_model.run(upstream, output_dir="outputs")

    result = consist.run(
        fn=run_legacy_model,
        inputs={"upstream": Path("input.csv")},  # (1)!
        depends_on=[Path("config.yaml")],
        execution_options=ExecutionOptions(
            input_binding="paths",
        ),
        capture_dir=Path("outputs"),
        capture_pattern="*.csv",
    )
1. Hash the input file as part of the cache key.
Alternative: auto-load tabular inputs into function arguments

Auto-loading is convenient for short scripts and notebook-scale analysis:
def clean_data(raw: pd.DataFrame, threshold: float = 0.5) -> pd.DataFrame:
    return raw[raw["value"] > threshold]
When `inputs` is a mapping and `input_binding="loaded"`, Consist hydrates those artifacts into function arguments for you. This is shorter, but it also hides the file boundary. Prefer the explicit `Path`-based pattern above when you want the workflow wiring to stay obvious.

The depends_on files are hashed as part of the cache key, so changing config.yaml invalidates the cache.


Pattern 2: Multi-Step Workflows (scenario())

Use scenario() when you have multiple interdependent steps that share state or configuration. Scenarios group steps into a coherent unit (a "run scenario"), while each step caches independently.

Recommended default: keep step links explicit. Have each step return named artifact paths, then pass those outputs downstream with consist.ref(...) or consist.refs(...). Reach for the coupler when you need scenario-scoped runtime state, optional branching, or trace-style orchestration.

Compiled bindings for orchestrators

When a planner or external orchestrator has already resolved a step's binding decision, pass that result to sc.run(...) as BindingResult(...). Treat the binding object as an execution envelope, not a planning API: the planner lives outside Consist, and Consist executes the already-resolved binding.

Keep the explicit consist.ref(...) / consist.refs(...) path as the recommended default for direct step-to-step workflow code. Use BindingResult when the binding plan is coming from a separate layer and you want to hand the resolved step contract to Consist in one object.

from pathlib import Path
import consist
from consist import BindingResult, ExecutionOptions, Tracker

tracker = Tracker(run_dir="./runs", db_path="./provenance.duckdb")


def prepare() -> dict[str, Path]:
    out = consist.output_path("prepared", ext="txt")
    out.write_text("ready\n")
    return {"prepared": out}


def consume(raw: Path, prepared: Path, maybe_aux: Path | None = None) -> None:
    print(raw, prepared, maybe_aux)


with tracker.scenario("orchestrated") as sc:
    sc.run(fn=prepare, outputs=["prepared"])
    binding = BindingResult(
        inputs={"raw": Path("raw.csv")},
        input_keys=["prepared"],
        optional_input_keys=["maybe_aux"],
    )

    sc.run(
        fn=consume,
        binding=binding,
        execution_options=ExecutionOptions(input_binding="paths"),
    )

When to use:

  • Multi-step pipelines (preprocess → simulate → analyze)
  • Steps have data dependencies (output of step 1 is input to step 2)
  • Multi-year simulations (year 2020 → 2030 → 2040)
  • Parameter sweeps where you want to compare across variants
  • Shared configuration across multiple steps

Benefits over run():

  • Steps cache independently—skip re-executing steps whose inputs haven't changed
  • Keep explicit step links with consist.ref(...) / consist.refs(...) for clear lineage
  • Use the coupler for scenario-scoped runtime state and trace-oriented flows
  • Group runs into scenarios for easy cross-scenario queries

Best practice: For step-to-step coupling, pass explicit artifact links with consist.ref(previous_result, key="...") (single artifact) or consist.refs(previous_result, ...) (multiple artifacts). This avoids ambiguous direct RunResult inputs when an upstream step has multiple outputs.

Understanding the Coupler

The coupler is your scenario-scoped artifact registry. When you log an artifact with a key, it's automatically stored in the coupler, making data flow between steps explicit and traceable. This is especially useful when:

  • You need clean handoffs between tools (developer workflows)
  • You want a clear list of outputs by name (practitioner workflows)
  • You want auditable step-to-step lineage (research workflows)

Etymology: in railroad terminology (the source of the library name "consist"), a coupler is the mechanism that links train cars together. In Consist, the coupler links your workflow steps by storing and threading their outputs.

Key behaviors:

  • When you log an artifact with a key, it's automatically synced to the coupler
  • You retrieve artifacts with coupler.require(key) or via inputs= declarations
  • The coupler persists across all steps in a scenario
  • Each scenario has its own coupler; they don't share data
  • On cache hits, cached outputs are pre-synced to the coupler before your step runs

You interact with the coupler when:

  • Accessing inputs in trace blocks: coupler.require("population")
  • Declaring linked inputs to sc.run(): inputs=consist.refs(init_result, "population")
  • Validating that outputs were produced: sc.require_outputs(...)
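Conceptually, the coupler behaves like a scenario-scoped registry with strict lookup. The toy model below is a sketch of that behavior, not Consist's implementation:

```python
class ToyCoupler:
    """Simplified model of the coupler: a keyed artifact registry."""

    def __init__(self):
        self._artifacts = {}

    def set(self, key, artifact):
        # In real use, logging an artifact with a key live-syncs it here.
        self._artifacts[key] = artifact

    def require(self, key):
        # Strict lookup: a missing key is an error, not a silent None.
        if key not in self._artifacts:
            raise KeyError(f"no artifact logged under key {key!r}")
        return self._artifacts[key]

coupler = ToyCoupler()
coupler.set("population", "runs/population.parquet")
assert coupler.require("population") == "runs/population.parquet"
```

The strict `require` is the important part: a typo in a key fails loudly at the step that needs the artifact, rather than propagating a missing value downstream.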

Optional: artifact key registries help keep keys consistent across large workflows.

from consist import ExecutionOptions
from consist.utils import ArtifactKeyRegistry
from pathlib import Path

class Keys(ArtifactKeyRegistry):
    RAW = "raw"
    PREPROCESSED = "preprocessed"
    ANALYSIS = "analysis"

# Use keys in calls
preprocess_result = sc.run(
    fn=preprocess,
    inputs={Keys.RAW: Path("raw.csv")},
    outputs=[Keys.PREPROCESSED],
)
sc.run(
    fn=analyze,
    inputs=consist.refs(preprocess_result, Keys.PREPROCESSED),
    execution_options=ExecutionOptions(input_binding="loaded"),
    outputs=[Keys.ANALYSIS],
)

# Validate ad-hoc key lists when needed
Keys.validate([Keys.RAW, Keys.PREPROCESSED])

Live-sync (automatic): When you log an artifact, it's immediately available in the coupler—you don't need to manually call coupler.set().

Optional: Namespace keys with a scoped view: For larger workflows, you can scope reads/writes while keeping fully-qualified keys globally accessible.

beam = sc.coupler.view("beam")
beam.set("plans_in", artifact)            # writes key "beam/plans_in"
beam.require("plans_in")                  # namespace-local access
sc.coupler.require("beam/plans_in")       # global access still works

For optional-Consist workflows: If you're using Consist in optional mode (with fallback to Path objects or artifact-like objects), use coupler.set_from_artifact(key, value) instead of coupler.set(). It handles both real Artifacts and artifact-like objects (Paths, strings, noop artifacts) transparently.

Simple Example: Two-Step Workflow

from pathlib import Path
import pandas as pd

PREPROCESSED = Path("./preprocessed.parquet")


def preprocess_data(raw_path: Path) -> None:
    df = pd.read_csv(raw_path)
    df[df["value"] > 0.5].to_parquet(PREPROCESSED)


def analyze_data(preprocessed_path: Path) -> None:
    df = pd.read_parquet(preprocessed_path)
    summary = df.groupby("category", as_index=False)["value"].mean()
    summary.to_parquet("./analysis.parquet")


preprocess_data(Path("raw.csv"))
analyze_data(PREPROCESSED)

The dependency is real, but it is implicit. The second step only works because the first one wrote to a shared path that both functions know about.

from pathlib import Path
import pandas as pd
import consist
from consist import ExecutionOptions, Tracker

tracker = Tracker(run_dir="./runs", db_path="./provenance.duckdb")


def preprocess_data(raw_path: Path) -> dict[str, Path]:
    df = pd.read_csv(raw_path)
    out_path = Path("./preprocessed.parquet")
    df[df["value"] > 0.5].to_parquet(out_path)
    return {"preprocessed": out_path}


def analyze_data(preprocessed_path: Path) -> dict[str, Path]:
    df = pd.read_parquet(preprocessed_path)
    out_path = Path("./analysis.parquet")
    df.groupby("category", as_index=False)["value"].mean().to_parquet(out_path)
    return {"analysis": out_path}


with tracker.scenario("my_analysis") as sc:
    preprocess_result = sc.run(
        name="preprocess",
        fn=preprocess_data,
        inputs={"raw_path": Path("raw.csv")},
        outputs=["preprocessed"],
        execution_options=ExecutionOptions(input_binding="paths"),
    )

    sc.run(
        name="analyze",
        fn=analyze_data,
        inputs={
            "preprocessed_path": consist.ref(
                preprocess_result, key="preprocessed"
            )
        },
        outputs=["analysis"],
        execution_options=ExecutionOptions(input_binding="paths"),
    )

Each step stays plain Python, but the link between them is now a named artifact reference instead of an implicit shared path.

All steps have scenario_id="my_analysis", making it easy to query together. Change preprocess logic? The preprocess step re-runs; the analyze step re-runs only if the preprocessed artifact changes. Change raw.csv? Both re-execute.

The inputs= parameter supports two common forms:

Mapping form (declare hashed dependencies):

sc.run(
    name="preprocess",
    fn=preprocess_data,
    inputs={"raw": Path("raw.csv")},  # (1)!
    execution_options=ExecutionOptions(input_binding="paths"),
    outputs=["preprocessed"],
)

  1. Hash the file as an input dependency; input_binding="paths" passes it to the function as a local Path object automatically. Use this when you're loading data from disk for the first time and want the I/O boundary to stay obvious.

Linked artifact form (recommended for step-to-step coupling):

from consist import ExecutionOptions

sc.run(
    name="analyze",
    fn=analyze_data,
    inputs={"preprocessed": consist.ref(preprocess_result, key="preprocessed")},  # (1)!
    execution_options=ExecutionOptions(input_binding="loaded"),  # (2)!
    outputs=["analysis"],
)

  1. Explicitly link to one output from a prior RunResult.
  2. Auto-load inputs as hydrated objects (DataFrames, etc.) before calling the function. Use this when an upstream step already produced the artifact. With input_binding="loaded", each mapping key becomes a function parameter with loaded data. This is concise, but the more explicit path-based variant above (input_binding="paths") is usually the better default for production pipelines.

Rule of thumb:

  • Use consist.ref(...) for one-off single links.
  • Use consist.refs(...) when wiring multiple outputs or when you plan to reuse the mapping.

Concise and explicit refs(...) forms:

# Assume setup_result has outputs: "zones", "persons", and "jobs".
# Concise: map all outputs by their original keys.
all_inputs = consist.refs(setup_result)

# Explicit: select only the outputs you need.
subset_inputs = consist.refs(setup_result, "persons", "jobs")

# Explicit aliases: rename keys for downstream function parameters.
aliased_inputs = consist.refs(
    setup_result,
    {"population_df": "persons", "employment_df": "jobs"},
)

Alias-map form (when downstream parameter names differ):

sc.run(
    name="analyze",
    fn=analyze_data,
    inputs=consist.refs(preprocess_result, {"cleaned_artifact": "preprocessed"}),
    execution_options=ExecutionOptions(input_binding="loaded"),
    outputs=["analysis"],
)
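The three selection forms can be summarized as a small toy model of the mapping semantics described above (not the real consist.refs, which operates on a RunResult):

```python
def toy_refs(outputs, *selectors):
    """Toy model of refs(...) selection: all keys, a subset, or an alias map."""
    if not selectors:
        return dict(outputs)  # all outputs under their original keys
    if len(selectors) == 1 and isinstance(selectors[0], dict):
        # alias map: {function_parameter: output_key}
        return {param: outputs[key] for param, key in selectors[0].items()}
    return {key: outputs[key] for key in selectors}  # explicit subset

outs = {"zones": "z.parquet", "persons": "p.parquet", "jobs": "j.parquet"}
assert toy_refs(outs) == outs
assert toy_refs(outs, "persons", "jobs") == {"persons": "p.parquet", "jobs": "j.parquet"}
assert toy_refs(outs, {"population_df": "persons"}) == {"population_df": "p.parquet"}
```

Whatever form you choose, the result is always a plain mapping of downstream parameter names to upstream artifacts, which is what inputs= consumes.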

Compatibility note: Legacy key-indirection patterns (for example, inputs=["preprocessed"] and input_keys=) still work. Prefer explicit links with consist.ref(...)/consist.refs(...) in new code; reserve BindingResult(...) for complex or externally orchestrated workflows that already have a binding plan.

Alternative: inline steps with sc.trace

Use this when you want inline code blocks (they always execute, even on cache hits). This can be useful for lightweight validation or logging.
with use_tracker(tracker):
    with consist.scenario("my_analysis") as sc:
        preprocess_result = sc.run(
            name="preprocess",
            fn=preprocess_data,
            inputs={"raw": Path("raw.csv")},
            outputs=["preprocessed"],
        )
        analyze_inputs = consist.refs(preprocess_result, "preprocessed")

        with sc.trace(
            name="analyze",
            inputs=analyze_inputs,
        ):
            df = consist.load_df(analyze_inputs["preprocessed"])
            summary = df.groupby("category", as_index=False)["value"].mean()
            consist.log_dataframe(summary, key="analysis")

Decorator Defaults, Templates, and Schema Introspection

For the full guide to @define_step defaults, callable metadata, name templates, schema introspection, and cache invalidation helpers, see Decorators & Metadata.

Passing Data Between Steps with the Coupler

with use_tracker(tracker):
    with consist.scenario("baseline", model="travel_demand") as sc:
        initialize_result = sc.run(  # (1)!
            name="initialize",
            fn=load_population,
            outputs=["population"],
        )
        population_inputs = consist.refs(initialize_result, "population")

        for year in [2020, 2030, 2040]:  # (2)!
            with sc.trace(
                name="simulate",
                run_id=f"baseline_{year}",
                year=year,
                inputs=population_inputs,  # (3)!
            ):
                df_pop = consist.load_df(population_inputs["population"])  # (4)!
                df_result = run_model(year, df_pop)

                consist.log_dataframe(df_result, key="persons")  # (5)!
  1. Step 1: load and prepare the population artifact.
  2. Step 2: simulate for each year.
  3. Declare the dependency on the population artifact.
  4. Load the explicitly linked artifact.
  5. Log output and store it for downstream steps.

What explicit linked inputs do:

  • Declares that this step depends on the selected population artifact
  • Consist tracks this as part of the cache key
  • If the population artifact hasn't changed, this step's cache is still valid

Simpler alternative (auto-load inputs with sc.run)

If your step can be expressed as a function, sc.run can auto-load inputs into function parameters so you don't need to call coupler.require(...) manually:

from consist import ExecutionOptions

def simulate_year(population: pd.DataFrame, config: dict) -> pd.DataFrame:
    year = config["year"]
    return run_model(year, population)

with use_tracker(tracker):
    with consist.scenario("baseline", model="travel_demand") as sc:
        initialize_result = sc.run(
            name="initialize",
            fn=load_population,
            outputs=["population"],
        )
        simulation_inputs = consist.refs(initialize_result, "population")

        for year in [2020, 2030, 2040]:
            sc.run(
                name=f"simulate_{year}",
                fn=simulate_year,
                inputs=simulation_inputs,
                outputs=["persons"],
                execution_options=ExecutionOptions(input_binding="loaded"),
                config={"year": year},
            )

Output Validation

Consist can validate that your workflow produces expected outputs, catching typos or missing data early. There are two patterns to choose from, depending on your workflow; most users just need Pattern A.


Pattern A: Declare required outputs (Static workflows) — START HERE

Use this when you know all your workflow outputs upfront.

with use_tracker(tracker):
    with consist.scenario("workflow") as sc:
        sc.require_outputs(
            "zarr_skims",
            "synthetic_population",
        )

        compile_result = sc.run("compile", fn=asim_compile_runner.run)  # (1)!
  1. Run steps; at scenario exit, missing outputs raise RuntimeError.

When to use: Static workflows where steps always produce the same outputs (most common).

Benefits: Simple, clear contract. Missing outputs are caught at scenario exit. Typos are caught immediately.

Optional: Add guardrails for typos

sc.require_outputs(
    "zarr_skims",
    "synthetic_population",
    warn_undefined=True,  # (1)!
    description={
        "zarr_skims": "Zone-to-zone travel times in Zarr format",
        "synthetic_population": "Synthetic population with activity schedules",
    }
)

  1. Warn if you set other keys by mistake.

Shortcut: Pass required outputs directly to scenario():

with consist.scenario(
    "workflow",
    require_outputs=["zarr_skims", "synthetic_population"],
) as sc:
    ...


Pattern B: Runtime-declared validation (Dynamic/optional outputs)

Use this when outputs are dynamic or optional (e.g., optional debug outputs, or conditional branching).

with use_tracker(tracker):
    with consist.scenario("workflow") as sc:
        sc.declare_outputs(
            "zarr_skims", "synthetic_population",
            required={"zarr_skims": True, "synthetic_population": True}
        )

        if debugging:  # (1)!
            sc.declare_outputs("debug_report", required={"debug_report": False})  # (2)!

        compile_result = sc.run("compile", fn=asim_compile_runner.run)  # (3)!
  1. Add optional outputs later if needed.
  2. Declare an optional debug output.
  3. Missing required outputs raise RuntimeError at exit.

When to use: Workflows with per-key control over required vs optional, or conditional outputs.

Benefits: Granular per-key control. Mix required and optional outputs. Add outputs dynamically.
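The required-vs-optional semantics shared by both patterns can be sketched as a toy exit-time check (an illustration of the contract, not Consist's implementation):

```python
def check_declared_outputs(declared, produced):
    """Toy exit-time validation: every required key must have been produced.

    declared: {output_key: required_flag}; produced: set of keys actually logged.
    """
    missing = [key for key, required in declared.items()
               if required and key not in produced]
    if missing:
        raise RuntimeError(f"missing required outputs: {missing}")

declared = {"zarr_skims": True, "synthetic_population": True, "debug_report": False}
# Optional keys may be absent without error:
check_declared_outputs(declared, {"zarr_skims", "synthetic_population"})
```

A missing required key (say, a step that never produced synthetic_population) would raise RuntimeError at scenario exit, which is the behavior both patterns above describe.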


Selective Output Collection with collect_by_keys()

By default, when a step produces multiple outputs, all are automatically synced to the coupler. Use collect_by_keys() when you need to:

  • Select only specific outputs from many (ignore others)
  • Namespace outputs by year or scenario (prefix them)

result = sc.run("step", fn=some_func)  # (1)!
sc.collect_by_keys(result.outputs, "persons", "households")  # (2)!

for year in [2020, 2030, 2040]:
    result = sc.run(f"forecast_{year}", fn=forecast_fn)
    sc.collect_by_keys(result.outputs, "population", "skims", prefix=f"{year}_")  # (3)!
  1. Produces persons, households, jobs.
  2. Keep only selected outputs; others are ignored.
  3. Namespace outputs by year (e.g., 2020_population).
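The selection and prefixing behavior amounts to the following toy model (the real collect_by_keys operates on logged artifacts, not plain strings):

```python
def toy_collect_by_keys(outputs, *keys, prefix=""):
    """Toy model: keep only the selected outputs, optionally namespacing them."""
    return {f"{prefix}{key}": outputs[key] for key in keys}

outs = {"persons": "p.parquet", "households": "h.parquet", "jobs": "j.parquet"}
kept = toy_collect_by_keys(outs, "persons", "households")
assert "jobs" not in kept  # unselected outputs are ignored

by_year = toy_collect_by_keys({"population": "pop.parquet"}, "population", prefix="2020_")
assert list(by_year) == ["2020_population"]  # namespaced key
```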

Bulk logging with metadata:

with consist.scenario("outputs") as sc:
    with sc.trace(name="export_outputs"):
        outputs = consist.log_artifacts(  # (1)!
            {
                "persons": "results/persons.parquet",
                "households": "results/households.parquet",
                "jobs": "results/jobs.parquet"
            },
            metadata_by_key={
                "households": {"role": "primary_unit"},
                "jobs": {"role": "employment_proxy"}
            },
            year=2030,
            scenario_name="base"  # (2)!
        )

  1. Log multiple files at once with explicit keys.
  2. All outputs get year=2030, scenario_name="base"; households also gets role="primary_unit".

Example: Parameter Sweep in a Scenario

from pathlib import Path
import pandas as pd

with use_tracker(tracker):
    with consist.scenario("sensitivity_analysis") as sc:
        def load_data() -> pd.DataFrame:
            return pd.read_csv("data.csv")

        setup_result = sc.run(  # (1)!
            name="setup",
            fn=load_data,
            inputs=[Path("data.csv")],
            outputs=["data"],
        )

        for threshold in [0.3, 0.5, 0.7]:  # (2)!
            data_inputs = consist.refs(setup_result, "data")
            with sc.trace(
                name="analyze",
                run_id=f"threshold_{threshold}",
                threshold=threshold,
                inputs=data_inputs,
            ):
                df = consist.load_df(data_inputs["data"])
                filtered = df[df["value"] > threshold]
                consist.log_dataframe(filtered, key="filtered")
  1. Load once, reuse for all variants.
  2. Test different thresholds as separate runs.

Each threshold creates a separate run with its own cache entry. Re-run later? Consist returns cached results for matching thresholds, skips setup (since input unchanged).


Pattern 3: Container Integration

Use containers when you have existing tools, models, or legacy code that runs as a subprocess or Docker container. The image digest becomes part of the cache key. For the full reference, see the Container Integration Guide.

When to use:

  • Running ActivitySim, SUMO, BEAM, or other external models
  • Legacy code you don't want to refactor
  • Python/R/Java executables that you invoke as subprocesses
  • Tools that expect specific file paths or output directories

from consist.integrations.containers import run_container
from pathlib import Path

host_inputs = Path("data/inputs").resolve()
host_outputs = Path("data/outputs").resolve()

result = run_container(
    tracker=tracker,
    run_id="model_2030",
    image="travel-model:v2.1",  # (1)!
    command=["python", "run.py", "--year", "2030"],
    volumes={
        str(host_inputs): "/inputs",
        str(host_outputs): "/outputs",
    },
    inputs=[host_inputs / "input.csv"],
    outputs={"results": host_outputs / "results.parquet"},
)

output_artifact = result.artifacts["results"]
  1. The image digest becomes part of the cache key.

Change the image version? Cache invalidates. Same image + inputs? Returns cached results.

Alternative: use consist.run with ExecutionOptions(executor="container")

If you prefer the same API as `consist.run`, you can use the container executor:
import consist
from consist import ExecutionOptions

with consist.use_tracker(tracker):
    result = consist.run(
        name="model_2030",
        inputs=[host_inputs / "input.csv"],
        output_paths={"results": host_outputs / "results.parquet"},
        execution_options=ExecutionOptions(
            executor="container",
            container={
                "image": "travel-model:v2.1",
                "command": ["python", "run.py", "--year", "2030"],
                "volumes": {str(host_inputs): "/inputs", str(host_outputs): "/outputs"},
            },
        ),
    )

Advanced Patterns

Cache Hits and the Coupler

When Consist detects a cache hit (same step, same code version, same config and inputs):

  1. In sc.run(): Your function is skipped entirely. Cached outputs are returned and automatically synced to the coupler.
  2. In sc.trace(): Cached outputs are pre-synced to the coupler BEFORE your trace body runs, so you can access them with coupler.require() immediately. Your code still executes (unlike sc.run()).

This means your code doesn't need to handle cache hits differently—the coupler is populated automatically in both cases.

Cache Hydration

By default, Consist returns metadata-only cache hits (no file copies). You can opt in to materializing cached files when needed:

  • outputs-requested: Copy only specific cached outputs to paths you provide
  • outputs-all: Copy all cached outputs into a target directory while preserving their historical relative layout
  • inputs-missing: When a cache miss occurs, backfill missing inputs from prior runs before executing

Note: outputs-requested requires output_paths=... so Consist knows where to write the files. inputs-missing only works for inputs that are tracked artifacts (not raw paths), so Consist can find the prior run's files or reconstruct ingested tables.

When cache-hit output files are gone, outputs-all and the run-scoped output recovery API can reconstruct ingested CSV/Parquet outputs from DuckDB. For archive-mirror recovery or debug/export workflows, prefer the explicit tracker.materialize_run_outputs(...) API rather than overloading cache hydration.

For archive-mirror recovery, cache hydration supports materialize_cached_outputs_source_root=Path(...) for outputs-requested and outputs-all. You can pass it through cache_options=CacheOptions(materialize_cached_outputs_source_root=...) on consist.run(...), Tracker.run(...), and scenario steps, or via the low-level tracker.start_run(...) / tracker.begin_run(...) APIs directly.

When you use the explicit tracker.materialize_run_outputs(...) API, target_root may be either the tracker run_dir or any configured tracker mount root without needing allow_external_paths=True.

Set per-run via cache_options=CacheOptions(cache_hydration=...) (for run(...)) or for scenario defaults via step_cache_hydration=...:

with use_tracker(tracker):
    with consist.scenario("baseline", step_cache_hydration="inputs-missing") as sc:
        prepare_result = sc.run(
            name="prepare_population",
            fn=load_population,
            outputs=["population"],
        )
        population_inputs = consist.refs(prepare_result, "population")
        with sc.trace(
            name="simulate",
            inputs=population_inputs,
        ):  # (1)!
            df_pop = consist.load_df(population_inputs["population"])
            ...
  1. This step backfills missing inputs from prior runs before executing.

Mixing Runs and Scenarios

Call consist.run(...) inside a scenario when a step should cache independently:


with use_tracker(tracker):
    with consist.scenario("baseline") as sc:
        preprocess = consist.run(
            fn=expensive_preprocessing,
            inputs={"network_file": Path("network.geojson")},
            outputs=["processed"],
        )  # (1)!
        simulate_inputs = consist.refs(preprocess, "processed")

        with sc.trace(
            name="simulate",
            inputs=simulate_inputs,
        ):  # (2)!
            network = consist.load_df(simulate_inputs["processed"])
            ...
  1. Run expensive preprocessing independently with its own cache key.
  2. Later steps consume the preprocessed output via an explicit ref.

When Does Code Execute? Understanding sc.run() vs sc.trace()

When building multi-step scientific workflows, a critical question arises: Does my Python code run every time, or only when inputs change? This section clarifies the difference between sc.run() and sc.trace(), two fundamental Consist patterns that differ in execution behavior on cache hits.

The Core Distinction

On a cache hit (when Consist finds previously-cached results for this step with the same inputs):

  • sc.trace(...) — Your Python block always executes. Consist returns cached outputs, but your code still runs. Use this for logging, diagnostics, or steps that must track intermediate state every run.

  • sc.run(...) — Your Python function only executes on cache miss. On a cache hit, Consist skips calling your function entirely and returns the cached output. Use this for expensive operations like scientific simulations, data processing, or model fitting.

Why This Matters: Performance & Side Effects

Consider an expensive simulation that takes 2 hours. Running it 100 times with the same inputs would normally take 200 hours of compute. With Consist:

  • If you use sc.trace(): code runs 100 times (200 hours) — caching provides metadata only
  • If you use sc.run(): code runs once (2 hours), then 99 cache hits retrieve results instantly

Side effects also differ. If your step writes temporary files, updates external systems, or has other side effects, sc.trace() repeats them on every run, while sc.run() skips them on cache hits.
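The two behaviors can be contrasted in a plain-Python sketch (illustrative only, not the Consist API):

```python
# Illustrative sketch (not the Consist API): a run-style call skips the body
# on a cache hit, while a trace-style call always executes the body and only
# reuses the recorded outputs.
cache = {}
calls = {"run": 0, "trace": 0}

def run_style(key, fn):
    if key in cache:
        return cache[key]          # cache hit: fn is never called
    calls["run"] += 1
    cache[key] = fn()
    return cache[key]

def trace_style(key, fn):
    calls["trace"] += 1            # body executes on every invocation
    result = fn()
    cache.setdefault(key, result)  # cached outputs are still recorded/reused
    return cache[key]

for _ in range(3):
    run_style("simulate", lambda: 42)
    trace_style("diagnose", lambda: "ok")

print(calls)  # {'run': 1, 'trace': 3}
```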

Example: ActivitySim-Style Land Use Simulation

Here's a realistic example showing the difference:

Using sc.trace() (Always Runs)

with use_tracker(tracker):
    with consist.scenario("baseline") as sc:
        year = 2030
        with sc.trace(  # (1)!
            name="prepare_land_use",
            inputs={"geojson": Path("land_use.geojson")},
            year=year
        ):
            print(f"Processing land use for year {year}")  # (2)!
            zones = load_zones("land_use.geojson")

            df_zones = pd.DataFrame(zones)  # (3)!
            consist.log_dataframe(df_zones, key="zones")
  1. This block executes every time, even on cache hits.
  2. Code always runs—useful for logging or status.
  3. Consist still returns cached outputs if they exist.

Using sc.run() (Skips on Cache Hit)

with use_tracker(tracker):
    with consist.scenario("baseline") as sc:
        def prepare_land_use(geojson_path: Path) -> pd.DataFrame:
            print("Processing land use")  # (1)!
            zones = load_zones(geojson_path)
            return pd.DataFrame(zones)

        result = sc.run(  # (3)!
            name="prepare_land_use",
            fn=prepare_land_use,
            inputs={"geojson_path": Path("land_use.geojson")},
            outputs=["zones"],
            execution_options=ExecutionOptions(input_binding="paths"),  # (2)!
        )
  1. This function only runs on cache miss; it prints on the first run.
  2. Bind the named input as a local Path object into the function argument.
  3. Outputs are synced to the coupler automatically.

Which Should You Use?

Choose based on your workflow needs:

| Scenario | Use | Why |
| --- | --- | --- |
| Expensive simulation, model fitting, or large data transformation | sc.run() | Skip re-execution on cache hits; critical for 2+ hour runtimes or iterative analysis |
| Steps that log, print diagnostics, or validate state on every run | sc.trace() | Side effects need to repeat on every run; cheaper operations re-run quickly |
| Multi-year simulation where early years are cached, new years execute | sc.run() | Each year has an independent cache entry; skip re-running 2020 when computing 2030 |
| Mixed: some expensive, some diagnostic | Both in the same scenario | Use sc.run() for expensive steps, sc.trace() for cheap validation |

Practical Guidance

For most scientific workflows, prefer sc.run() when:

  • Your function is deterministic (no randomness unless seeded in config)
  • It doesn't have important side effects outside the outputs you log
  • You can structure it as a pure function (inputs → outputs)

Use sc.trace() when:

  • You need to run initialization or setup code that triggers external systems
  • You want explicit control over what happens every run vs. only on cache miss
  • Your step is fast enough that re-execution overhead doesn't matter

On large file inputs: If your function receives multi-GB files, use execution_options=ExecutionOptions(input_binding="paths") and cache_options=CacheOptions(cache_hydration="inputs-missing") to ensure input files are available on cache misses without loading them into memory.

Alternative: log file outputs inside the step

If your function writes files, log them with the injected context (and read inputs yourself if needed):
from consist import ExecutionOptions
from pathlib import Path

def beam_preprocess(data_file, _consist_ctx) -> None:
    out_path = _consist_ctx.output_path("beam_inputs")
    ...
    _consist_ctx.log_output(out_path, key="beam_inputs")

with use_tracker(tracker):
    with consist.scenario("baseline") as sc:
        sc.run(
            name="beam_preprocess",
            fn=beam_preprocess,
            inputs={"data_file": Path("data.parquet")},
            execution_options=ExecutionOptions(
                input_binding="paths",
                inject_context=True,
            ),
        )

Query Facets with pivot_facets

Log small, queryable config values (facets) and pivot them into a wide table for analysis. This is a simple way to compare many runs side‑by‑side (for example, a sensitivity analysis across parameters):

from sqlmodel import select
import consist

params = consist.pivot_facets(
    namespace="simulate",
    keys=["alpha", "beta", "mode"],
    value_columns={"mode": "value_str"},
)  # (1)!

rows = consist.run_query(
    select(params.c.run_id, params.c.alpha, params.c.beta, params.c.mode),
    tracker=tracker,
)
  1. Pivot config facets into columns for analysis.

See Concepts for when to use config vs facet.


Motivation: When Caching Saves Time

Caching is most valuable in workflows with many runs and expensive computation. Here are realistic scenarios from scientific domains:

Example 1: Land-Use Model Sensitivity Analysis

A sensitivity sweep tests 40 parameter combinations (toll levels, parking costs, transit subsidies).

  • Each ActivitySim run: 30 minutes
  • Without caching: 40 runs × 30 min = 20 hours
  • With caching: base population synthesis (30 min, once) + 39 parameter tweaks with cache hits (5 min each) = 3.75 hours
  • Time saved: 81% reduction in modeling time

Example 2: ActivitySim Calibration Iteration

Mode choice coefficients need iterative calibration against observed transit ridership.

  • Without caching: repeat all 3 steps = 75 minutes per iteration × 5 iterations = 375 minutes total
  • With caching: steps 1–2 are cache hits, only step 3 re-executes = 115 minutes
  • Time saved: 69% reduction; frees analyst time for interpretation

Example 3: Grid Dispatch Multi-Scenario Ensemble

A baseline scenario and 8 future scenarios all share the same network preprocessing pipeline.

  • Preprocessing: 3 hours; each scenario dispatch: 20 minutes
  • Without caching: 9 × (3 hours + 20 min) = 30 hours
  • With caching: preprocessing once (3 hours) + 9 dispatches at 20 min each = 6 hours
  • Time saved: 80% reduction; enables rapid scenario exploration
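The arithmetic behind the sensitivity sweep in Example 1 checks out as follows:

```python
# Example 1 arithmetic: 40 combinations at 30 min each without caching,
# versus one 30-min base run plus 39 cached variants at 5 min each.
without_caching = 40 * 30        # minutes
with_caching = 30 + 39 * 5       # minutes
saved = 1 - with_caching / without_caching
print(without_caching / 60, with_caching / 60, round(saved, 2))  # 20.0 3.75 0.81
```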


Querying Results

Finding Runs

See: Example notebooks.

import consist

run = consist.find_run(
    tracker=tracker,
    parent_id="baseline",  # (1)!
    year=2030,
    model="simulate"
)

runs_by_year = consist.find_runs(
    tracker=tracker,
    parent_id="baseline",
    model="simulate",
    stage="supply_demand_loop",
    phase="traffic_assignment",
    index_by="year"
)
result_2030 = runs_by_year[2030]
  1. Filter by scenario ID.

Loading Artifacts

See: Example notebooks.

artifacts = tracker.get_artifacts_for_run(run.id)
persons_artifact = artifacts.outputs["persons"]

df = consist.load_df(persons_artifact)  # (1)!
  1. Load the artifact data into a DataFrame.

Cross-Run Queries with Views

See: Example notebooks.

Register schemas to enable SQL queries across all runs:

import consist
from sqlmodel import SQLModel, Field, select, func
from consist import Tracker

class Person(SQLModel, table=True):
    person_id: int = Field(primary_key=True)
    age: int
    number_of_trips: int

tracker = Tracker(
    run_dir="./runs",
    db_path="./provenance.duckdb",
    schemas=[Person]
)

VPerson = tracker.views.Person  # (1)!

query = (
    select(
        VPerson.consist_scenario_id,
        VPerson.consist_year,
        func.avg(VPerson.number_of_trips).label("avg_trips")
    )
    .where(VPerson.consist_scenario_id.in_(["baseline", "high_gas"]))
    .group_by(VPerson.consist_scenario_id, VPerson.consist_year)
)

results = consist.run_query(query, tracker=tracker)
  1. Views are available after running scenarios and registering schemas.

Views automatically include consist_scenario_id, consist_year, and other metadata columns for filtering and grouping. For more on ingestion and hybrid views, see Data Materialization Strategy.

Generating Schemas from Captured Data

If you ingest tabular data into DuckDB, Consist can capture the observed schema and export an editable SQLModel stub so you can curate PK/FK constraints and then register the model for views. This is useful when you want a stable, documented schema for downstream analysis or audits.

You can also opt into lightweight file schema capture when logging CSV/Parquet artifacts by passing profile_file_schema=True (and optionally file_schema_sample_rows=) to log_artifact. These captured schemas are stored in the provenance DB and remain available even if the original files move or are deleted.

If you already have a content hash (e.g., after copying or moving a file), pass content_hash= to log_artifact to reuse it without re-hashing the file. For safety, Consist will not overwrite an existing, different hash unless you pass force_hash_override=True. To verify the hash against disk, use validate_content_hash=True.
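The idea behind file schema capture can be illustrated with a small, self-contained sketch (plain Python; this mimics profiling a CSV sample, not Consist's implementation):

```python
# Illustrative sketch: sample a few rows of a CSV and record the observed
# column names and crudely inferred types, as a stand-in for the idea behind
# profile_file_schema / file_schema_sample_rows.
import csv
import io

def infer_schema(fh, sample_rows=2):
    reader = csv.DictReader(fh)
    rows = [row for _, row in zip(range(sample_rows), reader)]
    schema = {}
    for col in reader.fieldnames:
        values = [r[col] for r in rows]
        schema[col] = "int" if all(v.lstrip("-").isdigit() for v in values) else "str"
    return schema

sample = io.StringIO("person_id,age\n1,34\n2,28\n")
print(infer_schema(sample))  # {'person_id': 'int', 'age': 'int'}
```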

See Schema Export for the full workflow (CLI + Python) and column-name/__tablename__ guidelines. See Data Materialization Strategy for ingestion tradeoffs and DB fallback behavior.