loom.etl.declarative

I/O declaration API for ETL sources, targets, and file formats.

class loom.etl.declarative.Format(value)[source]

Bases: StrEnum

Supported I/O formats for ETL sources and targets.

Used by FromFile and IntoFile to declare the physical format of a file-based source or target. DELTA is the implicit format for FromTable and IntoTable — it does not need to be set explicitly there.

Example:

target = IntoFile("s3://exports/report_{run_date}.csv", format=Format.CSV)
class loom.etl.declarative.CsvReadOptions(separator=', ', has_header=True, null_values=<factory>, encoding='utf8', infer_schema_length=100, skip_rows=0)[source]

Bases: object

Read options for CSV / TSV file sources.

All fields default to the most common CSV convention.

Parameters:
  • separator (str) – Column delimiter character. Defaults to ",".

  • has_header (bool) – Whether the first row is a header. Defaults to True.

  • null_values (tuple[str, ...]) – Strings that should be interpreted as null.

  • encoding (str) – File encoding. Defaults to "utf8".

  • infer_schema_length (int | None) – Number of rows used to infer column types. None scans the whole file. Defaults to 100.

  • skip_rows (int) – Number of rows to skip before reading. Useful for files with metadata preambles.

class loom.etl.declarative.JsonReadOptions(infer_schema_length=100)[source]

Bases: object

Read options for newline-delimited JSON (NDJSON) file sources.

Parameters:

infer_schema_length (int | None) – Number of rows used to infer column types. None reads the whole file for inference. Defaults to 100.

class loom.etl.declarative.ExcelReadOptions(sheet_name=None, has_header=True)[source]

Bases: object

Read options for Excel (.xlsx) file sources.

Parameters:
  • sheet_name (str | None) – Sheet to read by name. None reads the first sheet (default).

  • has_header (bool) – Whether the first row is a header. Defaults to True.

class loom.etl.declarative.ParquetReadOptions[source]

Bases: object

Read options for Parquet file sources.

Parquet is self-describing — schema and types are embedded in the file metadata. Use with_schema() when you need to override or enforce a specific type mapping.

class loom.etl.declarative.CsvWriteOptions(separator=',', has_header=True, kwargs=())[source]

Bases: object

Write options for CSV / TSV file targets.

Parameters:
  • separator (str) – Column delimiter character. Defaults to ",".

  • has_header (bool) – Whether to write a header row. Defaults to True.

  • kwargs (tuple[tuple[str, Any], ...]) – Backend-specific keyword arguments forwarded verbatim. For Polars: mapped to polars.DataFrame.write_csv(). For Spark: applied via DataFrameWriter.option(k, v).

Example:

CsvWriteOptions(
    separator=";",
    kwargs=(("datetime_format", "%Y-%m-%d"), ("null_value", "N/A")),
)
class loom.etl.declarative.JsonWriteOptions(kwargs=())[source]

Bases: object

Write options for JSON (NDJSON) file targets.

Parameters:

kwargs (tuple[tuple[str, Any], ...]) – Backend-specific keyword arguments forwarded verbatim. For Polars: mapped to polars.DataFrame.write_ndjson(). For Spark: applied via DataFrameWriter.option(k, v).

Example:

JsonWriteOptions(kwargs=(("compression", "gzip"),))
class loom.etl.declarative.ParquetWriteOptions(compression='zstd', kwargs=())[source]

Bases: object

Write options for Parquet file targets.

Parameters:
  • compression (Literal['lz4', 'uncompressed', 'snappy', 'gzip', 'brotli', 'zstd']) – Parquet compression codec. Defaults to "zstd".

  • kwargs (tuple[tuple[str, Any], ...]) – Backend-specific keyword arguments forwarded verbatim. For Polars: mapped to polars.DataFrame.write_parquet(). For Spark: applied via DataFrameWriter.option(k, v).

Example:

ParquetWriteOptions(
    compression="zstd",
    kwargs=(("statistics", True), ("row_group_size", 100_000)),
)
class loom.etl.declarative.SourceKind(value)[source]

