DLT Loader Integration Guide

The DLT (Data Load Tool) integration enables robust, schema-validated ingestion of data into DuckDB with automatic provenance column injection. This guide covers when to use DLT, how to configure schemas, and best practices for data quality.

Install the optional ingest extra before using DLT features:

pip install "consist[ingest]"

What is DLT?

dlt is an open-source library for extracting and loading data. Consist uses it to:

  • Ingest diverse formats (Parquet, CSV, JSON, Python objects) into DuckDB
  • Auto-detect schemas, and enforce them when one is provided
  • Handle data quality issues (type mismatches, missing values, duplicates)
  • Inject Consist provenance columns (consist_run_id, consist_artifact_id, consist_year, etc.)
  • Scale efficiently with streaming and batching

When to Use DLT vs Direct Logging

When to Use Each API

Querying ingested data across runs:

import consist
from sqlmodel import select

VPerson = tracker.views.Person
rows = consist.run_query(
    select(VPerson.consist_run_id, VPerson.age),
    tracker=tracker,
)

Logging a file without ingestion:

with tracker.start_run("log_result", model="demo"):
    consist.log_artifact(result_path, key="my_result")

Decision Tree

Need cross-run SQL queries?
├─ YES → Use DLT (register a schema)
└─ NO  → Need schema validation?
         ├─ YES → Use DLT with a schema
         └─ NO  → Use direct logging

Basic DLT Workflow

Step 1: Define a Schema (SQLModel)

from sqlmodel import SQLModel, Field
from typing import Optional

class Person(SQLModel, table=True):
    """Schema for person records."""
    person_id: int = Field(primary_key=True)
    age: int
    income: Optional[float]
    name: str

Step 2: Create Tracker with Schema

from consist import Tracker
from pathlib import Path

tracker = Tracker(
    run_dir="./runs",
    db_path="./provenance.duckdb",
    schemas=[Person],  # Register schema
)

Step 3: Log Data with Schema

import pandas as pd

# Create data
df = pd.DataFrame({
    "person_id": [1, 2, 3],
    "age": [25, 30, 35],
    "income": [50000.0, 60000.0, 70000.0],
    "name": ["Alice", "Bob", "Carol"],
})

# Log with schema (DLT ingestion)
with tracker.start_run("ingest_people", model="demo"):
    tracker.log_dataframe(df, key="persons", schema=Person)

Step 4: Query Across Runs

from sqlmodel import Session, select, func

# Compute an aggregate over a single run_id (via the hybrid view).
VPerson = tracker.views.Person
with Session(tracker.engine) as session:
    avg_age = session.exec(
        # Filter to one Consist run and take an average over the ingested rows.
        select(func.avg(VPerson.age)).where(VPerson.consist_run_id == "run_123")
    ).first()
    print(f"Average age in run_123: {avg_age}")

Expected output:

Average age in run_123: 30.0

Schema Definition & Validation

Basic Schema

from sqlmodel import SQLModel, Field
from typing import Optional

class Trip(SQLModel, table=True):
    trip_id: int = Field(primary_key=True)
    person_id: int
    origin: str
    destination: str
    distance_miles: float
    mode: str  # "car", "transit", "bike"
    departure_hour: Optional[int]

Validation: Required vs Optional

class StrictTrip(SQLModel, table=True):
    trip_id: int = Field(primary_key=True, description="Unique trip ID")
    person_id: int = Field(gt=0)  # Must be positive
    mode: str = Field(min_length=1)  # Non-empty
    departure_hour: Optional[int] = Field(ge=0, le=23)  # 0-23 if present
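
As a sketch of what enforcement looks like, assuming the field constraints are checked at ingestion (the exact exception type and message may differ):

import pandas as pd

# person_id violates gt=0, departure_hour violates le=23
bad = pd.DataFrame({
    "trip_id": [1],
    "person_id": [-5],
    "mode": ["car"],
    "departure_hour": [99],
})

with tracker.start_run("bad_ingest", model="demo"):
    tracker.log_dataframe(bad, key="trips", schema=StrictTrip)  # expected to fail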

Foreign Keys (Relationships)

from sqlmodel import SQLModel, Field, Relationship

class Person(SQLModel, table=True):
    person_id: int = Field(primary_key=True)
    name: str
    # trips: List["Trip"] = Relationship(back_populates="person")

class Trip(SQLModel, table=True):
    trip_id: int = Field(primary_key=True)
    person_id: int = Field(foreign_key="person.person_id")
    distance_miles: float
    # person: Person = Relationship(back_populates="trips")
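
When ingesting related tables, load the parent before the child so person_id references have rows to point at. A sketch (df_persons and df_trips are illustrative; whether FK constraints are enforced at ingest is an assumption here):

with tracker.start_run("ingest_related", model="demo"):
    tracker.log_dataframe(df_persons, key="persons", schema=Person)  # parent first
    tracker.log_dataframe(df_trips, key="trips", schema=Trip)        # then child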

Type Mapping

Python Type    DuckDB Type    Notes
int            INTEGER
float          DOUBLE
str            VARCHAR
bool           BOOLEAN
date           DATE           Use datetime.date
datetime       TIMESTAMP      Use datetime.datetime
Optional[T]    T              Allows NULL
List[T]        T[]            Arrays (advanced)
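
For example, date and datetime columns map straight through (the Event model below is illustrative):

from datetime import date, datetime
from typing import Optional
from sqlmodel import SQLModel, Field

class Event(SQLModel, table=True):
    event_id: int = Field(primary_key=True)  # -> INTEGER
    occurred_on: date                        # -> DATE
    recorded_at: datetime                    # -> TIMESTAMP
    note: Optional[str]                      # -> VARCHAR, NULL allowed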

Logging Patterns

Single DataFrame

import consist
import pandas as pd

df = pd.read_csv("results.csv")
with tracker.start_run("log_results", model="demo"):
    consist.log_dataframe(
        df,
        key="results",
        schema=MySchema,  # Validate against schema
    )

Parquet File

with tracker.start_run("log_parquet", model="demo"):
    tracker.log_artifact(
        Path("results.parquet"),
        key="raw_results",
        schema=MySchema,  # Schema is stored; call tracker.ingest(...) to ingest
    )
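
To actually ingest the logged file, follow up with tracker.ingest(...). A minimal sketch; the exact argument shape is assumed here, so check the API reference:

with tracker.start_run("log_and_ingest", model="demo"):
    artifact = tracker.log_artifact(
        Path("results.parquet"),
        key="raw_results",
        schema=MySchema,
    )
    tracker.ingest(artifact)  # assumed call shape: ingest the logged artifact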

CSV File

with tracker.start_run("log_csv", model="demo"):
    tracker.log_artifact(
        Path("results.csv"),
        key="csv_results",
        schema=MySchema,
    )

Zarr / NetCDF (Matrix Data)

# This logs the artifact only; it does not ingest metadata by itself.
with tracker.start_run("log_zarr", model="demo"):
    tracker.log_artifact(
        Path("simulation_output.zarr"),
        key="gridded_results",
        driver="zarr",
    )

To ingest metadata/catalog tables, run an ingestion path after logging rather than relying on log_artifact(...) alone.


Data Quality & Error Handling

Schema Enforcement (Fail on Issues)

When you provide a schema, Consist enforces the column set and types. Extra columns raise a ValueError, and type mismatches are surfaced during ingestion.

with tracker.start_run("strict_ingest", model="demo"):
    tracker.log_dataframe(
        df,
        key="strict_results",
        schema=MySchema,
    )
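
For instance, an unexpected extra column should be rejected (a sketch; the exact error wording is assumed):

df_extra = df.assign(unexpected_col=1)  # column not in MySchema

with tracker.start_run("strict_ingest_fail", model="demo"):
    tracker.log_dataframe(df_extra, key="strict_results", schema=MySchema)
    # -> ValueError for the extra column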

If you want best-effort ingestion (no strict schema), omit the schema and let DLT infer the structure.
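
A schema-free sketch of that:

# No schema= argument: DLT infers column names and types from the data.
with tracker.start_run("loose_ingest", model="demo"):
    tracker.log_dataframe(df, key="loose_results")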

Type Coercion

DLT attempts to coerce types:

df = pd.DataFrame({
    "trip_id": ["1", "2", "3"],      # Strings
    "distance": [1.5, 2.2, 3.1],     # Floats
})

# Ingested as:
# trip_id: [1, 2, 3]  (coerced to int)
# distance: [1.5, 2.2, 3.1]  (floats)

To avoid surprises, ensure input DataFrame matches schema types:

df = df.astype({
    "trip_id": "int64",
    "distance": "float64",
})

Handling Missing Data

class Trip(SQLModel, table=True):
    trip_id: int = Field(primary_key=True)
    person_id: int
    departure_hour: Optional[int]  # Can be NULL
    arrival_hour: Optional[int]

Missing values in Optional fields → NULL in DB. Missing in required fields → error or default depending on the schema and ingestion behavior.
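
A quick sketch with missing values in the Optional columns:

import pandas as pd

df = pd.DataFrame({
    "trip_id": [1, 2],
    "person_id": [10, 11],
    "departure_hour": [8, None],  # Optional -> stored as NULL
    "arrival_hour": [9, None],
})

with tracker.start_run("nullable_ingest", model="demo"):
    tracker.log_dataframe(df, key="trips", schema=Trip)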

Duplicate Handling

Primary keys are not enforced automatically in all cases, so duplicate IDs can reach the table:

class Household(SQLModel, table=True):
    household_id: int = Field(primary_key=True)
    size: int

# If DataFrame has duplicate household_id:
df = pd.DataFrame({
    "household_id": [1, 2, 1],  # Duplicate!
    "size": [4, 3, 5],
})

To handle duplicates, deduplicate before ingestion:

df = df.drop_duplicates(subset=["household_id"], keep="last")

Provenance Columns

Consist automatically injects system columns during ingestion:

Available Columns

Column               Type  Description
consist_run_id       str   ID of the run that created this data
consist_artifact_id  str   Artifact ID of the source file
consist_scenario_id  str   Scenario ID (available in hybrid views)
consist_year         int   Year (if provided to run context)
consist_iteration    int   Iteration count (if provided)

Example Query

from sqlmodel import Session, select, func

VPerson = tracker.views.Person
with Session(tracker.engine) as session:
    # Count persons per run
    results = session.exec(
        select(
            VPerson.consist_run_id,
            func.count(VPerson.person_id).label("count")
        ).group_by(VPerson.consist_run_id)
    ).all()

    for run_id, count in results:
        print(f"Run {run_id}: {count} persons")

Filtering by Provenance

# Get persons from a specific scenario year
with Session(tracker.engine) as session:
    persons_2030 = session.exec(
        select(VPerson).where(
            VPerson.consist_year == 2030,
            VPerson.consist_scenario_id == "baseline"
        )
    ).all()

Advanced Patterns

Multi-Step Ingestion (Scenario)

import consist
from consist import Tracker, use_tracker
from sqlmodel import Session, select, func

tracker = Tracker(
    run_dir="./runs",
    db_path="./provenance.duckdb",
    schemas=[Person, Trip, Household],
)

with use_tracker(tracker):
    with consist.scenario("model_run_2030", year=2030) as sc:

        # Step 1: Load persons
        with sc.trace(name="load_persons"):
            df_persons = load_population_data()
            consist.log_dataframe(
                df_persons,
                key="population",
                schema=Person,
            )

        # Step 2: Simulate trips
        with sc.trace(name="simulate_trips"):
            df_trips = run_trip_simulation(df_persons)
            consist.log_dataframe(
                df_trips,
                key="trips",
                schema=Trip,
            )

        # Step 3: Aggregate
        with sc.trace(name="aggregate"):
            # Query the previous step via the hybrid view; provenance
            # columns like consist_scenario_id live on the views.
            VTrip = tracker.views.Trip
            with Session(tracker.engine) as session:
                total_trips = session.exec(
                    select(func.count(VTrip.trip_id)).where(
                        VTrip.consist_scenario_id == "model_run_2030"
                    )
                ).first()
            print(f"Total trips: {total_trips}")

All data from steps 1-3 is queryable together:

# Cross-step query via the hybrid views
VPerson = tracker.views.Person
VTrip = tracker.views.Trip
with Session(tracker.engine) as session:
    results = session.exec(
        select(VPerson, VTrip).join(
            VTrip, VPerson.person_id == VTrip.person_id
        ).where(VTrip.consist_scenario_id == "model_run_2030")
    ).all()

Incremental Ingestion (Batches)

For very large datasets, ingest in batches:

import pandas as pd

# Split the large file into chunks and log each one

chunk_size = 100000
for i, chunk in enumerate(pd.read_csv("large_file.csv", chunksize=chunk_size)):
    consist.log_dataframe(
        chunk,
        key=f"data_chunk_{i}",
        schema=MySchema,
    )

Because every chunk is ingested with the same schema (and therefore into the same table, e.g. when you pass schema=MySchema each time), the rows land together and can be queried as one:

# Query all chunks together
with Session(tracker.engine) as session:
    count = session.exec(select(func.count(MySchema.id))).first()

Late-Arriving Data

If you ingest data, then later find more data to add:

# Run 1: Initial data
consist.log_dataframe(df1, key="data", schema=MySchema)

# Run 2: Additional data (same key, same schema)
consist.log_dataframe(df2, key="data", schema=MySchema)

Both sets are ingested and queryable:

with Session(tracker.engine) as session:
    # Both df1 and df2 are included
    results = session.exec(select(MySchema)).all()
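
To tell the two loads apart afterwards, filter on the injected consist_run_id (a sketch; run_1_id stands in for a run ID you captured earlier):

VMy = tracker.views.MySchema
with Session(tracker.engine) as session:
    first_batch = session.exec(
        select(VMy).where(VMy.consist_run_id == run_1_id)
    ).all()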

Performance Tuning

Batch Size

For large files, stream in chunks rather than loading the full file into memory:

for chunk in pd.read_csv("file.csv", chunksize=50000):
    consist.log_dataframe(chunk, key="data", schema=MySchema)

Indexing

After ingestion, create indexes for frequently queried columns:

# Manual index (in DuckDB):
with tracker.engine.begin() as conn:
    conn.exec_driver_sql(
        "CREATE INDEX idx_person_run ON global_tables.person(consist_run_id)"
    )

File Format Choice

  • Parquet: Faster loading, better compression, typed columns → preferred
  • CSV: Human-readable, larger files, slower parsing → use for interchange
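
If your upstream tool only emits CSV, a one-time conversion before logging is a cheap win (pandas sketch; to_parquet needs pyarrow or fastparquet installed):

from pathlib import Path
import pandas as pd

# Convert once, then log the Parquet file instead of the CSV.
df = pd.read_csv("results.csv")
df.to_parquet("results.parquet")

with tracker.start_run("log_parquet", model="demo"):
    tracker.log_artifact(Path("results.parquet"), key="results", schema=MySchema)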

Deduplication

If your pipeline generates duplicate records, deduplicate before ingestion:

df = df.drop_duplicates(subset=["id"])

Consist does not currently deduplicate automatically.


Common Errors

Schema Mismatch

Error: Column 'age' expected int, got str

Fix: Ensure DataFrame types match schema:

df["age"] = df["age"].astype("int64")

Missing Required Column

Error: Column 'person_id' required but missing

Fix: Add column or make it optional:

df["person_id"] = df.index + 1  # Add column
# OR
class MySchema(SQLModel, table=True):
    person_id: Optional[int]  # Make optional

Primary Key Violation

Error: Duplicate primary key value

Fix: Deduplicate before ingestion:

df = df.drop_duplicates(subset=["id"])

DLT Not Installed

ImportError: No module named 'dlt'

Fix: Install the ingest extras:

pip install "consist[ingest]"

Null in Non-Optional Field

Warning: Null value in non-optional field 'age'

Fix: Drop rows with nulls, or make the field optional:

df = df.dropna(subset=["age"])

# OR make optional:
class MySchema(SQLModel, table=True):
    age: Optional[int]


Comparison: DLT vs Direct Logging

Feature                 DLT                      Direct Logging
Schema validation       ✅ Yes                   ❌ No
Cross-run SQL queries   ✅ Yes                   ❌ No
Type enforcement        ✅ Yes                   ❌ No
Setup overhead          ⚠️ Moderate              ✅ Minimal
Best for                Analytics, large data    Simple results
Example use case        Population, trips table  Single analysis output

See Also