
Workflow Contexts

Workflow contexts are the APIs that coordinate multi-step pipelines.

When to use workflow contexts

  • Use Tracker.scenario(...) or consist.scenario(...) to group related steps under one scenario header run.
  • Use ScenarioContext.run(...) for cache-aware function-shaped steps.
  • Use ScenarioContext.trace(...) when the inline block must execute every time (including cache hits).
  • Use RunContext inside inject_context=True runs for managed output paths and run-aware logging helpers.

Relationship to consist.run and Tracker.run

  • consist.run / Tracker.run: a single function call. On a cache hit, function execution is skipped and cached outputs are reused.
  • ScenarioContext.run: a function step inside a scenario. Same cache behavior as Tracker.run.
  • ScenarioContext.trace / Tracker.trace: an inline with block. The block executes each time; cache state is still recorded.
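The cache-hit contrast above can be illustrated with a toy sketch in plain Python. This is not Consist's implementation; it only mimics the documented behavior: a run-style step skips its function on a cache hit, while a trace-style step always executes and merely records the cache state.

```python
# Toy illustration of the run/trace contrast, not Consist's implementation.
cache: dict[str, str] = {}
events: list[str] = []

def run(name: str, fn) -> str:
    """Skip fn entirely when a cached result exists (run-style step)."""
    if name in cache:
        events.append(f"{name}: cache hit, skipped")
        return cache[name]
    result = fn()
    cache[name] = result
    events.append(f"{name}: executed")
    return result

def trace(name: str, fn) -> str:
    """Always execute fn; only the recorded cache state differs (trace-style step)."""
    hit = name in cache
    result = fn()
    cache[name] = result
    events.append(f"{name}: executed ({'cache hit' if hit else 'miss'} recorded)")
    return result

run("prepare", lambda: "prepared")    # executes
run("prepare", lambda: "prepared")    # cache hit: function skipped
trace("analyze", lambda: "analysis")  # executes
trace("analyze", lambda: "analysis")  # executes again despite the cache hit
```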

Scenario step identity kwargs

Tracker.scenario(...) creates the header context. Identity kwargs are passed on step surfaces (ScenarioContext.run(...) / ScenarioContext.trace(...)):

with tracker.scenario("baseline") as sc:
    step = sc.run(
        fn=prepare,
        name="prepare",
        adapter=activitysim_adapter,
        identity_inputs=[("asim_config", asim_config_dir)],
        outputs=["prepared"],
    )

    with sc.trace(
        "analyze",
        adapter=activitysim_adapter,
        identity_inputs=[("asim_config", asim_config_dir)],
    ):
        analyze_inline(step.outputs["prepared"])

config_plan and hash_inputs are not accepted on scenario step run/trace surfaces. Use adapter and identity_inputs.

BindingResult execution envelopes

ScenarioContext.run(...) also accepts binding=BindingResult(...) for orchestrator-facing code that has already resolved a step's explicit inputs, required coupler keys, and optional coupler keys. For direct workflow code, keep using the primitive inputs/input_keys/optional_input_keys kwargs; the binding envelope is the preferred path when the binding decision already lives outside Consist.

binding is execution-only and mutually exclusive with the primitive input kwargs. It does not change ScenarioContext.trace(...).

from pathlib import Path
import consist
from consist import BindingResult, ExecutionOptions, Tracker

tracker = Tracker(run_dir="./runs", db_path="./provenance.duckdb")


def prepare(raw: Path) -> dict[str, Path]:
    out = consist.output_path("prepared", ext="parquet")
    out.write_text("prepared\n")
    return {"prepared": out}


def analyze(prepared: Path) -> dict[str, Path]:
    out = consist.output_path("analysis", ext="parquet")
    out.write_text(prepared.read_text())
    return {"analysis": out}


with tracker.scenario("baseline") as sc:
    prep = sc.run(
        fn=prepare,
        inputs={"raw": Path("raw.csv")},
        execution_options=ExecutionOptions(input_binding="paths"),
        outputs=["prepared"],
    )

    sc.run(
        fn=analyze,
        binding=BindingResult(
            inputs={"prepared": consist.ref(prep, key="prepared")},
            input_keys=["prepared"],
        ),
        execution_options=ExecutionOptions(input_binding="loaded"),
        outputs=["analysis"],
    )

For direct step-to-step workflow code, consist.ref(...) and consist.refs(...) remain the recommended default. Reach for BindingResult when a planner or external orchestrator has already compiled the binding decision.

Minimal runnable scenario example

from pathlib import Path
import consist
from consist import Tracker

tracker = Tracker(run_dir="./runs", db_path="./provenance.duckdb")

def prepare() -> Path:
    out = consist.output_path("prepared", ext="txt")
    out.write_text("prepared\n")
    return out

def analyze(prepared_artifact) -> Path:
    out = consist.output_path("analysis", ext="txt")
    out.write_text(prepared_artifact.path.read_text())
    return out

with tracker.scenario("baseline") as sc:
    prep = sc.run(fn=prepare, name="prepare", outputs=["prepared"])
    analysis = sc.run(
        fn=analyze,
        name="analyze",
        inputs={"prepared_artifact": consist.ref(prep, key="prepared")},
        outputs=["analysis"],
    )

Scenario Context

A context manager for grouping multiple steps into a single "scenario".

A scenario creates a parent run (the "header") that aggregates the results, metadata, and lineage of all steps executed within its block. It provides a coupler to pass artifacts between steps, making it ideal for multi-stage simulation workflows.

Attributes:

Name Type Description
coupler Coupler

Scenario-local artifact registry for passing outputs between steps. Supports runtime-declared output validation.

Examples:

with tracker.scenario("base_case") as sc:
    # Step 1: Pre-process
    sc.run(preprocess_fn, inputs={"raw": "data.csv"}, outputs=["clean"])
    # Step 2: Model (reads "clean" from the coupler automatically)
    sc.run(model_fn, input_keys=["clean"], outputs=["results"])

run_id property

Run ID of the scenario header.

Returns:

Type Description
str

The run ID for the scenario header (or the scenario name if the header has not been created yet).

config property

Read-only view of the scenario configuration.

Returns:

Type Description
MappingProxyType

Immutable mapping of configuration values for the scenario. Updates are applied by changing inputs to the scenario, not by mutating this mapping.

inputs property

Read-only view of registered exogenous inputs.

Returns:

Type Description
MappingProxyType

Immutable mapping of input keys to artifacts added via add_input. This reflects only scenario-level inputs, not step inputs.

add_input(path, key, **kwargs)

Log an external input artifact to the scenario header run.

Parameters:

Name Type Description Default
path ArtifactRef

Path (or prebuilt Artifact) representing the input.

required
key str

Semantic key for the artifact.

required
**kwargs Any

Additional metadata forwarded to Tracker.log_artifact.

{}

Returns:

Type Description
Artifact

Logged artifact associated with the scenario.

Raises:

Type Description
RuntimeError

If a step has already started or the scenario context is inactive.

declare_outputs(*names, required=False, warn_undefined=False, description=None)

Declare outputs that should be present in the scenario coupler.

Parameters:

Name Type Description Default
*names str

Output keys to declare.

()
required bool | Mapping[str, bool]

Whether declared outputs are required. A mapping allows per-key overrides.

False
warn_undefined bool

If True, warn when outputs are logged that were not declared.

False
description Optional[Mapping[str, str]]

Human-readable descriptions for declared outputs.

None
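The declare_outputs contract can be sketched with plain dicts. This is a hypothetical stand-in, not the Coupler implementation: declared keys carry a required flag (a bool, or a per-key mapping override), and required keys that are still unset are reported as missing at scenario exit.

```python
# Dict-based sketch of declared-output validation, not Consist code.
declared: dict[str, bool] = {}   # key -> required?
stored: dict[str, object] = {}   # key -> artifact

def declare_outputs(*names: str, required=False) -> None:
    # `required` is a bool for all keys, or a mapping for per-key overrides.
    for name in names:
        if isinstance(required, dict):
            declared[name] = required.get(name, False)
        else:
            declared[name] = required

def missing_declared_outputs() -> list[str]:
    """Required declared outputs that are still unset (checked at scenario exit)."""
    return [k for k, req in declared.items() if req and k not in stored]

declare_outputs("prepared", "analysis", required={"analysis": True})
stored["prepared"] = "artifact-for-prepared"
missing_declared_outputs()  # "analysis" is required but not yet stored
```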

require_outputs(*names, required=True, warn_undefined=False, description=None)

Declare required outputs that must be present at scenario exit.

This is a convenience wrapper around declare_outputs that defaults required=True.

Parameters:

Name Type Description Default
*names str

Output keys to require.

()
required bool | Mapping[str, bool]

Whether required outputs are enforced. A mapping allows per-key overrides.

True
warn_undefined bool

If True, warn when outputs are logged that were not declared.

False
description Optional[Mapping[str, str]]

Human-readable descriptions for required outputs.

None

collect_by_keys(artifacts, *keys, prefix='')

Collect explicit artifacts into the scenario coupler by key.

Parameters:

Name Type Description Default
artifacts Mapping[str, Artifact]

Source artifacts mapping (usually outputs from a step).

required
*keys str

Keys to collect from the mapping.

()
prefix str

Optional prefix to apply to collected keys in the coupler.

""

Returns:

Type Description
Dict[str, Artifact]

The collected artifacts keyed by their (possibly prefixed) names.
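A minimal sketch of the collect-by-key semantics, using plain dicts in place of Artifact objects (hypothetical, not Consist code): only the named keys are copied, and the optional prefix is applied to the keys stored in the coupler.

```python
# Sketch of collect_by_keys: select keys from a step's outputs, prefix them,
# and store them in the coupler. Plain dicts stand in for Artifacts.
coupler: dict[str, str] = {}

def collect_by_keys(artifacts: dict[str, str], *keys: str, prefix: str = "") -> dict[str, str]:
    """Copy selected keys into the coupler, optionally prefixed."""
    collected = {}
    for key in keys:
        collected[prefix + key] = artifacts[key]  # KeyError if a key is absent
    coupler.update(collected)
    return collected

step_outputs = {"prepared": "prepared.parquet", "log": "step.log"}
collect_by_keys(step_outputs, "prepared", prefix="base/")
# coupler now holds {"base/prepared": "prepared.parquet"}; "log" was not collected
```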

run(fn=None, name=None, *, run_id=None, model=None, description=None, config=None, adapter=None, config_plan_ingest=True, config_plan_profile_schema=False, inputs=None, input_keys=None, optional_input_keys=None, binding=None, depends_on=None, tags=None, facet=None, facet_from=None, facet_schema_version=None, facet_index=None, identity_inputs=None, year=None, iteration=None, phase=None, stage=None, parent_run_id=None, outputs=None, output_paths=None, capture_dir=None, capture_pattern='*', cache_options=None, output_policy=None, execution_options=None)

Execute a cached scenario step and update the Coupler with outputs.

This method wraps Tracker.run while ensuring the scenario header is updated with step metadata and artifacts. Use execution_options.runtime_kwargs for runtime-only inputs, and consist.require_runtime_kwargs to validate required keys. For direct workflow code, prefer primitive inputs= kwargs and, when needed, the direct input_keys= / optional_input_keys= compatibility surfaces. For complex or externally orchestrated workflows that have already resolved the binding plan, pass binding=BindingResult(...) instead; binding is an execution envelope and is mutually exclusive with the primitive input kwargs. Pass policy controls via cache_options, output_policy, and execution_options.

adapter and identity_inputs are the public identity-related kwargs.

Examples:

Direct workflow code:

sc.run(
    fn=step,
    inputs={"raw": raw_path},
    execution_options=ExecutionOptions(input_binding="paths"),
)

sc.run(
    fn=step,
    inputs={"raw": consist.ref(previous_result, key="raw")},
    execution_options=ExecutionOptions(input_binding="loaded"),
)

Orchestrator-facing execution envelope:

sc.run(
    fn=step,
    binding=BindingResult(
        inputs={"raw": raw_path},
        input_keys=["data"],
        optional_input_keys=["maybe"],
    ),
    execution_options=ExecutionOptions(input_binding="paths"),
)

binding cannot be combined with primitive inputs, input_keys, or optional_input_keys.

trace(name, *, run_id=None, model=None, description=None, config=None, adapter=None, config_plan_ingest=True, config_plan_profile_schema=False, inputs=None, input_keys=None, optional_input_keys=None, depends_on=None, tags=None, facet=None, facet_from=None, facet_schema_version=None, facet_index=None, identity_inputs=None, year=None, iteration=None, parent_run_id=None, outputs=None, output_paths=None, capture_dir=None, capture_pattern='*', cache_mode='reuse', cache_hydration=None, cache_version=None, cache_epoch=None, validate_cached_outputs='lazy', code_identity=None, code_identity_extra_deps=None, output_mismatch='warn', output_missing='warn')

Manual tracing context manager for scenario steps.

This wraps Tracker.trace to log a step while allowing inline code blocks. Use ScenarioContext.run when you want function execution to be skipped on cache hits.

adapter and identity_inputs are the public identity-related kwargs.

Run Context

A lightweight helper object injected into user functions. When you execute a run with inject_context=True, Consist passes a RunContext to your function. This allows you to access run-aware helpers—like the run's dedicated artifact directory and artifact logging methods—without needing to reference a global tracker instance directly.

Examples:

def my_step(ctx: RunContext):
    # Access the run's dedicated directory
    output_path = ctx.run_dir / "results.csv"
    # ... generate file ...
    ctx.log_artifact(output_path, "results")

run_dir property

Run-specific output directory for the active step.

Returns:

Type Description
Path

The directory where this step should write outputs by default. This value is derived from the active run and respects any per-run artifact directory overrides.

inputs property

Mapping of input artifact keys to artifacts for the active step.

Returns:

Type Description
Dict[str, Artifact]

Dictionary of input artifacts keyed by their semantic keys. Raises a RuntimeError if accessed outside an active run.

output_dir(namespace=None)

Resolve the managed output directory for the active run.

Parameters:

Name Type Description Default
namespace Optional[str]

Optional relative subdirectory under the managed run output directory.

None

Returns:

Type Description
Path

Absolute directory path for managed outputs. The directory is created if it does not exist.

output_path(key, ext='parquet')

Resolve a deterministic managed output path for the active run.

Parameters:

Name Type Description Default
key str

Artifact key used as the output filename stem. Relative path segments are allowed to organize outputs into subdirectories.

required
ext str

File extension to append. Leading dots are ignored and the extension is normalized to lowercase.

"parquet"

Returns:

Type Description
Path

Absolute managed output path. Parent directories are created if needed.
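The path rules described above (lowercased extension, leading dots ignored, relative key segments becoming subdirectories) can be sketched as follows. This mirrors the documented behavior against a temporary directory; it is not Consist's implementation.

```python
# Sketch of output_path normalization, not Consist's implementation.
import tempfile
from pathlib import Path

run_dir = Path(tempfile.mkdtemp())

def output_path(key: str, ext: str = "parquet") -> Path:
    suffix = ext.lstrip(".").lower()      # ".CSV" -> "csv"
    path = run_dir / f"{key}.{suffix}"    # "reports/summary" nests a subdirectory
    path.parent.mkdir(parents=True, exist_ok=True)
    return path

p = output_path("reports/summary", ext=".CSV")
# p ends with "reports/summary.csv" and its parent directory now exists
```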

load(key_or_artifact)

Load data from an input artifact by key or from an Artifact instance.

Parameters:

Name Type Description Default
key_or_artifact Union[str, Artifact]

Input artifact key from inputs or an Artifact object.

required

Returns:

Type Description
Any

Loaded data (driver-dependent).

log_artifact(*args, **kwargs)

Log an artifact within the active run.

This is a thin wrapper around Tracker.log_artifact.

Parameters:

Name Type Description Default
*args Any

Positional arguments forwarded to Tracker.log_artifact.

()
**kwargs Any

Keyword arguments forwarded to Tracker.log_artifact.

{}

Returns:

Type Description
Artifact

The logged artifact.

log_artifacts(*args, **kwargs)

Log multiple artifacts within the active run.

This is a thin wrapper around Tracker.log_artifacts.

Parameters:

Name Type Description Default
*args Any

Positional arguments forwarded to Tracker.log_artifacts.

()
**kwargs Any

Keyword arguments forwarded to Tracker.log_artifacts.

{}

Returns:

Type Description
Dict[str, Artifact]

Mapping of artifact keys to logged artifacts.

log_input(*args, **kwargs)

Log an input artifact within the active run.

This is a thin wrapper around Tracker.log_input.

Parameters:

Name Type Description Default
*args Any

Positional arguments forwarded to Tracker.log_input.

()
**kwargs Any

Keyword arguments forwarded to Tracker.log_input.

{}

Returns:

Type Description
Artifact

The logged input artifact.

log_output(*args, **kwargs)

Log an output artifact within the active run.

This is a thin wrapper around Tracker.log_output.

Parameters:

Name Type Description Default
*args Any

Positional arguments forwarded to Tracker.log_output.

()
**kwargs Any

Keyword arguments forwarded to Tracker.log_output.

{}

Returns:

Type Description
Artifact

The logged output artifact.

log_meta(**kwargs)

Update metadata for the active run.

This is a thin wrapper around Tracker.log_meta.

Parameters:

Name Type Description Default
**kwargs Any

Metadata key/value pairs to merge into the run record.

{}

capture_outputs(directory, pattern='*')

Capture files written under directory and log them as outputs on exit.

Parameters:

Name Type Description Default
directory Path

Directory to monitor for new or modified files.

required
pattern str

Glob pattern for files to capture.

"*"

Yields:

Type Description
OutputCapture

Container listing artifacts that were logged during the context.
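One way to picture the capture behavior is a snapshot diff: record the directory's matching files on entry, then on exit treat anything new or modified as a captured output. The sketch below is hypothetical; Consist's actual mechanism may differ.

```python
# Snapshot-diff sketch of directory output capture (hypothetical).
import tempfile
from contextlib import contextmanager
from pathlib import Path

@contextmanager
def capture_outputs(directory: Path, pattern: str = "*"):
    before = {p: p.stat().st_mtime for p in directory.glob(pattern)}
    captured: list[Path] = []
    yield captured
    for p in directory.glob(pattern):
        if p not in before or p.stat().st_mtime > before[p]:
            captured.append(p)  # new or modified file: would be logged as an output

work = Path(tempfile.mkdtemp())
with capture_outputs(work, pattern="*.csv") as cap:
    (work / "results.csv").write_text("a,b\n")
    (work / "notes.txt").write_text("ignored\n")  # does not match the pattern
# cap contains only results.csv
```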

Coupler

Bases: CouplerMapMixin[Artifact, DeclaredOutput]

Scenario-local helper to thread named artifacts between steps.

Coupler is intentionally small:

  • It stores the "latest Artifact for a semantic key" in-memory.
  • It does not log artifacts, infer inputs/outputs, or mutate Artifacts as a side effect of reads.

Keep provenance operations on the Tracker.
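The "latest Artifact for a semantic key" behavior can be sketched with a plain dict (this toy stand-in is not the real Coupler): later writes replace earlier ones, and requiring an unset key raises a helpful error.

```python
# Minimal latest-wins sketch of coupler storage, not the real Coupler.
store: dict[str, str] = {}

def set_key(key: str, artifact: str) -> None:
    store[key] = artifact  # later writes for the same key replace earlier ones

def require(key: str) -> str:
    if key not in store:
        raise KeyError(f"No artifact for key {key!r}; was the producing step run?")
    return store[key]

set_key("persons", "persons_v1.parquet")
set_key("persons", "persons_v2.parquet")  # latest write wins
require("persons")                        # the v2 artifact
```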

set(key, artifact)

Store an artifact under a validated key.

set_from_artifact(key, value)

Set an artifact, accepting both Artifact objects and artifact-like values.

This method is useful when integrating with optional dependencies (like noop mode) where you may receive either:

  • A real Artifact (when tracking is enabled)
  • An artifact-like object with .path and .container_uri properties (noop mode)
  • A Path or string (fallback)

All three forms are stored in the coupler and can be retrieved with get() or require().

Parameters:

Name Type Description Default
key str

The coupler key (e.g., "persons", "skims"). Must follow artifact-key rules.

required
value Artifact or artifact-like or Path or str

The value to store. Can be a real Artifact, artifact-like object with .path/.container_uri properties, a Path, or a string path.

required

Returns:

Type Description
Any

The value that was stored.

Examples:

Using with optional Consist dependency:

# Works whether log_output returns Artifact or NoopArtifact
artifact = tracker.log_output(path, key="persons")
coupler.set_from_artifact("persons", artifact)

Mixed real and fallback artifacts:

artifact = log_output(path) or path  # Fallback to path string
coupler.set_from_artifact("key", artifact)  # Handles both

update(artifacts=None, /, **kwargs)

Bulk-store artifacts from a mapping and/or keyword arguments.

view(namespace)

Return a namespace-scoped coupler view.

The returned view writes keys as <namespace>/<key> into this coupler while allowing reads and writes using namespace-local key names.
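The prefixing behavior can be sketched with a small wrapper class over a plain dict. This is a hypothetical stand-in for Coupler.view, not the real implementation: the view writes "<namespace>/<key>" into the parent store while callers read and write with local key names.

```python
# Sketch of a namespace-scoped view over a coupler-like store (hypothetical).
class NamespacedView:
    def __init__(self, parent: dict, namespace: str):
        self.parent = parent
        self.prefix = namespace + "/"

    def set(self, key: str, value: str) -> None:
        # Local key "persons" is stored as "<namespace>/persons" in the parent.
        self.parent[self.prefix + key] = value

    def get(self, key: str):
        return self.parent.get(self.prefix + key)

coupler: dict[str, str] = {}
view = NamespacedView(coupler, "base")
view.set("persons", "persons.parquet")
view.get("persons")  # reads via the local key
# the parent store holds the prefixed key: {"base/persons": "persons.parquet"}
```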

get(key)

Return artifact for key or None if unset.

require(key)

Return artifact for key or raise a helpful KeyError.

keys()

Return the keys currently stored in the coupler.

items()

Return (key, artifact) pairs for the stored artifacts.

values()

Return the stored artifacts.

path(key, *, required=True)

Resolve an artifact's URI to an absolute host path.

This does not mutate the Artifact; it only returns a resolved Path.

declare_outputs(*names, required=False, warn_undefined=False, description=None)

Declare expected output keys and optional metadata.

missing_declared_outputs()

Return required declared outputs that are still unset.

require_outputs(*names, required=True, warn_undefined=False, description=None)

Convenience wrapper for declaring outputs as required.

collect_by_keys(artifacts, *keys, prefix='')

Copy selected artifact keys into this coupler with an optional prefix.

describe_outputs()

Return a copy of declared-output descriptions.