Bases: StrEnum

Physical kind of an ETL source.

class loom.etl.declarative.TableSourceSpec(alias, table_ref, predicates=<factory>, columns=<factory>, schema=<factory>, json_columns=<factory>)[source]

Bases: object

Normalized internal representation of a Delta table ETL source.

Produced by _to_spec(). Consumed by the compiler and executor — never exposed in user code.

Parameters:
  • alias (str) – Name matching the execute() parameter.

  • table_ref (TableRef) – Logical table reference.

  • predicates (tuple[Any, ...]) – Compiled predicate nodes from .where().

  • columns (tuple[str, ...]) – Column names to project at scan time. When non-empty, only these columns are read from storage — all other columns are discarded before the frame reaches execute(). The projection is pushed down to the Parquet row-group scanner, reducing I/O.

  • schema (tuple[ColumnSchema, ...]) – Optional user-declared schema applied at read time via with_columns(cast(...)); casts each declared column to its LoomDtype. Extra columns in the source pass through untouched.

  • json_columns (tuple[JsonColumnSpec, ...]) – JSON decode specs applied at read time.

property kind: SourceKind

Physical kind — always SourceKind.TABLE.

property format: Format

I/O format — always Format.DELTA for table sources.

class loom.etl.declarative.FileSourceSpec(alias, path, format, is_alias=False, read_options=None, columns=<factory>, schema=<factory>, json_columns=<factory>)[source]

Bases: object

Normalized internal representation of a file-based ETL source.

Produced by _to_spec(). Consumed by the compiler and executor — never exposed in user code.

Parameters:
  • alias (str) – Name matching the execute() parameter.

  • path (str) – Literal file path/template, or logical alias when is_alias=True.

  • format (Format) – I/O format (CSV, JSON, XLSX, Parquet).

  • is_alias (bool) – When True, path is a logical alias resolved via FileLocator at runtime. Set automatically by alias().

  • read_options (CsvReadOptions | JsonReadOptions | ExcelReadOptions | ParquetReadOptions | None) – Format-specific read options set via .with_options().

  • columns (tuple[str, ...]) – Column names to project at scan time. When non-empty, only these columns are loaded from the file.

  • schema (tuple[ColumnSchema, ...]) – Optional user-declared schema applied at read time via column-level casts.

  • json_columns (tuple[JsonColumnSpec, ...]) – JSON decode specs applied at read time.

property kind: SourceKind

Physical kind — always SourceKind.FILE.

class loom.etl.declarative.TempSourceSpec(alias, temp_name)[source]

Bases: object

Normalized internal representation of an intermediate (temp) ETL source.

Produced by _to_spec(). Consumed by the executor to retrieve data from CheckpointStore.

Parameters:
  • alias (str) – Name matching the execute() parameter.

  • temp_name (str) – Logical intermediate name matching IntoTemp.

property kind: SourceKind

Physical kind — always SourceKind.TEMP.

class loom.etl.declarative.FromTable(ref)[source]

Bases: object

Declare a Delta table as an ETL source.

Accepts a dotted logical reference or an explicit TableRef. Predicates are added via where() using the standard operator DSL.

Parameters:

ref (str | TableRef) – Logical table reference — str or TableRef.

Example:

orders = FromTable("raw.orders").where(
    (col("year")  == params.run_date.year)
    & (col("month") == params.run_date.month),
)
property table_ref: TableRef

The logical table reference.

property predicates: tuple[PredicateNode, ...]

Declared filter predicates.

with_schema(schema)[source]

Return a new FromTable with a user-declared source schema.

The schema is applied at read time via column-level casts — each declared column is cast to its declared type. Extra columns present in the table but absent from schema pass through unchanged.

Parameters:

schema (tuple[ColumnSchema, ...] | type[Any]) – Either a tuple[ColumnSchema, ...] or an annotated class (msgspec.Struct, dataclass, or plain Python class) whose fields define the column contract.

Returns:

New FromTable with the schema applied at read time.

Return type:

FromTable

Example:

