ETL Quickstart

loom.etl is a declarative ETL subsystem with compile-time validation, backend-agnostic declarations, and a single runtime entrypoint (ETLRunner).

Install

Choose one backend:

pip install "loom-kernel[etl-polars]"
# or
pip install "loom-kernel[etl-spark]"

Minimal pipeline

from datetime import date

import polars as pl
from loom.etl import (
    ETLParams,
    ETLStep,
    ETLProcess,
    ETLPipeline,
    ETLRunner,
    FromTable,
    IntoTable,
)


class DailyParams(ETLParams):
    run_date: date


class CleanOrders(ETLStep[DailyParams]):
    orders = FromTable("raw.orders").columns("id", "amount", "run_date")
    target = IntoTable("staging.orders").replace()

    def execute(self, params: DailyParams, *, orders: pl.LazyFrame) -> pl.LazyFrame:
        return orders.filter(pl.col("amount") > 0)


class DailyProcess(ETLProcess[DailyParams]):
    steps = [CleanOrders]


class DailyPipeline(ETLPipeline[DailyParams]):
    processes = [DailyProcess]


runner = ETLRunner.from_dict(
    storage={
        "engine": "polars",
        "defaults": {"table_path": {"uri": "/var/lib/loom/lake"}},
    }
)
runner.run(DailyPipeline, DailyParams(run_date=date(2026, 3, 30)))
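
The declarations themselves are backend-agnostic: if you installed the Spark extra instead, only the runner configuration changes. A minimal sketch, assuming the engine key mirrors the etl-spark extra and accepts the same storage defaults:

runner = ETLRunner.from_dict(
    storage={
        "engine": "spark",  # assumption: engine name mirrors the etl-spark extra
        "defaults": {"table_path": {"uri": "/var/lib/loom/lake"}},
    }
)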

Basic write modes

Every IntoTable target declares exactly one write mode by chaining a method.

Mode | What it does | Example
--- | --- | ---
append | Add rows to the table | IntoTable("staging.orders").append()
replace | Full overwrite | IntoTable("staging.orders").replace()
replace_partitions | Overwrite only partitions present in the batch | IntoTable("staging.orders").replace_partitions("year", "month")
replace_partition | Overwrite a single known partition | IntoTable("staging.orders").replace_partition(year=params.run_date.year)
replace_where | Overwrite rows matching a predicate | IntoTable("staging.orders").replace_where(col("date") == params.run_date)
upsert | Merge on key columns | IntoTable("staging.orders").upsert(keys=("order_id",))

See the ETL pipelines guide for the full write-mode reference.

YAML config

storage:
  engine: polars
  defaults:
    table_path:
      uri: s3://my-lake
      storage_options:
        AWS_REGION: ${oc.env:AWS_REGION}
        AWS_ACCESS_KEY_ID: ${oc.env:AWS_ACCESS_KEY_ID}
        AWS_SECRET_ACCESS_KEY: ${oc.env:AWS_SECRET_ACCESS_KEY}

  tmp_root: /var/lib/loom/lake/_tmp

observability:
  log: true
  slow_step_threshold_ms: 30000
  run_sink:
    root: /var/lib/loom/lake/_runs

Load the config at runtime:

from loom.etl import ETLRunner

runner = ETLRunner.from_yaml("config/etl.yaml")
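
Running a pipeline with a YAML-configured runner is the same call as before:

runner.run(DailyPipeline, DailyParams(run_date=date(2026, 3, 30)))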

File aliases

Hard-coding file paths couples logic to infrastructure. Use aliases instead:

storage:
  engine: polars
  files:
    - name: events_raw
      path:
        uri: s3://raw-bucket/events/
        storage_options:
          AWS_REGION: eu-west-1
    - name: exports_daily
      path:
        uri: s3://exports-bucket/daily/

Declare steps against the alias names:

from loom.etl import ETLStep, FromFile, IntoFile, Format

class LoadEvents(ETLStep[DailyParams]):
    events = FromFile.alias("events_raw", format=Format.CSV)
    target = IntoFile.alias("exports_daily", format=Format.PARQUET)

    def execute(self, params: DailyParams, *, events: pl.LazyFrame) -> pl.LazyFrame:
        return events
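
Because steps reference the alias name rather than a URI, pointing the same pipeline at local data is a config-only change (the paths below are illustrative):

storage:
  engine: polars
  files:
    - name: events_raw
      path:
        uri: /data/fixtures/events/
    - name: exports_daily
      path:
        uri: /data/exports/daily/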

Next steps