loom.etl.pipeline

User-facing alias for ETL pipeline declaration primitives.

class loom.etl.pipeline.ETLParams

Bases: Struct

Base class for all ETL step parameter types.

Subclass and declare typed fields. Instances are immutable and serializable via msgspec — suitable for Celery task payloads and ETL run record persistence.

Date fields (date, datetime) are compatible with the declarative params proxy: expressions such as params.run_date.year, params.run_date.month, and params.run_date.day are resolved by the compiler against the field type and by the executor against the actual value at runtime.
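The two-phase resolution described above (types at compile time, values at runtime) can be illustrated with a small stdlib-only sketch. PathExpr, its check() and resolve() methods, and the params root object are hypothetical names; this is not loom's proxy implementation, only one way such lazy attribute capture can work.

```python
from dataclasses import dataclass
from datetime import date
from typing import Any, get_type_hints


class PathExpr:
    """Hypothetical stand-in for a lazy params path such as params.run_date.year."""

    def __init__(self, path: tuple[str, ...]) -> None:
        self.path = path

    def __getattr__(self, name: str) -> "PathExpr":
        # Only called for names not found normally, so each access extends the path.
        return PathExpr(self.path + (name,))

    def check(self, params_type: type) -> None:
        # Compile-time style check against types only: the first segment must be a
        # declared field; this sketch checks at most one attribute below it.
        hints = get_type_hints(params_type)
        head, *rest = self.path
        if head not in hints:
            raise AttributeError(f"{params_type.__name__} has no field {head!r}")
        if rest and not hasattr(hints[head], rest[0]):
            raise AttributeError(f"{rest[0]!r} not found on {hints[head]!r}")

    def resolve(self, params: Any) -> Any:
        # Run-time: walk the captured path against a concrete params instance.
        value = params
        for name in self.path:
            value = getattr(value, name)
        return value


params = PathExpr(())  # hypothetical proxy root


@dataclass(frozen=True)
class DailyOrdersParams:
    run_date: date
    countries: tuple[str, ...]


year_expr = params.run_date.year
year_expr.check(DailyOrdersParams)  # passes: date has a .year attribute
print(year_expr.path)  # ('run_date', 'year')
print(year_expr.resolve(DailyOrdersParams(date(2024, 3, 15), ("ES",))))  # 2024
```

In this sketch, a params field named path, check, or resolve would collide with the proxy's own attributes; a real implementation would need to reserve or mangle those names.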

Example:

from datetime import date

from loom.etl.pipeline import ETLParams

class DailyOrdersParams(ETLParams):
    run_date: date
    countries: tuple[str, ...]

class loom.etl.pipeline.ETLStep

Bases: Generic[ParamsT]

Base class for all ETL transformation steps.

Subclass, declare sources and target, then implement execute(). The * separator in execute makes all injected DataFrame parameters keyword-only — the executor always injects by name.
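Because frames are injected by name, the order in which an executor collects them does not matter, and a positional mix-up cannot happen. A minimal stdlib sketch of that calling convention, assuming a hypothetical run_step() helper and using lists of dicts in place of DataFrames:

```python
import inspect
from typing import Any


class HypotheticalStep:
    """Stand-in step; lists of dicts play the role of DataFrames."""

    # Everything after the bare * is keyword-only, so callers must name each frame.
    def execute(self, params: Any, *, orders: list, customers: list) -> list:
        names = {c["customer_id"]: c["name"] for c in customers}
        return [{**o, "name": names.get(o["customer_id"])} for o in orders]


def run_step(step: Any, params: Any, frames: dict) -> Any:
    # Frames are injected by alias; dict order is irrelevant to the call.
    return step.execute(params, **frames)


frames = {
    "customers": [{"customer_id": 1, "name": "Ada"}],
    "orders": [{"order_id": 10, "customer_id": 1}, {"order_id": 11, "customer_id": 2}],
}
result = run_step(HypotheticalStep(), None, frames)
print(result)
# [{'order_id': 10, 'customer_id': 1, 'name': 'Ada'}, {'order_id': 11, 'customer_id': 2, 'name': None}]

# The frame parameters report KEYWORD_ONLY, so positional frames are rejected.
kinds = {n: p.kind.name for n, p in inspect.signature(HypotheticalStep.execute).parameters.items()}
print(kinds["orders"], kinds["customers"])  # KEYWORD_ONLY KEYWORD_ONLY
```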

Source forms

Three authoring forms are supported (see loom.etl._source). Mixing Form 1 and Form 2 in the same class is a definition-time error.

Example:

import polars as pl

from loom.etl.pipeline import ETLStep

# Sources, FromTable, IntoTable, col and params are loom.etl declaration
# helpers; their import path is not shown on this page.

class BuildOrdersStaging(ETLStep[DailyOrdersParams]):
    sources = Sources(
        orders=FromTable("raw.orders").where(
            (col("year") == params.run_date.year)
            & (col("month") == params.run_date.month),
        ),
        customers=FromTable("raw.customers"),
    )
    target = IntoTable("staging.orders").replace_partitions(
        values={"year": params.run_date.year, "month": params.run_date.month}
    )

    def execute(
        self,
        params: DailyOrdersParams,
        *,
        orders: pl.DataFrame,
        customers: pl.DataFrame,
    ) -> pl.DataFrame:
        # Left join keeps every order, even when no customer row matches.
        return orders.join(customers, on="customer_id", how="left")

execute(params, **frames)

Transform source frames into the output frame.

Must be overridden by every concrete step. Declare the params and frame parameters explicitly — the compiler validates the signature against sources at compile time.

Parameters:
  • params (ParamsT) – Typed params instance for this run.

  • **frames (Any) – Source DataFrames injected by the executor, keyed by the source alias declared in sources.

Returns:

Transformed DataFrame (type must match the active backend).

Return type:

Any
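The compile-time signature check mentioned above can be approximated with inspect.signature. This sketch assumes execute() takes (self, params, *, ...) with one keyword-only parameter per source alias; check_execute_signature, GoodStep, and BadStep are hypothetical, and loom's compiler may check more, or check differently.

```python
import inspect
from typing import Any


def check_execute_signature(step_cls: type, aliases: set) -> None:
    """Hypothetical compile-time check of execute() against declared source aliases."""
    parameters = inspect.signature(step_cls.execute).parameters.values()
    positional = [p.name for p in parameters if p.kind == p.POSITIONAL_OR_KEYWORD]
    if positional != ["self", "params"]:
        raise TypeError(f"{step_cls.__name__}.execute must be (self, params, *, ...)")
    # Each keyword-only parameter is one injected frame.
    declared = {p.name for p in parameters if p.kind == p.KEYWORD_ONLY}
    missing, extra = aliases - declared, declared - aliases
    if missing or extra:
        raise TypeError(
            f"{step_cls.__name__}.execute frames mismatch: "
            f"missing={sorted(missing)}, unexpected={sorted(extra)}"
        )


class GoodStep:
    def execute(self, params: Any, *, orders: Any, customers: Any) -> Any:
        return orders


class BadStep:
    def execute(self, params: Any, *, orders: Any) -> Any:
        return orders


check_execute_signature(GoodStep, {"orders", "customers"})  # passes silently
try:
    check_execute_signature(BadStep, {"orders", "customers"})
    message = ""
except TypeError as exc:
    message = str(exc)
print(message)  # BadStep.execute frames mismatch: missing=['customers'], unexpected=[]
```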

class loom.etl.pipeline.StepSQL

Bases: ETLStep[ParamsT], Generic[ParamsT, FrameT]

SQL-first ETL step — transformation declared as SQL, not Python.

Declare sql as a class variable (static SQL) or a @staticmethod that receives params and returns a SQL string (dynamic SQL).
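The difference between the two forms of sql can be sketched without loom. StaticTotals, MonthlyTotals, and the free function render_sql() below are hypothetical stand-ins (the real render_sql is a method on StepSQL); the sketch only shows that a string is used as-is while a staticmethod is called with params.

```python
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class DailyOrdersParams:
    run_date: date


class StaticTotals:
    # Static SQL: a plain class variable, identical for every run.
    sql = "SELECT customer_id, SUM(amount) AS total FROM orders GROUP BY customer_id"


class MonthlyTotals:
    # Dynamic SQL: a staticmethod that builds the string from params.
    @staticmethod
    def sql(params: DailyOrdersParams) -> str:
        # The :d format spec rejects strings and floats, so no raw text reaches the SQL.
        return (
            "SELECT customer_id, SUM(amount) AS total FROM orders "
            f"WHERE month = {params.run_date.month:d} GROUP BY customer_id"
        )


def render_sql(step_cls: type, params: DailyOrdersParams) -> str:
    """Hypothetical stand-in for StepSQL.render_sql()."""
    sql = step_cls.sql  # class access unwraps the staticmethod into a plain function
    return sql if isinstance(sql, str) else sql(params)


p = DailyOrdersParams(date(2024, 3, 15))
print(render_sql(StaticTotals, p))
print(render_sql(MonthlyTotals, p))
# SELECT customer_id, SUM(amount) AS total FROM orders WHERE month = 3 GROUP BY customer_id
```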

Sources still support .where() for pre-filtering before the frame is registered as a view. Target write modes work identically to ETLStep.
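The "filter, register under the alias, then run SQL" flow can be mimicked with the standard library's sqlite3. This is an analogy, not loom's backend: run_sql_step is hypothetical, each source is given as (rows, predicate), and a temporary table stands in for the registered view.

```python
import sqlite3
from typing import Callable


def run_sql_step(sql: str, sources: dict[str, tuple[list[dict], Callable[[dict], bool]]]) -> list:
    """Hypothetical sketch: pre-filter each source, register it under its alias, run the SQL.

    Each source's rows must be a non-empty list of dicts sharing the same keys.
    """
    con = sqlite3.connect(":memory:")
    try:
        for alias, (rows, where) in sources.items():
            columns = list(rows[0])
            cols = ", ".join(f'"{c}"' for c in columns)
            marks = ", ".join("?" for _ in columns)
            # A temp table plays the role of the registered view in this sketch.
            con.execute(f'CREATE TEMP TABLE "{alias}" ({cols})')
            con.executemany(
                f'INSERT INTO "{alias}" ({cols}) VALUES ({marks})',
                # Pre-filter before registration, as .where() does for a source.
                [tuple(r[c] for c in columns) for r in rows if where(r)],
            )
        return con.execute(sql).fetchall()
    finally:
        con.close()


orders = [
    {"customer_id": 1, "month": 3, "amount": 10.0},
    {"customer_id": 1, "month": 3, "amount": 5.0},
    {"customer_id": 2, "month": 2, "amount": 7.0},
]
result = run_sql_step(
    "SELECT customer_id, SUM(amount) AS total FROM orders GROUP BY customer_id",
    {"orders": (orders, lambda r: r["month"] == 3)},
)
print(result)  # [(1, 15.0)]
```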

Warning

Avoid interpolating raw string params directly into dynamic SQL, as this opens the door to SQL injection. Use FromTable.where() for source-level filtering instead.

execute(params, **frames)

Transform source frames into the output frame.

Must be overridden by every concrete step. Declare the params and frame parameters explicitly — the compiler validates the signature against sources at compile time.

Parameters:
  • params (Any) – Typed params instance for this run.

  • **frames (Any) – Source DataFrames injected by the executor, keyed by the source alias declared in sources.

Returns:

Transformed DataFrame (type must match the active backend).

Return type:

Any

render_sql(params)

Render the SQL string for the given params instance.

Parameters:

params (Any)

Return type:

str

class loom.etl.pipeline.ETLProcess

Bases: Generic[ParamsT]

Base class for ordered sequences of ETL steps.

Declare steps as a list of step types. A nested list within steps declares a parallel group — the executor dispatches all steps in the group concurrently and waits for all to complete before advancing.

All steps must share the same ParamsT as the process — enforced by the ETLCompiler.
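One plausible way to enforce a shared ParamsT is to read the type argument each class was declared with from __orig_bases__. Step, Process, params_type_of, and check_params are hypothetical stand-ins; the sketch does not claim this is how ETLCompiler works.

```python
from typing import Generic, TypeVar, get_args

ParamsT = TypeVar("ParamsT")


class Step(Generic[ParamsT]):  # hypothetical stand-in for ETLStep
    pass


class Process(Generic[ParamsT]):  # hypothetical stand-in for ETLProcess
    steps: list = []


def params_type_of(cls: type) -> object:
    """Return X from a declaration such as `class S(Step[X])`."""
    for base in getattr(cls, "__orig_bases__", ()):
        args = get_args(base)
        if args:
            return args[0]  # ParamsT is the first type argument
    raise TypeError(f"{cls.__name__} does not bind ParamsT")


def check_params(process_cls: type) -> None:
    """Hypothetical compiler check: every step, parallel groups included, shares ParamsT."""
    expected = params_type_of(process_cls)
    for entry in process_cls.steps:
        for step in entry if isinstance(entry, list) else [entry]:
            actual = params_type_of(step)
            if actual is not expected:
                raise TypeError(
                    f"{step.__name__} uses {getattr(actual, '__name__', actual)}, "
                    f"expected {getattr(expected, '__name__', expected)}"
                )


class DailyParams: ...
class OtherParams: ...


class Ingest(Step[DailyParams]): ...
class EnrichA(Step[DailyParams]): ...
class EnrichB(Step[OtherParams]): ...


class GoodProcess(Process[DailyParams]):
    steps = [Ingest, [EnrichA]]


class BadProcess(Process[DailyParams]):
    steps = [Ingest, [EnrichA, EnrichB]]


check_params(GoodProcess)  # passes silently
try:
    check_params(BadProcess)
    message = ""
except TypeError as exc:
    message = str(exc)
print(message)  # EnrichB uses OtherParams, expected DailyParams
```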

steps

Ordered list of step types. Nested lists are parallel groups.

Type:

ClassVar[list[Any]]

Example:

class DailyPipeline(ETLProcess[DailyOrdersParams]):
    steps = [
        IngestStep,
        [EnrichA, EnrichB],   # parallel
        AggregateStep,
    ]
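The sequential-then-parallel semantics of steps can be sketched with concurrent.futures. run_stages and make_task are hypothetical; plain callables stand in for step types, and the sketch is not loom's ETLExecutor.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


def run_stages(stages: list, params: Any) -> list:
    """Hypothetical executor: run entries in order; a nested list runs concurrently."""
    results: list = []
    for entry in stages:
        if isinstance(entry, list):
            # Parallel group: submit every member, then wait for all before advancing.
            with ThreadPoolExecutor(max_workers=max(1, len(entry))) as pool:
                futures = [pool.submit(task, params) for task in entry]
                # .result() blocks until done and re-raises a member's exception.
                results.append([f.result() for f in futures])
        else:
            results.append(entry(params))
    return results


def make_task(name: str) -> Callable[[Any], str]:
    # Stand-in for a step: it just reports its own name.
    def task(params: Any) -> str:
        return name
    return task


stages = [
    make_task("ingest"),
    [make_task("enrich_a"), make_task("enrich_b")],  # parallel group
    make_task("aggregate"),
]
print(run_stages(stages, params=None))
# ['ingest', ['enrich_a', 'enrich_b'], 'aggregate']
```

Threads suit I/O-bound steps; a real executor could equally dispatch each group member to a process pool or a task queue.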

class loom.etl.pipeline.ETLPipeline

Bases: Generic[ParamsT]

Top-level ETL orchestration unit.

Declares an ordered list of ETLProcess types. A nested list within processes declares a parallel group.

The pipeline is the entry point for ETLCompiler and ETLExecutor. All processes must share the same ParamsT.
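Since a pipeline nests processes and each process nests steps, anything that compiles a pipeline has to walk two levels of ordered lists, each with optional parallel groups. A hypothetical sketch of that walk (flatten, iter_steps, and the stub classes are illustrative, not part of ETLCompiler):

```python
from typing import Iterator


def flatten(entries: list) -> Iterator[type]:
    """Yield entries in declaration order, descending one level into parallel groups."""
    for entry in entries:
        if isinstance(entry, list):
            yield from entry
        else:
            yield entry


def iter_steps(pipeline_cls: type) -> Iterator[tuple[type, type]]:
    """Hypothetical compiler walk: every (process, step) pair reachable from a pipeline."""
    for process in flatten(pipeline_cls.processes):
        for step in flatten(process.steps):
            yield process, step


class Ingest: ...
class EnrichA: ...
class EnrichB: ...
class Aggregate: ...


class StagingProcess:
    steps = [Ingest]


class EnrichProcess:
    steps = [[EnrichA, EnrichB]]


class AggregateProcess:
    steps = [Aggregate]


class Pipeline:
    processes = [StagingProcess, [EnrichProcess], AggregateProcess]


pairs = [(p.__name__, s.__name__) for p, s in iter_steps(Pipeline)]
print(pairs)
# [('StagingProcess', 'Ingest'), ('EnrichProcess', 'EnrichA'), ('EnrichProcess', 'EnrichB'), ('AggregateProcess', 'Aggregate')]
```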

processes

Ordered list of process types. Nested lists are parallel groups.

Type:

ClassVar[list[Any]]

Example:

class DailyOrdersPipeline(ETLPipeline[DailyOrdersParams]):
    processes = [
        BuildStagingProcess,
        [EnrichProductsProcess, EnrichCustomersProcess],
        AggregateProcess,
    ]

class loom.etl.pipeline.ParamExpr(path)

Bases: _ColOps

Lazy path expression capturing params field access.

Parameters:

path (tuple[str, ...])

property path: tuple[str, ...]

Captured attribute path from params root.
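ParamExpr derives from _ColOps, which suggests a params path can take part in column expressions such as col("year") == params.run_date.year from the ETLStep example. The stdlib-only sketch below shows how operator overloading can defer such a comparison until both a row and a params instance exist. ColOps, Col, ParamPath, and Pred are hypothetical names; this is not loom's _ColOps.

```python
from dataclasses import dataclass
from datetime import date
from typing import Any, Callable


class Pred:
    """Deferred predicate, evaluated later against a row and a params instance."""

    def __init__(self, fn: Callable[[dict, Any], bool]) -> None:
        self.fn = fn

    def __and__(self, other: "Pred") -> "Pred":
        return Pred(lambda row, p: self.fn(row, p) and other.fn(row, p))

    def __call__(self, row: dict, params: Any) -> bool:
        return self.fn(row, params)


class ColOps:
    """Hypothetical operator mix-in: `==` builds a Pred instead of comparing now."""

    def value(self, row: dict, params: Any) -> Any:
        raise NotImplementedError

    def __eq__(self, other: Any) -> Pred:  # type: ignore[override]
        return Pred(lambda row, p: self.value(row, p) == _value(other, row, p))


def _value(x: Any, row: dict, params: Any) -> Any:
    # Expressions are evaluated; plain literals pass through unchanged.
    return x.value(row, params) if isinstance(x, ColOps) else x


class Col(ColOps):
    def __init__(self, name: str) -> None:
        self.name = name

    def value(self, row: dict, params: Any) -> Any:
        return row[self.name]


class ParamPath(ColOps):
    """Hypothetical counterpart of ParamExpr: stores a path, reads it at run time."""

    def __init__(self, path: tuple) -> None:
        self.path = path

    def value(self, row: dict, params: Any) -> Any:
        for name in self.path:
            params = getattr(params, name)
        return params


@dataclass(frozen=True)
class DailyOrdersParams:
    run_date: date


pred = (Col("year") == ParamPath(("run_date", "year"))) & (
    Col("month") == ParamPath(("run_date", "month"))
)
p = DailyOrdersParams(date(2024, 3, 15))
rows = [{"year": 2024, "month": 3}, {"year": 2024, "month": 4}]
print([r for r in rows if pred(r, p)])  # [{'year': 2024, 'month': 3}]
```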