orders = FromTable("raw.orders").with_schema((
    ColumnSchema("id", LoomDtype.INT64, nullable=False),
    ColumnSchema("amount", LoomDtype.FLOAT64),
))

# Equivalent with an annotated class:
class OrderRow(msgspec.Struct):
    id: int
    amount: float

orders = FromTable("raw.orders").with_schema(OrderRow)
parse_json(column, contract)[source]

Return a new FromTable that decodes column from JSON at read time.

The string column column is decoded into a structured type using the Polars str.json_decode expression (or Spark from_json).

Parameters:
Returns:

New FromTable with the JSON decode applied at read time.

Return type:

FromTable

Example:

class Payload(msgspec.Struct):
    event_type: str
    user_id: int

events = FromTable("raw.events").parse_json("payload", Payload)
events = FromTable("raw.events").parse_json("tags", list[str])
where(*predicates)[source]

Return a new FromTable with the given predicates added.

Parameters:

*predicates (Any) – One or more PredicateNode expressions built with the column / param operator DSL.

Returns:

New FromTable instance (original is unchanged).

Return type:

FromTable

columns(*cols)[source]

Return a new FromTable projecting only the listed columns.

The projection is pushed down to the Parquet row-group scanner — only the declared columns are read from storage, reducing I/O and memory pressure on wide tables.

Columns used in .where() predicates that are absent from cols are still applied at the filter level; the optimizer drops them from the output automatically.

Parameters:

*cols (str) – Column names to project. Must be non-empty.

Returns:

New FromTable with column projection applied at scan time.

Raises:

ValueError – When called with no column names.

Return type:

FromTable

Example:

orders = FromTable("raw.orders") \
    .where(col("year") == params.run_date.year) \
    .columns("id", "amount", "status")
class loom.etl.declarative.FromFile(path, *, format)[source]

Bases: object

Declare a file-based source (CSV, JSON, XLSX, Parquet).

The path supports {field_name} template placeholders that the executor resolves from the concrete params at runtime.

Parameters:
  • path (str) – File path or template, e.g. "s3://raw/orders_{run_date}.csv".

  • format (Format) – Format of the file.

Example:

report = FromFile("s3://raw/report_{run_date}.xlsx", format=Format.XLSX)
classmethod alias(name, *, format)[source]

Declare a file source by logical alias resolved from storage config.

The physical URI is looked up in storage.files at runtime via the injected FileLocator. Use this form when the path is environment-specific and should not be hard-coded in the pipeline.

Parameters:
  • name (str) – Logical file alias matching a storage.files[].name entry in the config YAML.

  • format (Format) – Format of the file.

Returns:

New FromFile backed by a logical alias.

Return type:

FromFile

Example:

events = FromFile.alias("events_raw", format=Format.CSV)
property path: str

File path template.

property format: Format

I/O format.

with_schema(schema)[source]

Return a new FromFile with a user-declared source schema.

The schema is applied at read time — each declared column is cast to its declared type. Extra columns in the file not declared in schema pass through unchanged.

Parameters:

schema (tuple[ColumnSchema, ...] | type[Any]) – Either a tuple[ColumnSchema, ...] or an annotated class (msgspec.Struct, dataclass, or plain Python class) whose fields define the column contract.

Returns:

New FromFile with the schema applied at read time.

Return type:

FromFile

Example:

class EventRow(msgspec.Struct):
    id: int
    ts: datetime.datetime

events = (
    FromFile("s3://raw/events.json", format=Format.JSON)
    .with_options(JsonReadOptions(infer_schema_length=None))
    .with_schema(EventRow)
)
parse_json(column, contract)[source]

Return a new FromFile that decodes column from JSON at read time.

The string column column is decoded into a structured type using the Polars str.json_decode expression (or Spark from_json).

Parameters:
Returns:

New FromFile with the JSON decode applied at read time.

Return type:

FromFile

with_options(options)[source]

Return a new FromFile with format-specific read options.

Parameters:

