ETL Quickstart¶
loom.etl is a declarative ETL subsystem with compile-time validation,
backend-agnostic declarations, and a single runtime entrypoint (ETLRunner).
Install¶
Choose one backend:
pip install "loom-kernel[etl-polars]"
# or
pip install "loom-kernel[etl-spark]"
Minimal pipeline¶
from datetime import date

import polars as pl

from loom.etl import (
    ETLParams,
    ETLStep,
    ETLProcess,
    ETLPipeline,
    ETLRunner,
    FromTable,
    IntoTable,
)


class DailyParams(ETLParams):
    run_date: date


class CleanOrders(ETLStep[DailyParams]):
    # Inputs and the write target are declared up front and validated
    # before anything runs.
    orders = FromTable("raw.orders").columns("id", "amount", "run_date")
    target = IntoTable("staging.orders").replace()

    def execute(self, params: DailyParams, *, orders: pl.LazyFrame) -> pl.LazyFrame:
        # Drop non-positive amounts; the result is written to the target.
        return orders.filter(pl.col("amount") > 0)


class DailyProcess(ETLProcess[DailyParams]):
    steps = [CleanOrders]


class DailyPipeline(ETLPipeline[DailyParams]):
    processes = [DailyProcess]


runner = ETLRunner.from_dict(
    storage={
        "engine": "polars",
        "defaults": {"table_path": {"uri": "/var/lib/loom/lake"}},
    }
)
runner.run(DailyPipeline, DailyParams(run_date=date(2026, 3, 30)))
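Note that DailyParams declares run_date but the step above never uses it. The params object is passed to every execute call, so a step can be made date-aware without globals. A sketch using only the API shown above:

class CleanOrders(ETLStep[DailyParams]):
    orders = FromTable("raw.orders").columns("id", "amount", "run_date")
    target = IntoTable("staging.orders").replace()

    def execute(self, params: DailyParams, *, orders: pl.LazyFrame) -> pl.LazyFrame:
        # Keep positive amounts for the requested run date only.
        return orders.filter(
            (pl.col("amount") > 0) & (pl.col("run_date") == params.run_date)
        )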
Basic write modes¶
Every IntoTable target declares exactly one write mode by chaining a method.
| Mode | What it does |
|---|---|
| Append | Add rows to the table |
| Replace | Full overwrite |
| Replace partitions | Overwrite only the partitions present in the batch |
| Replace one partition | Overwrite a single known partition |
| Replace where | Overwrite rows matching a predicate |
| Merge | Merge on key columns |
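This quickstart only exercises the full overwrite via .replace(). The snippet below sketches the chaining pattern; apart from .replace(), the method names and signatures are hypothetical placeholders, so consult the write-mode reference below for the real API.

from datetime import date
from loom.etl import IntoTable

# Chaining pattern only. Apart from .replace(), these method names and
# signatures are hypothetical placeholders, not the documented API.
appended = IntoTable("staging.orders").append()
replaced = IntoTable("staging.orders").replace()
one_day = IntoTable("staging.orders").replace_partition(run_date=date(2026, 3, 30))
merged = IntoTable("staging.orders").merge(keys=["id"])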
See the ETL pipelines guide for the full write-mode reference.
YAML config¶
storage:
  engine: polars
  defaults:
    table_path:
      uri: s3://my-lake
      storage_options:
        AWS_REGION: ${oc.env:AWS_REGION}
        AWS_ACCESS_KEY_ID: ${oc.env:AWS_ACCESS_KEY_ID}
        AWS_SECRET_ACCESS_KEY: ${oc.env:AWS_SECRET_ACCESS_KEY}
  tmp_root: /var/lib/loom/lake/_tmp

observability:
  log: true
  slow_step_threshold_ms: 30000
  run_sink:
    root: /var/lib/loom/lake/_runs
from loom.etl import ETLRunner
runner = ETLRunner.from_yaml("config/etl.yaml")
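The ${oc.env:...} entries are OmegaConf-style environment interpolations, so the variables must exist in the process environment when the config is loaded. A minimal sketch with placeholder values:

import os

from loom.etl import ETLRunner

# Placeholder values for illustration; in practice these come from the
# deployment environment or a secrets manager.
os.environ.setdefault("AWS_REGION", "eu-west-1")
os.environ.setdefault("AWS_ACCESS_KEY_ID", "<access-key-id>")
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "<secret-access-key>")

runner = ETLRunner.from_yaml("config/etl.yaml")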
File aliases¶
Hard-coding file paths couples logic to infrastructure. Declare file aliases in config instead, one per logical file; the example below defines both the raw input and the export target used by the step that follows:

storage:
  engine: polars
  files:
    - name: events_raw
      path:
        uri: s3://raw-bucket/events/
        storage_options:
          AWS_REGION: eu-west-1
    - name: exports_daily
      path:
        uri: s3://exports-bucket/daily/
from loom.etl import ETLStep, FromFile, IntoFile, Format


class LoadEvents(ETLStep[DailyParams]):
    events = FromFile.alias("events_raw", format=Format.CSV)
    target = IntoFile.alias("exports_daily", format=Format.PARQUET)

    def execute(self, params: DailyParams, *, events: pl.LazyFrame) -> pl.LazyFrame:
        # Pass through unchanged: read CSV, write Parquet.
        return events
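File steps run through the same process/pipeline/runner machinery as table steps. A minimal sketch reusing the classes from the sections above:

from datetime import date

from loom.etl import ETLProcess, ETLPipeline, ETLRunner


class EventsProcess(ETLProcess[DailyParams]):
    steps = [LoadEvents]


class EventsPipeline(ETLPipeline[DailyParams]):
    processes = [EventsProcess]


runner = ETLRunner.from_yaml("config/etl.yaml")
runner.run(EventsPipeline, DailyParams(run_date=date(2026, 3, 30)))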
Next steps¶
ETL pipelines guide — full write modes, Spark runtime, cloud config, and pluggable resolvers
ETL testing guide — in-memory runners, scenarios, and stubs
ETL examples — companion repository with runnable Polars and Spark pipelines