Workflow Contexts¶
Workflow contexts are the APIs that coordinate multi-step pipelines.
When to use workflow contexts¶
- Use `Tracker.scenario(...)` or `consist.scenario(...)` to group related steps under one scenario header run.
- Use `ScenarioContext.run(...)` for cache-aware, function-shaped steps.
- Use `ScenarioContext.trace(...)` when the inline block must execute every time (including cache hits).
- Use `RunContext` inside `inject_context=True` runs for managed output paths and run-aware logging helpers.
Relationship to consist.run and Tracker.run¶
| API | Execution shape | Cache-hit behavior |
|---|---|---|
| `consist.run` / `Tracker.run` | Single function call | Skips function execution and reuses cached outputs |
| `ScenarioContext.run` | Function step inside a scenario | Same cache behavior as `Tracker.run` |
| `ScenarioContext.trace` / `Tracker.trace` | Inline `with` block | Block executes each time; cache state is still recorded |
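The contract in the table above can be sketched with a toy cache. This is NOT the Consist implementation; `run_step` and `trace_step` are hypothetical stand-ins that only illustrate the difference between skip-on-hit and always-execute semantics:

```python
# Toy model of the cache-hit semantics above -- not the Consist
# implementation, just a sketch of the contract each surface follows.
from contextlib import contextmanager

cache: dict[str, str] = {}
log: list[str] = []

def run_step(name: str, fn) -> str:
    """Like consist.run / ScenarioContext.run: skip fn on a cache hit."""
    if name in cache:
        log.append(f"{name}: hit, fn skipped")
        return cache[name]
    log.append(f"{name}: miss, fn executed")
    cache[name] = fn()
    return cache[name]

@contextmanager
def trace_step(name: str):
    """Like ScenarioContext.trace: the block runs every time, but the
    hit/miss state is still recorded."""
    log.append(f"{name}: {'hit' if name in cache else 'miss'}, block runs")
    yield
    cache.setdefault(name, "traced")

run_step("prepare", lambda: "prepared")  # first call: function executes
run_step("prepare", lambda: "prepared")  # second call: function skipped
with trace_step("analyze"):
    pass
with trace_step("analyze"):  # block runs again even though "analyze" is cached
    pass
```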
Scenario step identity kwargs¶
`Tracker.scenario(...)` creates the header context. Identity kwargs are passed on the step surfaces (`ScenarioContext.run(...)` / `ScenarioContext.trace(...)`):
```python
with tracker.scenario("baseline") as sc:
    step = sc.run(
        fn=prepare,
        name="prepare",
        adapter=activitysim_adapter,
        identity_inputs=[("asim_config", asim_config_dir)],
        outputs=["prepared"],
    )
    with sc.trace(
        "analyze",
        adapter=activitysim_adapter,
        identity_inputs=[("asim_config", asim_config_dir)],
    ):
        analyze_inline(step.outputs["prepared"])
```
`config_plan` and `hash_inputs` are not accepted on the scenario step `run`/`trace` surfaces. Use `adapter` and `identity_inputs` instead.
BindingResult execution envelopes¶
`ScenarioContext.run(...)` also accepts `binding=BindingResult(...)` for orchestrator-facing code that has already resolved a step's explicit inputs, required coupler keys, and optional coupler keys. Keep using the primitive `inputs`/`input_keys`/`optional_input_keys` kwargs in direct workflow code; the binding envelope is the preferred path when the binding decision already lives outside Consist.

`binding` is execution-only and mutually exclusive with the primitive input kwargs. It does not change `ScenarioContext.trace(...)`.
```python
from pathlib import Path

import consist
from consist import BindingResult, ExecutionOptions, Tracker

tracker = Tracker(run_dir="./runs", db_path="./provenance.duckdb")

def prepare(raw: Path) -> dict[str, Path]:
    out = consist.output_path("prepared", ext="parquet")
    out.write_text("prepared\n")
    return {"prepared": out}

def analyze(prepared: Path) -> dict[str, Path]:
    out = consist.output_path("analysis", ext="parquet")
    out.write_text(prepared.read_text())
    return {"analysis": out}

with tracker.scenario("baseline") as sc:
    prep = sc.run(
        fn=prepare,
        inputs={"raw": Path("raw.csv")},
        execution_options=ExecutionOptions(input_binding="paths"),
        outputs=["prepared"],
    )
    sc.run(
        fn=analyze,
        binding=BindingResult(
            inputs={"prepared": consist.ref(prep, key="prepared")},
            input_keys=["prepared"],
        ),
        execution_options=ExecutionOptions(input_binding="loaded"),
        outputs=["analysis"],
    )
```
For direct step-to-step workflow code, `consist.ref(...)` and `consist.refs(...)` remain the recommended default. Reach for `BindingResult` when a planner or external orchestrator has already compiled the binding decision.
Minimal runnable scenario example¶
```python
from pathlib import Path

import consist
from consist import Tracker

tracker = Tracker(run_dir="./runs", db_path="./provenance.duckdb")

def prepare() -> Path:
    out = consist.output_path("prepared", ext="txt")
    out.write_text("prepared\n")
    return out

def analyze(prepared_artifact) -> Path:
    out = consist.output_path("analysis", ext="txt")
    out.write_text(prepared_artifact.path.read_text())
    return out

with tracker.scenario("baseline") as sc:
    prep = sc.run(fn=prepare, name="prepare", outputs=["prepared"])
    analysis = sc.run(
        fn=analyze,
        name="analyze",
        inputs={"prepared_artifact": consist.ref(prep, key="prepared")},
        outputs=["analysis"],
    )
```
Scenario Context¶
A context manager for grouping multiple steps into a single "scenario".
A scenario creates a parent run (the "header") that aggregates the
results, metadata, and lineage of all steps executed within its block.
It provides a coupler to pass artifacts between steps, making it
ideal for multi-stage simulation workflows.
Attributes:
| Name | Type | Description |
|---|---|---|
| `coupler` | `Coupler` | Scenario-local artifact registry for passing outputs between steps. Supports runtime-declared output validation. |
Examples:

```python
with tracker.scenario("base_case") as sc:
    # Step 1: Pre-process
    sc.run(preprocess_fn, inputs={"raw": "data.csv"}, outputs=["clean"])
    # Step 2: Model (reads "clean" from the coupler automatically)
    sc.run(model_fn, input_keys=["clean"], outputs=["results"])
```
run_id property¶
Run ID of the scenario header.
Returns:
| Type | Description |
|---|---|
| `str` | The run ID for the scenario header (or the scenario name if the header has not been created yet). |
config property¶
Read-only view of the scenario configuration.
Returns:
| Type | Description |
|---|---|
| `MappingProxyType` | Immutable mapping of configuration values for the scenario. Updates are applied by changing inputs to the scenario, not by mutating this mapping. |
inputs property¶
Read-only view of registered exogenous inputs.
Returns:
| Type | Description |
|---|---|
| `MappingProxyType` | Immutable mapping of input keys to artifacts added via `add_input`. |
add_input(path, key, **kwargs)¶
Log an external input artifact to the scenario header run.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `ArtifactRef` | Path (or prebuilt artifact reference) to log as an external input. | required |
| `key` | `str` | Semantic key for the artifact. | required |
| `**kwargs` | `Any` | Additional metadata forwarded to the underlying artifact-logging call. | `{}` |
Returns:
| Type | Description |
|---|---|
| `Artifact` | Logged artifact associated with the scenario. |
Raises:
| Type | Description |
|---|---|
| `RuntimeError` | If a step has already started or the scenario context is inactive. |
declare_outputs(*names, required=False, warn_undefined=False, description=None)¶
Declare outputs that should be present in the scenario coupler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `*names` | `str` | Output keys to declare. | `()` |
| `required` | `bool \| Mapping[str, bool]` | Whether declared outputs are required. A mapping allows per-key overrides. | `False` |
| `warn_undefined` | `bool` | If True, warn when outputs are logged that were not declared. | `False` |
| `description` | `Optional[Mapping[str, str]]` | Human-readable descriptions for declared outputs. | `None` |
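The validation semantics described above can be sketched with plain dicts. This is an illustration of the contract (per-key `required` overrides, warning on undeclared keys), not the real Coupler implementation:

```python
# Dict-based sketch of the declare_outputs contract (not the real
# Coupler): `required` can vary per key, and warn_undefined flags
# logged keys that were never declared.
import warnings

declared = {"clean": True, "report": False}            # key -> required?
logged = {"clean": "clean.parquet", "extra": "x.csv"}  # keys actually produced

missing = [k for k, req in declared.items() if req and k not in logged]
undeclared = [k for k in logged if k not in declared]

assert missing == []        # every required declared output was logged
if undeclared:              # warn_undefined=True behavior
    warnings.warn(f"undeclared outputs logged: {undeclared}")
```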
require_outputs(*names, required=True, warn_undefined=False, description=None)¶
Declare required outputs that must be present at scenario exit.
This is a convenience wrapper around declare_outputs that defaults required=True.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `*names` | `str` | Output keys to require. | `()` |
| `required` | `bool \| Mapping[str, bool]` | Whether required outputs are enforced. A mapping allows per-key overrides. | `True` |
| `warn_undefined` | `bool` | If True, warn when outputs are logged that were not declared. | `False` |
| `description` | `Optional[Mapping[str, str]]` | Human-readable descriptions for required outputs. | `None` |
collect_by_keys(artifacts, *keys, prefix='')¶
Collect explicit artifacts into the scenario coupler by key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `artifacts` | `Mapping[str, Artifact]` | Source artifacts mapping (usually outputs from a step). | required |
| `*keys` | `str` | Keys to collect from the mapping. | `()` |
| `prefix` | `str` | Optional prefix to apply to collected keys in the coupler. | `""` |
Returns:
| Type | Description |
|---|---|
| `Dict[str, Artifact]` | The collected artifacts keyed by their (possibly prefixed) names. |
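The key-selection and prefixing behavior can be sketched with plain dicts. The `collect_by_keys` function below is a stand-in illustrating the semantics, not the real implementation:

```python
# Plain-dict sketch of the collect_by_keys semantics: pick selected keys
# from a step's outputs and re-key them with an optional prefix.
def collect_by_keys(artifacts: dict, *keys: str, prefix: str = "") -> dict:
    return {f"{prefix}{k}": artifacts[k] for k in keys}

step_outputs = {"clean": "clean.parquet", "report": "report.html", "tmp": "scratch"}
picked = collect_by_keys(step_outputs, "clean", "report", prefix="prep/")
# "tmp" is not collected; collected keys gain the "prep/" prefix
```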
run(fn=None, name=None, *, run_id=None, model=None, description=None, config=None, adapter=None, config_plan_ingest=True, config_plan_profile_schema=False, inputs=None, input_keys=None, optional_input_keys=None, binding=None, depends_on=None, tags=None, facet=None, facet_from=None, facet_schema_version=None, facet_index=None, identity_inputs=None, year=None, iteration=None, phase=None, stage=None, parent_run_id=None, outputs=None, output_paths=None, capture_dir=None, capture_pattern='*', cache_options=None, output_policy=None, execution_options=None)¶
Execute a cached scenario step and update the Coupler with outputs.
This method wraps `Tracker.run` while ensuring the scenario header is updated with step metadata and artifacts.
Use `execution_options.runtime_kwargs` for runtime-only inputs and `consist.require_runtime_kwargs` to validate required keys.
For direct workflow code, prefer the primitive `inputs=` kwargs and, when needed, the direct `input_keys=` / `optional_input_keys=` compatibility surfaces. For complex or externally orchestrated workflows that have already resolved the binding plan, pass `binding=BindingResult(...)` instead; `binding` is an execution envelope and is mutually exclusive with the primitive input kwargs.
Pass policy controls via `cache_options`, `output_policy`, and `execution_options`.
`adapter` and `identity_inputs` are the public identity-related kwargs.
Examples:
Direct workflow code:

```python
sc.run(
    fn=step,
    inputs={"raw": raw_path},
    execution_options=ExecutionOptions(input_binding="paths"),
)
sc.run(
    fn=step,
    inputs={"raw": consist.ref(previous_result, key="raw")},
    execution_options=ExecutionOptions(input_binding="loaded"),
)
```

Orchestrator-facing execution envelope:

```python
sc.run(
    fn=step,
    binding=BindingResult(
        inputs={"raw": raw_path},
        input_keys=["data"],
        optional_input_keys=["maybe"],
    ),
    execution_options=ExecutionOptions(input_binding="paths"),
)
```

`binding` cannot be combined with the primitive `inputs`, `input_keys`, or `optional_input_keys` kwargs.
trace(name, *, run_id=None, model=None, description=None, config=None, adapter=None, config_plan_ingest=True, config_plan_profile_schema=False, inputs=None, input_keys=None, optional_input_keys=None, depends_on=None, tags=None, facet=None, facet_from=None, facet_schema_version=None, facet_index=None, identity_inputs=None, year=None, iteration=None, parent_run_id=None, outputs=None, output_paths=None, capture_dir=None, capture_pattern='*', cache_mode='reuse', cache_hydration=None, cache_version=None, cache_epoch=None, validate_cached_outputs='lazy', code_identity=None, code_identity_extra_deps=None, output_mismatch='warn', output_missing='warn')¶
Manual tracing context manager for scenario steps.
This wraps `Tracker.trace` to log a step while allowing inline code blocks. Use `ScenarioContext.run` when you want function execution to be skipped on cache hits.
`adapter` and `identity_inputs` are the public identity-related kwargs.
Run Context¶
A lightweight helper object injected into user functions.
When you execute a run with `inject_context=True`, Consist passes a `RunContext` to your function. This allows you to access run-aware helpers, like the run's dedicated artifact directory and artifact logging methods, without needing to reference a global tracker instance directly.
Examples:

```python
def my_step(ctx: RunContext):
    # Access the run's dedicated directory
    output_path = ctx.run_dir / "results.csv"
    # ... generate file ...
    ctx.log_artifact(output_path, "results")
```
run_dir property¶
Run-specific output directory for the active step.
Returns:
| Type | Description |
|---|---|
| `Path` | The directory where this step should write outputs by default. This value is derived from the active run and respects any per-run artifact directory overrides. |
inputs property¶
output_dir(namespace=None)¶
Resolve the managed output directory for the active run.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `namespace` | `Optional[str]` | Optional relative subdirectory under the managed run output directory. | `None` |
Returns:
| Type | Description |
|---|---|
| `Path` | Absolute directory path for managed outputs. The directory is created if it does not exist. |
output_path(key, ext='parquet')¶
Resolve a deterministic managed output path for the active run.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `key` | `str` | Artifact key used as the output filename stem. Relative path segments are allowed to organize outputs into subdirectories. | required |
| `ext` | `str` | File extension to append. Leading dots are ignored and the extension is normalized to lowercase. | `"parquet"` |
Returns:
| Type | Description |
|---|---|
| `Path` | Absolute managed output path. Parent directories are created if needed. |
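The naming rules above can be sketched with a small helper. `managed_output_path` is hypothetical, illustrating only the documented behavior (key as filename stem, leading dots stripped from `ext`, extension lowercased, parent directories created):

```python
# Illustrative sketch of the output_path naming rules -- a hypothetical
# helper, not the RunContext implementation.
from pathlib import Path
import tempfile

def managed_output_path(run_dir: Path, key: str, ext: str = "parquet") -> Path:
    suffix = ext.lstrip(".").lower()                 # ".CSV" -> "csv"
    path = run_dir / f"{key}.{suffix}"               # key is the stem
    path.parent.mkdir(parents=True, exist_ok=True)   # parents created if needed
    return path

run_dir = Path(tempfile.mkdtemp())
p = managed_output_path(run_dir, "tables/persons", ext=".CSV")
# -> <run_dir>/tables/persons.csv
```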
load(key_or_artifact)¶
log_artifact(*args, **kwargs)¶
Log an artifact within the active run.
This is a thin wrapper around Tracker.log_artifact.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `*args` | `Any` | Positional arguments forwarded to `Tracker.log_artifact`. | `()` |
| `**kwargs` | `Any` | Keyword arguments forwarded to `Tracker.log_artifact`. | `{}` |
Returns:
| Type | Description |
|---|---|
| `Artifact` | The logged artifact. |
log_artifacts(*args, **kwargs)¶
Log multiple artifacts within the active run.
This is a thin wrapper around Tracker.log_artifacts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `*args` | `Any` | Positional arguments forwarded to `Tracker.log_artifacts`. | `()` |
| `**kwargs` | `Any` | Keyword arguments forwarded to `Tracker.log_artifacts`. | `{}` |
Returns:
| Type | Description |
|---|---|
| `Dict[str, Artifact]` | Mapping of artifact keys to logged artifacts. |
log_input(*args, **kwargs)¶
Log an input artifact within the active run.
This is a thin wrapper around Tracker.log_input.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `*args` | `Any` | Positional arguments forwarded to `Tracker.log_input`. | `()` |
| `**kwargs` | `Any` | Keyword arguments forwarded to `Tracker.log_input`. | `{}` |
Returns:
| Type | Description |
|---|---|
| `Artifact` | The logged input artifact. |
log_output(*args, **kwargs)¶
Log an output artifact within the active run.
This is a thin wrapper around Tracker.log_output.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `*args` | `Any` | Positional arguments forwarded to `Tracker.log_output`. | `()` |
| `**kwargs` | `Any` | Keyword arguments forwarded to `Tracker.log_output`. | `{}` |
Returns:
| Type | Description |
|---|---|
| `Artifact` | The logged output artifact. |
log_meta(**kwargs)¶
Update metadata for the active run.
This is a thin wrapper around Tracker.log_meta.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `**kwargs` | `Any` | Metadata key/value pairs to merge into the run record. | `{}` |
capture_outputs(directory, pattern='*')¶
Capture files written under directory and log them as outputs on exit.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `directory` | `Path` | Directory to monitor for new or modified files. | required |
| `pattern` | `str` | Glob pattern for files to capture. | `"*"` |
Yields:
| Type | Description |
|---|---|
| `OutputCapture` | Container listing artifacts that were logged during the context. |
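The capture-on-exit idea can be sketched with a snapshot-and-diff context manager. This is an illustration of the semantics (new or modified files matching the pattern are collected when the block exits), not the real implementation, which also logs the captured files as output artifacts:

```python
# Sketch of the capture-on-exit behavior: snapshot the directory, run
# the block, then collect files that are new or modified and match the
# glob pattern. Illustrative only -- the real version logs artifacts.
from contextlib import contextmanager
from pathlib import Path
import tempfile

@contextmanager
def capture_outputs(directory: Path, pattern: str = "*"):
    before = {p: p.stat().st_mtime for p in directory.glob(pattern)}
    captured: list[Path] = []
    try:
        yield captured
    finally:
        for p in directory.glob(pattern):
            if p not in before or p.stat().st_mtime > before[p]:
                captured.append(p)

work = Path(tempfile.mkdtemp())
with capture_outputs(work, pattern="*.csv") as cap:
    (work / "results.csv").write_text("a,b\n")
    (work / "scratch.txt").write_text("ignored\n")  # does not match pattern
```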
Coupler¶
Bases: CouplerMapMixin[Artifact, DeclaredOutput]
Scenario-local helper to thread named artifacts between steps.
Coupler is intentionally small:
- It stores the "latest Artifact for a semantic key" in memory.
- It does not log artifacts, infer inputs/outputs, or mutate Artifacts as a side effect of reads. Keep provenance operations on the Tracker.
set(key, artifact)¶
Store an artifact under a validated key.
set_from_artifact(key, value)¶
Set an artifact, accepting both Artifact objects and artifact-like values.
This method is useful when integrating with optional dependencies (like noop mode) where you may receive any of:
- A real Artifact (when tracking is enabled)
- An artifact-like object with .path and .container_uri properties (noop mode)
- A Path or string (fallback)
All three forms are stored in the coupler and can be retrieved with get() or require().
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `key` | `str` | The coupler key (e.g., "persons", "skims"). Must follow artifact-key rules. | required |
| `value` | `Artifact \| artifact-like \| Path \| str` | The value to store. Can be a real Artifact, an artifact-like object with .path/.container_uri properties, a Path, or a string path. | required |
Returns:
| Type | Description |
|---|---|
| `Any` | The value that was stored. |
Examples:
Using with an optional Consist dependency:

```python
# Works whether log_output returns Artifact or NoopArtifact
artifact = tracker.log_output(path, key="persons")
coupler.set_from_artifact("persons", artifact)
```

Mixed real and fallback artifacts:
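The accepted value forms can be sketched with stand-in types. `NoopArtifact` and the dict-backed `set_from_artifact` below are hypothetical, illustrating only that artifact-like objects, Paths, and strings are stored unchanged:

```python
# Sketch of the value forms set_from_artifact accepts (stand-in types,
# not the real Coupler): artifact-like objects, Paths, and strings are
# all stored unchanged and retrieved the same way.
from pathlib import Path

class NoopArtifact:
    """Artifact-like stand-in exposing .path and .container_uri."""
    def __init__(self, path: str):
        self.path = Path(path)
        self.container_uri = f"file://{path}"

store: dict[str, object] = {}

def set_from_artifact(key: str, value):
    store[key] = value   # the real method also validates the key
    return value

set_from_artifact("persons", NoopArtifact("persons.parquet"))  # artifact-like
set_from_artifact("skims", Path("skims.omx"))                  # Path fallback
set_from_artifact("zones", "zones.geojson")                    # string fallback
```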
update(artifacts=None, /, **kwargs)¶
Bulk-store artifacts from a mapping and/or keyword arguments.
view(namespace)¶
Return a namespace-scoped coupler view.
The returned view writes keys as <namespace>/<key> into this coupler while allowing reads and writes using namespace-local key names.
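The namespace-view semantics can be sketched with a dict-backed stand-in. `CouplerView` here is hypothetical, showing only the documented key-mapping behavior:

```python
# Dict-backed sketch of the namespace-view semantics: writes land in the
# parent under "<namespace>/<key>", while the view uses local key names.
# Not the real Coupler view class.
class CouplerView:
    def __init__(self, parent: dict, namespace: str):
        self._parent, self._ns = parent, namespace

    def set(self, key: str, value):
        self._parent[f"{self._ns}/{key}"] = value

    def get(self, key: str):
        return self._parent.get(f"{self._ns}/{key}")

parent: dict = {}
view = CouplerView(parent, "landuse")
view.set("parcels", "parcels.parquet")
# parent now holds the prefixed key "landuse/parcels"
```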
get(key)¶
Return the artifact for key, or None if unset.
require(key)¶
Return the artifact for key, or raise a helpful KeyError.
keys()¶
items()¶
values()¶
path(key, *, required=True)¶
Resolve an artifact's URI to an absolute host path.
This does not mutate the Artifact; it only returns a resolved Path.
declare_outputs(*names, required=False, warn_undefined=False, description=None)¶
Declare expected output keys and optional metadata.
missing_declared_outputs()¶
Return required declared outputs that are still unset.
require_outputs(*names, required=True, warn_undefined=False, description=None)¶
Convenience wrapper for declaring outputs as required.
collect_by_keys(artifacts, *keys, prefix='')¶
Copy selected artifact keys into this coupler with an optional prefix.
describe_outputs()¶
Return a copy of declared-output descriptions.