options (CsvReadOptions | JsonReadOptions | ExcelReadOptions | ParquetReadOptions) – Format-specific read options — use CsvReadOptions, JsonReadOptions, ExcelReadOptions, or ParquetReadOptions.

Returns:

New FromFile with the options applied at read time.

Return type:

FromFile

Example:

report = FromFile("s3://erp/export.csv", format=Format.CSV)
    .with_options(CsvReadOptions(separator=";", has_header=False))
columns(*cols)[source]

Return a new FromFile projecting only the listed columns.

Only the declared columns are loaded from the file — useful for wide CSVs or Parquet files where most columns are not needed.

Parameters:

*cols (str) – Column names to project. Must be non-empty.

Returns:

New FromFile with column projection applied at scan time.

Raises:

ValueError – When called with no column names.

Return type:

FromFile

Example:

report = FromFile("s3://raw/report.parquet", format=Format.PARQUET) \
    .columns("order_id", "amount", "currency")
class loom.etl.declarative.FromTemp(name)[source]

Bases: object

Declare an intermediate result as an ETL source.

The name must match the temp_name of an IntoTemp target that appears before this step in the pipeline execution order. The compiler validates this forward- reference at compile time.

The physical format is resolved automatically by CheckpointStore — Polars steps receive a lazy polars.LazyFrame (Arrow IPC), Spark steps receive a pyspark.sql.DataFrame (Parquet).

Parameters:

name (str) – Logical name of the intermediate to consume — must match the corresponding IntoTemp.

Example:

normalized = FromTemp("normalized_orders")
property temp_name: str

Logical name of the intermediate to consume.

class loom.etl.declarative.Sources(**sources)[source]

Bases: object

Group multiple source declarations under a single sources attribute.

Keyword arguments become the source aliases used in execute() parameter names.

Parameters:

**sources (_SourceEntryType) – Mapping of alias → source declaration.

Example:

sources = Sources(
    orders=FromTable("raw.orders").where(...),
    customers=FromTable("raw.customers"),
)
property aliases: tuple[str, ...]

Declared source aliases in insertion order.

class loom.etl.declarative.SourceSet[source]

Bases: Generic[ParamsT]

Reusable named group of sources that can be extended per step.

Declare shared reads once and compose them into individual steps via extended().

Parameters:

**sources – Base source declarations.

Example:

class OrderSources(SourceSet[DailyOrdersParams]):
    orders = FromTable("raw.orders").where(...)
    customers = FromTable("raw.customers")

class MyStep(ETLStep[DailyOrdersParams]):
    sources = OrderSources.extended(
        brands=FromTable("erp.brands"),
    )
classmethod extended(**extra)[source]

Return a new SourceSet with additional sources merged in.

Parameters:

**extra (FromTable | FromFile | FromTemp) – Additional source declarations to add.

Returns:

New SourceSet with the combined sources.

Raises:

ValueError – If any key in extra conflicts with an existing alias.

Return type:

SourceSet[ParamsT]

class loom.etl.declarative.IntoTable(ref)[source]

Bases: object

Declare a Delta table as the ETL step target.

The write mode is set by chaining one of: append(), replace(), replace_partitions(), replace_where(), upsert().

Parameters:

ref (str | TableRef) – Logical table reference — str or TableRef.

Example:

# Full replace
target = IntoTable("staging.orders").replace()

# Replace only the partitions present in the batch
target = IntoTable("staging.orders").replace_partitions("year", "month")

# Replace a date-range for backfill
target = IntoTable("staging.orders").replace_where(
    col("date").between(params.start_date, params.end_date)
)
append(*, schema=SchemaMode.STRICT)[source]

Write mode: append rows to the target table.

Parameters:

schema (SchemaMode) – Schema evolution strategy. Defaults to STRICT.

Returns:

New IntoTable with APPEND mode.

Return type:

IntoTable

replace(*, schema=SchemaMode.STRICT)[source]

Write mode: full replace of the target table.

Overwrites all data in the table. Use replace_partitions() for partition-scoped overwrite or replace_where() for predicate-scoped.

Parameters:

