loom.etl.pipeline
User-facing alias for ETL pipeline declaration primitives.
- class loom.etl.pipeline.ETLParams
Bases: Struct
Base class for all ETL step parameter types.
Subclass and declare typed fields. Instances are immutable and serializable via msgspec, which makes them suitable for Celery task payloads and ETL run record persistence.
Date fields (date, datetime) are compatible with the declarative params proxy: params.run_date.year, params.run_date.month, params.run_date.day, etc. are resolved by the compiler against the field type and by the executor at runtime.
Example:
    from datetime import date

    from loom.etl.pipeline import ETLParams

    class DailyOrdersParams(ETLParams):
        run_date: date
        countries: tuple[str, ...]
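The two-phase resolution of expressions such as params.run_date.year can be pictured with a small stand-in. This is only a sketch, not loom's implementation: ParamPath and its resolve() method are hypothetical names, and a frozen dataclass is used in place of ETLParams so the example runs with the standard library alone.

```python
from dataclasses import dataclass
from datetime import date


class ParamPath:
    """Hypothetical stand-in for the params proxy: records an attribute path."""

    def __init__(self, path: tuple[str, ...] = ()) -> None:
        self._path = path

    def __getattr__(self, name: str) -> "ParamPath":
        # Only reached for attributes that are not found normally.
        if name.startswith("_"):
            raise AttributeError(name)
        return ParamPath(self._path + (name,))

    def resolve(self, concrete: object) -> object:
        # Walk the recorded path against a concrete params instance.
        value = concrete
        for name in self._path:
            value = getattr(value, name)
        return value


@dataclass(frozen=True)
class DailyOrdersParams:
    # Frozen dataclass used here in place of an ETLParams subclass.
    run_date: date
    countries: tuple[str, ...]


params = ParamPath()
year_ref = params.run_date.year  # declaration time: nothing is read yet

concrete = DailyOrdersParams(run_date=date(2024, 3, 15), countries=("ES", "FR"))
resolved_year = year_ref.resolve(concrete)  # run time: path is followed
```

The recorded path is what a compiler-like component could check against the declared field types before any data is touched; the concrete value only appears when resolve() is called.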
- class loom.etl.pipeline.ETLStep
Bases: Generic[ParamsT]
Base class for all ETL transformation steps.
Subclass, declare sources and target, then implement execute(). The * separator in execute makes all injected DataFrame parameters keyword-only; the executor always injects by name.
Source forms
Three authoring forms are supported (see loom.etl._source). Mixing Form 1 and Form 2 in the same class is a definition-time error.
Example:
    class BuildOrdersStaging(ETLStep[DailyOrdersParams]):
        sources = Sources(
            orders=FromTable("raw.orders").where(
                (col("year") == params.run_date.year)
                & (col("month") == params.run_date.month),
            ),
            customers=FromTable("raw.customers"),
        )
        target = IntoTable("staging.orders").replace_partitions(
            values={"year": params.run_date.year, "month": params.run_date.month}
        )

        def execute(
            self,
            params: DailyOrdersParams,
            *,
            orders: pl.DataFrame,
            customers: pl.DataFrame,
        ) -> pl.DataFrame:
            return orders.join(customers, on="customer_id", how="left")
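To illustrate why keyword-only parameters pair well with injection by name, the sketch below matches the keyword-only parameters of an execute() method against a dictionary of loaded frames. The helper inject_by_name and the JoinStep class are hypothetical, and plain lists of dicts stand in for DataFrames; how loom's executor actually performs the injection is not shown in this reference.

```python
import inspect


def inject_by_name(execute, params, frames):
    """Hypothetical helper: fill each keyword-only parameter from `frames`."""
    sig = inspect.signature(execute)
    wanted = [
        name
        for name, p in sig.parameters.items()
        if p.kind is inspect.Parameter.KEYWORD_ONLY
    ]
    missing = [name for name in wanted if name not in frames]
    if missing:
        raise KeyError(f"no source frame for: {missing}")
    # Frames are passed by keyword, so their order in `frames` is irrelevant.
    return execute(params, **{name: frames[name] for name in wanted})


class JoinStep:
    def execute(self, params, *, orders, customers):
        # Left join on customer_id, with lists of dicts standing in for frames.
        by_id = {c["customer_id"]: c for c in customers}
        return [{**o, **by_id.get(o["customer_id"], {})} for o in orders]


frames = {
    "customers": [{"customer_id": 1, "name": "Ada"}],
    "orders": [{"order_id": 10, "customer_id": 1}],
}
result = inject_by_name(JoinStep().execute, params=None, frames=frames)
```

Because every frame is bound by keyword, reordering the declared sources or the dictionary cannot silently swap two frames of the same type.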
- class loom.etl.pipeline.StepSQL
Bases: ETLStep[ParamsT], Generic[ParamsT, FrameT]
SQL-first ETL step: the transformation is declared as SQL, not Python.
Declare sql as a class variable (static SQL) or a @staticmethod that receives params and returns a SQL string (dynamic SQL).
Sources still support .where() for pre-filtering before the frame is registered as a view. Target write modes work identically to ETLStep.
Warning
Avoid interpolating raw string params directly into the SQL text. Use FromTable.where() for source-level filtering instead.
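The two declaration styles can be sketched without the library. Everything below is illustrative: the classes do not inherit from StepSQL, resolve_sql is a hypothetical helper, and the view name orders is assumed. The dynamic variant interpolates only an integer, in line with the warning about raw string params.

```python
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class DailyOrdersParams:
    # Frozen dataclass used here in place of an ETLParams subclass.
    run_date: date


class StaticTotals:
    # Static SQL: a plain class variable.
    sql = "SELECT customer_id, SUM(amount) AS total FROM orders GROUP BY customer_id"


class DynamicTotals:
    # Dynamic SQL: a staticmethod that receives params and returns a string.
    @staticmethod
    def sql(params: DailyOrdersParams) -> str:
        # Only an int derived from a typed date field is interpolated.
        return f"SELECT * FROM orders WHERE year = {int(params.run_date.year)}"


def resolve_sql(step_cls: type, params: DailyOrdersParams) -> str:
    """Hypothetical helper: return the SQL text for either declaration style."""
    sql = step_cls.sql
    return sql(params) if callable(sql) else sql


p = DailyOrdersParams(run_date=date(2024, 3, 15))
static_text = resolve_sql(StaticTotals, p)
dynamic_text = resolve_sql(DynamicTotals, p)
```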
- class loom.etl.pipeline.ETLProcess
Bases: Generic[ParamsT]
Base class for ordered sets of ETL steps.
Declare steps as a list of step types. A nested list within steps declares a parallel group: the executor dispatches all steps in the group concurrently and waits for all to complete before advancing.
All steps must share the same ParamsT as the process; this is enforced by the ETLCompiler.
Example:
    class DailyPipeline(ETLProcess[DailyOrdersParams]):
        steps = [
            IngestStep,
            [EnrichA, EnrichB],  # parallel
            AggregateStep,
        ]
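The ordering rule for nested lists (run the group concurrently, wait for all members, then advance) can be sketched with the standard library. The run_plan function is hypothetical, strings stand in for step types, and the use of a thread pool is an assumption; this reference does not say how the executor dispatches a group.

```python
from concurrent.futures import ThreadPoolExecutor


def run_plan(steps, run_one):
    """Hypothetical runner: entries run in order, nested lists run concurrently."""
    for entry in steps:
        if isinstance(entry, list):
            if not entry:
                continue  # an empty group has nothing to dispatch
            with ThreadPoolExecutor(max_workers=len(entry)) as pool:
                # list() waits for every member and re-raises the first failure.
                list(pool.map(run_one, entry))
        else:
            run_one(entry)


log = []
run_plan(
    ["IngestStep", ["EnrichA", "EnrichB"], "AggregateStep"],
    run_one=log.append,
)
```

After the call, log starts with IngestStep and ends with AggregateStep; the two group members sit between them in an order that is not guaranteed.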
- class loom.etl.pipeline.ETLPipeline
Bases: Generic[ParamsT]
Top-level ETL orchestration unit.
Declares an ordered list of ETLProcess types. A nested list within processes declares a parallel group.
The pipeline is the entry point for ETLCompiler and ETLExecutor. All processes must share the same ParamsT.
- processes
Ordered list of process types. Nested lists are parallel groups.
- Type:
ClassVar[list[Any]]
Example:
    class DailyOrdersPipeline(ETLPipeline[DailyOrdersParams]):
        processes = [
            BuildStagingProcess,
            [EnrichProductsProcess, EnrichCustomersProcess],
            AggregateProcess,
        ]
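One way a shared-ParamsT rule could be checked at definition or compile time is to read the type argument each class binds on its generic base. The sketch below is not ETLCompiler's logic: Process, params_type, flatten and check_shared_params are hypothetical names, and only the standard typing module is used.

```python
from typing import Generic, TypeVar, get_args

ParamsT = TypeVar("ParamsT")


class Process(Generic[ParamsT]):
    """Hypothetical stand-in for ETLProcess."""


class OrdersParams: ...
class OtherParams: ...


class BuildStaging(Process[OrdersParams]): ...
class Aggregate(Process[OrdersParams]): ...
class Stray(Process[OtherParams]): ...


def params_type(cls):
    # __orig_bases__ keeps the parameterized base, e.g. Process[OrdersParams].
    for base in getattr(cls, "__orig_bases__", ()):
        args = get_args(base)
        if args:
            return args[0]
    raise TypeError(f"{cls.__name__} does not bind ParamsT")


def flatten(entries):
    # Nested lists (parallel groups) are checked like any other entry.
    for entry in entries:
        if isinstance(entry, list):
            yield from entry
        else:
            yield entry


def check_shared_params(entries, expected):
    bad = [c.__name__ for c in flatten(entries) if params_type(c) is not expected]
    if bad:
        raise TypeError(f"ParamsT mismatch in: {bad}")


check_shared_params([BuildStaging, [Aggregate]], OrdersParams)  # passes silently
```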