ETL

loom.etl is a declarative ETL subsystem with compile-time validation, backend-agnostic declarations, and a single runtime entrypoint (ETLRunner).

Install

Choose one backend:

pip install "loom-kernel[etl-polars]"
# or
pip install "loom-kernel[etl-spark]"

Minimal pipeline

from datetime import date

import polars as pl
from loom.etl import (
    ETLParams,
    ETLStep,
    ETLProcess,
    ETLPipeline,
    ETLRunner,
    FromTable,
    IntoTable,
)


class DailyParams(ETLParams):
    run_date: date


class CleanOrders(ETLStep[DailyParams]):
    orders = FromTable("raw.orders").columns("id", "amount", "run_date")
    target = IntoTable("staging.orders").replace()

    def execute(self, params: DailyParams, *, orders: pl.LazyFrame) -> pl.LazyFrame:
        return orders.filter(pl.col("amount") > 0)


class DailyProcess(ETLProcess[DailyParams]):
    steps = [CleanOrders]


class DailyPipeline(ETLPipeline[DailyParams]):
    processes = [DailyProcess]


runner = ETLRunner.from_dict(
    storage={
        "engine": "polars",
        "defaults": {"table_path": {"uri": "/var/lib/loom/lake"}},
    }
)
runner.run(DailyPipeline, DailyParams(run_date=date(2026, 3, 30)))

File source with JSON payload + final CSV report

import polars as pl
from loom.etl import ETLStep, ETLProcess, ETLPipeline, FromFile, FromTable, IntoFile, IntoTable, Format

# Expected shape of the JSON "payload" column.
class Payload:
    store: str
    amount: float
    items: int

class LoadEvents(ETLStep[DailyParams]):
    events = FromFile("/data/raw/events.csv", format=Format.CSV).parse_json("payload", Payload)
    target = IntoTable("staging.events").replace()

    def execute(self, params: DailyParams, *, events: pl.LazyFrame) -> pl.LazyFrame:
        return events.select(
            pl.col("payload").struct.field("store").alias("store"),
            pl.col("payload").struct.field("amount").alias("amount"),
            pl.col("payload").struct.field("items").alias("items"),
        )

class BuildReport(ETLStep[DailyParams]):
    events = FromTable("staging.events")
    target = IntoFile("/data/exports/daily_report.csv", format=Format.CSV)

    def execute(self, params: DailyParams, *, events: pl.LazyFrame) -> pl.LazyFrame:
        return events.group_by("store").agg(
            pl.col("amount").sum().alias("gross_amount"),
            pl.col("items").sum().alias("item_count"),
        )

class ReportProcess(ETLProcess[DailyParams]):
    steps = [LoadEvents, BuildReport]

class ReportPipeline(ETLPipeline[DailyParams]):
    processes = [ReportProcess]

File aliases (FileLocator)

Hard-coding file paths in pipelines couples the logic to the infrastructure. Use FromFile.alias() / IntoFile.alias() to declare a logical name and resolve it at runtime through the storage config.

Declare aliases in the storage.files block:

storage:
  engine: polars
  files:
    - name: events_raw
      path:
        uri: s3://raw-bucket/events/
        storage_options:
          AWS_REGION: eu-west-1
    - name: exports_daily
      path:
        uri: s3://exports-bucket/daily/

Reference them in pipelines using the alias API:

from loom.etl import ETLStep, FromFile, IntoFile, Format

class LoadEvents(ETLStep[DailyParams]):
    events = FromFile.alias("events_raw", format=Format.CSV)
    target = IntoFile.alias("exports_daily", format=Format.PARQUET)

    def execute(self, params: DailyParams, *, events: pl.LazyFrame) -> pl.LazyFrame:
        return events

The runner resolves aliases to physical URIs at job startup — pipelines never hard-code storage paths or credentials.

You can also implement the FileLocator protocol directly for custom routing strategies:

from loom.etl.storage import FileLocator, FileLocation

class MyFileLocator:
    def locate(self, name: str) -> FileLocation:
        return FileLocation(uri_template=f"s3://my-bucket/{name}/")
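Resolving a name by hand shows the contract the runner relies on:

locator = MyFileLocator()
location = locator.locate("events_raw")
print(location.uri_template)  # s3://my-bucket/events_raw/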

YAML config (Polars path)

storage:
  engine: polars

  defaults:
    table_path:
      uri: s3://my-lake
      storage_options:
        AWS_REGION: ${oc.env:AWS_REGION}
        AWS_ACCESS_KEY_ID: ${oc.env:AWS_ACCESS_KEY_ID}
        AWS_SECRET_ACCESS_KEY: ${oc.env:AWS_SECRET_ACCESS_KEY}

  tmp_root: /var/lib/loom/lake/_tmp

observability:
  log: true
  slow_step_threshold_ms: 30000
  run_sink:
    # Choose exactly one destination:
    root: /var/lib/loom/lake/_runs
    # database: ops
Load the file with:

from loom.etl import ETLRunner

runner = ETLRunner.from_yaml("config/etl.yaml")

Write modes

Every IntoTable target declares exactly one write mode by chaining a method.

append

Adds rows to the table. Creates the table on first write.

target = IntoTable("staging.orders").append()

replace

Full overwrite. Replaces all data in the table.

target = IntoTable("staging.orders").replace()
# Overwrite schema too (SchemaMode assumed importable from loom.etl):
target = IntoTable("staging.orders").replace(schema=SchemaMode.OVERWRITE)

replace_partitions

Replaces only the partitions present in the batch. The writer collects the distinct partition values from the frame at write time — no params required.

target = IntoTable("staging.orders").replace_partitions("year", "month")

Use this for incremental loads where the batch carries its own partition identity (e.g. daily runs writing the day’s data).
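For example, a daily step can stamp the partition columns onto the frame it writes. A minimal sketch reusing the types from the minimal pipeline above:

import polars as pl
from loom.etl import ETLStep, FromTable, IntoTable

class LoadDay(ETLStep[DailyParams]):
    orders = FromTable("raw.orders")
    target = IntoTable("staging.orders").replace_partitions("year", "month")

    def execute(self, params: DailyParams, *, orders: pl.LazyFrame) -> pl.LazyFrame:
        # The output frame carries its own partition identity: the writer
        # collects the distinct (year, month) pairs and replaces only those.
        return orders.with_columns(
            pl.lit(params.run_date.year).alias("year"),
            pl.lit(params.run_date.month).alias("month"),
        )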

replace_partition

Replaces a specific partition whose values come from run params. Resolves the equality predicate at runtime without collecting from the frame.

from loom.etl import params

target = IntoTable("staging.orders").replace_partition(
    year=params.run_date.year,
    month=params.run_date.month,
)

Use this when the partition to replace is known at pipeline design time (e.g. reprocessing a single day).

Difference from replace_partitions:

                      replace_partitions            replace_partition
Partition values      Collected from frame          Resolved from params
Collect step          Yes (distinct)                No
Use case              Batch carries its partition   Reprocessing a known partition

replace_where

Replaces rows matching an arbitrary predicate. Accepts the full predicate DSL.

from loom.etl import col, params  # assuming col is exported alongside params

target = IntoTable("staging.orders").replace_where(
    col("date").between(params.start_date, params.end_date)
)

upsert

MERGE on key columns. Inserts new rows and updates existing ones.

target = IntoTable("events.orders").upsert(
    keys=("order_id",),
    partition_cols=("year", "month"),  # strongly recommended for large tables
    exclude=("created_at",),           # columns excluded from UPDATE SET
)

partition_cols is optional but strongly recommended — without it every MERGE forces a full table scan.


Running only selected stages

Use include with process or step class names:

runner.run(
    DailyPipeline,
    DailyParams(run_date=date(2026, 3, 30)),
    include=["DailyProcess", "CleanOrders"],
)

If no name matches, InvalidStageError is raised.
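Guarding a call site against a stage-name typo might look like this; the import location of InvalidStageError is an assumption:

from loom.etl import InvalidStageError  # import path assumed

try:
    runner.run(
        DailyPipeline,
        DailyParams(run_date=date(2026, 3, 30)),
        include=["ClenOrders"],  # misspelled on purpose
    )
except InvalidStageError:
    raise SystemExit("include listed a name that matches no process or step")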

Spark runtime

For a Databricks/Unity Catalog runtime, build the runner from an active SparkSession:

from loom.etl import ETLRunner

runner = ETLRunner.from_spark(spark)

This wires Spark reader/writer/catalog automatically.
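Pipelines and params are backend-agnostic, so the run call is identical to the Polars example:

runner.run(DailyPipeline, DailyParams(run_date=date(2026, 3, 30)))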

Testing ETL steps

Use the built-in test harnesses:

  • loom.etl.testing.PolarsStepRunner

  • loom.etl.testing.spark.SparkStepRunner

  • loom.etl.testing.ETLScenario

These let you seed source tables, run one step in isolation, and assert output without wiring full storage infrastructure.
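A sketch of what a step-level test can look like; the seed_table and run_step method names below are illustrative assumptions, not the documented harness API:

from datetime import date

import polars as pl
from loom.etl.testing import PolarsStepRunner

def test_clean_orders_drops_non_positive_amounts() -> None:
    runner = PolarsStepRunner()
    # Hypothetical seeding call: register an in-memory frame as "raw.orders".
    runner.seed_table(
        "raw.orders",
        pl.DataFrame({
            "id": [1, 2],
            "amount": [10.0, -5.0],
            "run_date": [date(2026, 3, 30)] * 2,
        }),
    )
    # Hypothetical execution call: run one step in isolation, get the result back.
    result = runner.run_step(CleanOrders, DailyParams(run_date=date(2026, 3, 30)))
    assert result["id"].to_list() == [1]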

Loading config from cloud storage

ETLRunner.from_yaml() accepts cloud URIs (s3://, gs://, abfss://, r2://, …) in addition to local paths. Files are fetched via fsspec at startup:

runner = ETLRunner.from_yaml("s3://my-bucket/config/etl.yaml")

Multi-file composition:

runner = ETLRunner.from_yaml(
    "s3://my-bucket/config/base.yaml",
    "s3://my-bucket/config/prod.yaml",
)

The includes directive is not supported for cloud URIs. Use explicit multi-file composition instead.


Pluggable config resolvers

Extend the YAML loader with custom ${prefix:key} placeholders to fetch secrets from external stores (AWS SSM, Azure Key Vault, …) at parse time:

from loom.core.config import load_config

class SsmResolver:
    name = "ssm"

    def __init__(self, region: str) -> None:
        self._region = region

    def resolve(self, key: str) -> str:
        import boto3
        client = boto3.client("ssm", region_name=self._region)
        return client.get_parameter(Name=key, WithDecryption=True)["Parameter"]["Value"]

cfg = load_config("config/etl.yaml", resolvers=[SsmResolver("eu-west-1")])

In YAML, reference secrets via ${ssm:/path/to/secret}:

storage:
  catalogs:
    main:
      token: ${ssm:/prod/databricks/token}

Resolvers run at job startup — secret rotation takes effect on the next run without redeployment.


End-to-end example

A full working example with Polars and Spark pipelines, Delta Lake, and observability is available in the companion repository: dummy-loom-etl.


API reference

The ETL API reference is generated from public docstrings.