schema (SchemaMode) – Schema evolution strategy. Use OVERWRITE to replace the table schema entirely alongside the data.

Returns:

New IntoTable with REPLACE mode.

Return type:

IntoTable

replace_partitions(*cols, schema=SchemaMode.STRICT)[source]

Write mode: replace the partitions present in the batch frame.

Collects the distinct partition values from the frame at write time and builds the replace predicate dynamically — no params required.

For replacing a specific partition whose values come from run params, use replace_partition().

Parameters:
  • *cols (str) – Partition column names to collect from the frame.

  • schema (SchemaMode) – Schema evolution strategy.

Returns:

New IntoTable with REPLACE_PARTITIONS mode.

Raises:

ValueError – If no column names are provided.

Return type:

IntoTable

Example:

target = IntoTable("staging.orders").replace_partitions("year", "month")
replace_partition(*, schema=SchemaMode.STRICT, **partition)[source]

Write mode: replace a specific partition identified by exact column values from params.

Use when the partition to replace is known at pipeline design time and its values come from run params. The writer resolves each value against the concrete params at runtime and issues a Delta replaceWhere on the resulting equality predicate — no collect required.

For dynamic replacement (values inferred from the batch), use replace_partitions(). For arbitrary predicates, use replace_where().

Parameters:
  • schema (SchemaMode) – Schema evolution strategy.

  • **partition (ParamExpr) – Partition column name → ParamExpr pairs (e.g. year=params.run_date.year).

Returns:

New IntoTable with REPLACE_WHERE mode.

Raises:

ValueError – If no column=value pairs are provided.

Return type:

IntoTable

Example:

target = IntoTable("staging.orders").replace_partition(
    year=params.run_date.year,
    month=params.run_date.month,
)
replace_where(predicate, *, schema=SchemaMode.STRICT)[source]

Write mode: replace rows matching an explicit predicate.

The predicate is resolved against the run params and passed to Delta as replaceWhere. Only the matching data is overwritten — Delta uses partition pruning so only affected files are rewritten.

Typical use case: backfill / reprocessing a date range.

Parameters:
  • predicate (PredicateNode) – Predicate built with the col() / params DSL.

  • schema (SchemaMode) – Schema evolution strategy.

Returns:

New IntoTable with REPLACE_WHERE mode.

Return type:

IntoTable

Example:

target = IntoTable("staging.orders").replace_where(
    col("date").between(params.start_date, params.end_date)
)
upsert(*, keys, partition_cols=(), exclude=(), include=(), schema=SchemaMode.STRICT)[source]

Write mode: merge rows using the given key columns (UPSERT / MERGE).

On first write the table is created from the frame. Subsequent writes issue a Delta MERGE: matched rows are updated, unmatched rows are inserted.

Declaring partition_cols is strongly recommended for large tables — it allows Delta to prune files at the log level before evaluating the join condition. Without it, every MERGE forces a full table scan.

Parameters:
  • keys (tuple[str, ...]) – Columns that uniquely identify a row. Used in the MERGE ON join condition.

  • partition_cols (tuple[str, ...]) – Partition columns to include in the MERGE ON predicate and for Delta log pruning. Must be a subset of the frame columns.

  • exclude (tuple[str, ...]) – Columns to exclude from UPDATE SET on match. Keys and partition columns are always excluded. Mutually exclusive with include.

  • include (tuple[str, ...]) – Explicit allow-list of columns to update on match. Keys and partition columns are always excluded even if listed here. Mutually exclusive with exclude.

  • schema (SchemaMode) – Schema evolution strategy. Defaults to STRICT.

Returns:

New IntoTable with UPSERT mode.

Return type:

IntoTable

Example:

target = IntoTable("events.orders").upsert(
    keys=("order_id",),
    partition_cols=("year", "month"),
    exclude=("created_at",),
)
class loom.etl.declarative.IntoFile(path, *, format)[source]

Bases: object

Declare a file as the ETL step target (CSV, JSON, XLSX, Parquet).

The path supports {field_name} template placeholders resolved from params at runtime.

Parameters:
  • path (str) – File path or template, e.g. "s3://exports/orders_{run_date}.csv".

  • format (Format) – Format of the output file.

Example:

target = IntoFile("s3://exports/summary_{run_date}.xlsx", format=Format.XLSX)
classmethod alias(name, *, format)[source]

Declare a file target by logical alias resolved from storage config.

The physical URI is looked up in storage.files at runtime via the injected FileLocator. Use this form when the path is environment-specific and should not be hard-coded in the pipeline.

Parameters:
  • name (str) – Logical file alias matching a storage.files[].name entry in the config YAML.

  • format (Format) – Format of the output file.

Returns:

New IntoFile backed by a logical alias.

Return type:

IntoFile

Example:

target = IntoFile.alias("exports_daily", format=Format.PARQUET)
with_options(options)[source]

Return a new IntoFile with format-specific write options.

Parameters:

options (CsvWriteOptions | ParquetWriteOptions | JsonWriteOptions) – Format-specific write options — use CsvWriteOptions or ParquetWriteOptions.

Returns:

New IntoFile with the options applied at write time.

Return type:

IntoFile

Example:

target = IntoFile("s3://exports/report.csv", format=Format.CSV)
    .with_options(CsvWriteOptions(separator=";"))
class loom.etl.declarative.IntoTemp(name, *, scope=CheckpointScope.RUN, append=False)[source]

Bases: object

Declare an intermediate result that bypasses Delta and lives in tmp storage.

The physical format is chosen automatically by CheckpointStore based on the DataFrame type returned by execute():

  • Polars — Arrow IPC via sink_ipc() (streaming write, no collect) and scan_ipc() (lazy, memory-mapped read with predicate pushdown).

  • Spark — Parquet directory via df.write.parquet() and spark.read.parquet(). Cuts the lineage DAG; Photon-optimised.

Use FromTemp in a downstream step to consume the result.

Parameters:
  • name (str) – Logical name identifying this intermediate. By default the name must be unique across the pipeline — the compiler raises if two steps write to the same name.

  • scope (CheckpointScope) – Lifetime scope. Defaults to RUN.

  • append (bool) – When True, multiple steps may write to this name; their outputs are concatenated and exposed as one logical intermediate (fan-in pattern). All writers for a given name must agree on the same append value — mixing True and False is a compile-time error.

Example:

# strict — only one step may write "normalized_orders"
target = IntoTemp("normalized_orders")

# fan-in — multiple partition steps write to the same intermediate
target = IntoTemp("order_parts", append=True)
property temp_name: str

Logical name of this intermediate.

property scope: CheckpointScope

Lifetime scope.

property append: bool

Whether multiple steps may write to this name (fan-in).

class loom.etl.declarative.IntoHistory(ref, *, keys, effective_date, mode='snapshot', track=None, overwrite=None, delete_policy='close', partition_scope=None, valid_from='valid_from', valid_to='valid_to', deleted_at='deleted_at', date_type='date', schema=SchemaMode.STRICT, allow_temporal_rerun=False)[source]

Bases: object

Declare a Delta table as the ETL step SCD Type 2 history target.

Each run compares incoming data against the current open vectors in the target table. Only changed entity states generate new rows — unchanged entities produce no writes.

Column roles:

  • keys — entity identity. Never change; they are the MERGE join key.

  • track — change-triggering columns. A value change inserts a new row and closes the previous open vector. None means every non-key column is tracked.

  • overwrite — columns updated in-place on the open row when the entity is UNCHANGED. No new history row is created; the current open row is silently refreshed. Useful for mutable metadata that should not drive history.

  • Remaining — “passive” columns: carried forward into each new row but never updated nor tracked.

Multiple simultaneous open vectors are natively supported. Include the distinguishing dimension in track. For example, a player on loan to two clubs simultaneously uses track=("team_id", "role") — the vectors (player_id=P1, team_id=RM, role=OWNER) and (player_id=P1, team_id=GET, role=LOAN) are independent and coexist without conflict.

Parameters:
  • ref (str | TableRef) – Logical table reference — str or TableRef.

  • keys (tuple[str, ...] | list[str]) – One or more column names that identify the entity. Must be non-empty and must not overlap with track.

  • effective_date (str | ParamExpr) – In "log" mode: name of the frame column carrying the event date/timestamp. In "snapshot" mode: a ParamExpr or column name resolved from run params.

  • mode (Literal['snapshot', 'log']) – "snapshot" (default) or "log".

  • track (tuple[str, ...] | list[str] | None) – Columns whose changes trigger a new history row. None means all non-key columns are tracked.

  • overwrite (tuple[str, ...] | list[str] | None) – Columns to update in-place on the open row when unchanged. Must not overlap with keys or track.

  • delete_policy (Literal['ignore', 'close', 'soft_delete']) – Action for absent keys in SNAPSHOT mode. Defaults to "close".

  • partition_scope (tuple[str, ...] | list[str] | None) – Partition columns to constrain Delta reads/writes. Strongly recommended for large tables.

  • valid_from (str) – Name of the period-start column in the Delta table. Defaults to "valid_from".

  • valid_to (str) – Name of the period-end column in the Delta table. Defaults to "valid_to".

  • date_type (Literal['date', 'timestamp']) – Precision for boundary columns. Defaults to "date".

  • schema (SchemaMode) – Schema evolution strategy. Defaults to STRICT.

  • allow_temporal_rerun (bool) – Allow re-weave when past-date corrections are loaded. Defaults to False.

  • deleted_at (str)

Raises:
  • ValueError – If keys is empty.

  • ValueError – If track overlaps with keys.

  • ValueError – If overwrite overlaps with keys, track, or boundary columns.

  • ValueError – If valid_from and valid_to share the same name.

Example:

target = IntoHistory(
    "warehouse.dim_players",
    keys=("player_id",),
    track=("team_id", "contract_value"),
    effective_date=params.run_date,
    mode="snapshot",
    delete_policy="close",
    partition_scope=("season",),
)
class loom.etl.declarative.SchemaMode(value)[source]

Bases: StrEnum

Schema evolution strategy applied by the target writer before each write.

Values:

  • STRICT — fail on incompatible schema changes.

  • EVOLVE — allow additive evolution where supported.

  • OVERWRITE — replace table schema with incoming schema.

class loom.etl.declarative.HistorifyInputMode(value)[source]

Bases: StrEnum

Input semantics for the SCD2 writer.

Values:

  • SNAPSHOT — the frame is a full snapshot of the entity dimension. Keys absent from the snapshot are handled according to delete_policy.

  • LOG — the frame carries individual change events; each row has an effective_date column that determines when the change took effect.

class loom.etl.declarative.DeletePolicy(value)[source]

Bases: StrEnum

Action applied to entity keys absent from an incoming snapshot.

Only meaningful when mode=SNAPSHOT; LOG mode ignores this setting.

Values:

  • IGNORE — leave open vectors open. Use for partial or incremental snapshots where absence does not imply deletion.

  • CLOSE — close the open vector by setting valid_to = effective_date - 1. Standard SCD2 behavior for full-dimension snapshots.

  • SOFT_DELETE — close the open vector and stamp a deleted_at column. Use when downstream systems require explicit deletion audit trails.

class loom.etl.declarative.HistoryDateType(value)[source]

Bases: StrEnum

Column type used for valid_from / valid_to boundary columns.

Values:

  • DATE — calendar date precision. Suitable for daily SCD2 pipelines.

  • TIMESTAMP — microsecond precision. Use for event-driven or sub-daily update patterns.

class loom.etl.declarative.HistorifySpec(table_ref, keys, effective_date, mode, track=None, overwrite=None, delete_policy=DeletePolicy.CLOSE, partition_scope=None, valid_from='valid_from', valid_to='valid_to', deleted_at='deleted_at', date_type=HistoryDateType.DATE, schema_mode=SchemaMode.STRICT, allow_temporal_rerun=False)[source]

Bases: object

Compiled SCD Type 2 write spec. Produced by IntoHistory._to_spec().

This frozen dataclass is the sole input to the backend engine. All configuration is resolved at declaration time — no runtime reflection occurs.

Both table_ref and schema_mode are required for duck-type detection by the write-policy dispatcher (_is_table_target_spec).

Parameters:
  • table_ref (TableRef) – Logical Delta table reference.

  • keys (tuple[str, ...]) – Columns that uniquely identify the entity (e.g. ("player_id",)).

  • effective_date (str | ParamExpr) – Column name in LOG mode, or a ParamExpr in SNAPSHOT mode.

  • mode (HistorifyInputMode) – Input semantics — SNAPSHOT or LOG.

  • track (tuple[str, ...] | None) – Columns whose value change triggers a new history row. None means every non-key column is tracked.

  • overwrite (tuple[str, ...] | None) – Columns updated in-place on the current open row when the entity is UNCHANGED (no new history row created). Ignored in LOG mode. Must not overlap with keys or track.

  • delete_policy (DeletePolicy) – Action for absent keys in SNAPSHOT mode. Ignored in LOG mode.

  • partition_scope (tuple[str, ...] | None) – Partition columns used to limit Delta reads/writes. Strongly recommended for large tables.

  • valid_from (str) – Name of the period-start boundary column.

  • valid_to (str) – Name of the period-end boundary column. NULL in the table means the vector is currently open.

  • deleted_at (str) – Name of the soft-delete audit column. Only written when delete_policy=SOFT_DELETE.

  • date_type (HistoryDateType) – Precision of the valid_from / valid_to columns.

  • schema_mode (SchemaMode) – Schema evolution strategy.

  • allow_temporal_rerun (bool) – Allow re-weave when past-date data is loaded.

class loom.etl.declarative.HistorifyRepairReport(affected_keys, dates_requiring_rerun, warnings)[source]

Bases: object

Structured report produced by a successful re-weave operation.

Returned by the backend engine when allow_temporal_rerun=True and past-date data was corrected. Consumers may use this to schedule downstream re-runs for the affected date range.

Parameters:
affected_keys

Frozenset of entity key tuples that were modified.

Type:

frozenset[tuple[object, …]]

dates_requiring_rerun

Sorted tuple of dates where history was repaired.

Type:

tuple[object, …]

warnings

Human-readable description of each repair action.

Type:

tuple[str, …]

exception loom.etl.declarative.HistorifyKeyConflictError(duplicates)[source]

Bases: ValueError

Raised when the incoming snapshot contains duplicate keys + track combinations.

SCD2 semantics require that each entity state vector is unique within a snapshot. Duplicates indicate a data-modelling error: the distinguishing dimension must be included in keys or track.

If the same entity legitimately belongs to multiple simultaneous states (e.g. a player on loan to two clubs at once), include the distinguishing column in track — both vectors will then be tracked independently.

Parameters:

duplicates (str) – Human-readable representation of the conflicting key tuples.

Return type:

None

exception loom.etl.declarative.HistorifyDateCollisionError(collisions)[source]

Bases: ValueError

Raised in LOG mode when two events share the same (keys + track, effective_date).

At DATE granularity, two events on the same calendar day for the same entity state vector are ambiguous — relative ordering is lost and idempotency cannot be guaranteed.

To resolve: switch to date_type="timestamp" for sub-daily precision, or deduplicate events upstream before loading.

Parameters:

collisions (str) – Human-readable description of the colliding key/date pairs.

Return type:

None

exception loom.etl.declarative.HistorifyTemporalConflictError(min_conflict_date, effective_date)[source]

Bases: ValueError

Raised when the target contains future-open vectors and re-weave is disabled.

A temporal conflict occurs when valid_from > effective_date, meaning the incoming data must be inserted before an already-committed record. This requires explicit opt-in via allow_temporal_rerun=True.

Parameters:
  • min_conflict_date (object) – Earliest valid_from date found in the conflict.

  • effective_date (object) – The effective_date of the current run.

Return type:

None