loom.etl.declarative¶
I/O declaration API for ETL sources, targets, and file formats.
- class loom.etl.declarative.Format(value)[source]¶
Bases:
StrEnum
Supported I/O formats for ETL sources and targets.
Used by FromFile and IntoFile to declare the physical format of a file-based source or target. DELTA is the implicit format for FromTable and IntoTable — it does not need to be set explicitly there.
Example:
target = IntoFile("s3://exports/report_{run_date}.csv", format=Format.CSV)
- class loom.etl.declarative.CsvReadOptions(separator=',', has_header=True, null_values=<factory>, encoding='utf8', infer_schema_length=100, skip_rows=0)[source]¶
Bases:
object
Read options for CSV / TSV file sources.
All fields default to the most common CSV convention.
- Parameters:
separator (str) – Column delimiter character. Defaults to ",".
has_header (bool) – Whether the first row is a header. Defaults to True.
null_values (tuple[str, ...]) – Strings that should be interpreted as null.
encoding (str) – File encoding. Defaults to "utf8".
infer_schema_length (int | None) – Number of rows used to infer column types. None scans the whole file. Defaults to 100.
skip_rows (int) – Number of rows to skip before reading. Useful for files with metadata preambles.
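Example (illustrative; the file path and option values are placeholders, attached to a file source via the FromFile.with_options() chain documented below):
report = FromFile("s3://erp/export.csv", format=Format.CSV).with_options(
    CsvReadOptions(separator=";", null_values=("", "NULL"), skip_rows=2)
)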
- class loom.etl.declarative.JsonReadOptions(infer_schema_length=100)[source]¶
Bases:
object
Read options for newline-delimited JSON (NDJSON) file sources.
- Parameters:
infer_schema_length (int | None) – Number of rows used to infer column types. None reads the whole file for inference. Defaults to 100.
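Example (illustrative path; disables row-based inference and scans the whole file, mirroring the FromFile.with_schema() example further below):
events = FromFile("s3://raw/events.json", format=Format.JSON).with_options(
    JsonReadOptions(infer_schema_length=None)
)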
- class loom.etl.declarative.ExcelReadOptions(sheet_name=None, has_header=True)[source]¶
Bases:
object
Read options for Excel (.xlsx) file sources.
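Example (illustrative; the sheet name is a placeholder, and sheet_name=None presumably falls back to the workbook's default sheet):
report = FromFile("s3://raw/report_{run_date}.xlsx", format=Format.XLSX).with_options(
    ExcelReadOptions(sheet_name="Orders")
)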
- class loom.etl.declarative.ParquetReadOptions[source]¶
Bases:
object
Read options for Parquet file sources.
Parquet is self-describing — schema and types are embedded in the file metadata. Use with_schema() when you need to override or enforce a specific type mapping.
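Example (illustrative; since ParquetReadOptions carries no fields, overriding the embedded types goes through with_schema() — the path and columns here are placeholders):
orders = FromFile("s3://raw/orders.parquet", format=Format.PARQUET).with_schema((
    ColumnSchema("order_id", LoomDtype.INT64, nullable=False),
    ColumnSchema("amount", LoomDtype.FLOAT64),
))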
- class loom.etl.declarative.CsvWriteOptions(separator=',', has_header=True, kwargs=())[source]¶
Bases:
object
Write options for CSV / TSV file targets.
- Parameters:
separator (str) – Column delimiter character. Defaults to ",".
has_header (bool) – Whether to write a header row. Defaults to True.
kwargs (tuple[tuple[str, Any], ...]) – Backend-specific keyword arguments forwarded verbatim. For Polars: mapped to polars.DataFrame.write_csv(). For Spark: applied via DataFrameWriter.option(k, v).
Example:
CsvWriteOptions(
    separator=";",
    kwargs=(("datetime_format", "%Y-%m-%d"), ("null_value", "N/A")),
)
- class loom.etl.declarative.JsonWriteOptions(kwargs=())[source]¶
Bases:
object
Write options for JSON (NDJSON) file targets.
- Parameters:
kwargs (tuple[tuple[str, Any], ...]) – Backend-specific keyword arguments forwarded verbatim. For Polars: mapped to polars.DataFrame.write_ndjson(). For Spark: applied via DataFrameWriter.option(k, v).
Example:
JsonWriteOptions(kwargs=(("compression", "gzip"),))
- class loom.etl.declarative.ParquetWriteOptions(compression='zstd', kwargs=())[source]¶
Bases:
object
Write options for Parquet file targets.
- Parameters:
compression (Literal['lz4', 'uncompressed', 'snappy', 'gzip', 'brotli', 'zstd']) – Parquet compression codec. Defaults to "zstd".
kwargs (tuple[tuple[str, Any], ...]) – Backend-specific keyword arguments forwarded verbatim. For Polars: mapped to polars.DataFrame.write_parquet(). For Spark: applied via DataFrameWriter.option(k, v).
Example:
ParquetWriteOptions(
    compression="zstd",
    kwargs=(("statistics", True), ("row_group_size", 100_000)),
)
- class loom.etl.declarative.SourceKind(value)[source]¶
Bases:
StrEnum
Physical kind of an ETL source.
- class loom.etl.declarative.TableSourceSpec(alias, table_ref, predicates=<factory>, columns=<factory>, schema=<factory>, json_columns=<factory>)[source]¶
Bases:
object
Normalized internal representation of a Delta table ETL source.
Produced by _to_spec(). Consumed by the compiler and executor — never exposed in user code.
- Parameters:
alias (str) – Name matching the execute() parameter.
table_ref (TableRef) – Logical table reference.
predicates (tuple[Any, ...]) – Compiled predicate nodes from .where().
columns (tuple[str, ...]) – Column names to project at scan time. When non-empty, only these columns are read from storage — all other columns are discarded before the frame reaches execute(). The projection is pushed down to the Parquet row-group scanner, reducing I/O.
schema (tuple[ColumnSchema, ...]) – Optional user-declared schema applied at read time via with_columns(cast(...)); casts each declared column to its LoomDtype. Extra columns in the source pass through untouched.
json_columns (tuple[JsonColumnSpec, ...]) – JSON decode specs applied at read time.
- property kind: SourceKind¶
Physical kind — always SourceKind.TABLE.
- class loom.etl.declarative.FileSourceSpec(alias, path, format, is_alias=False, read_options=None, columns=<factory>, schema=<factory>, json_columns=<factory>)[source]¶
Bases:
object
Normalized internal representation of a file-based ETL source.
Produced by _to_spec(). Consumed by the compiler and executor — never exposed in user code.
- Parameters:
alias (str) – Name matching the execute() parameter.
path (str) – Literal file path/template, or logical alias when is_alias=True.
format (Format) – I/O format (CSV, JSON, XLSX, Parquet).
is_alias (bool) – When True, path is a logical alias resolved via FileLocator at runtime. Set automatically by alias().
read_options (CsvReadOptions | JsonReadOptions | ExcelReadOptions | ParquetReadOptions | None) – Format-specific read options set via .with_options().
columns (tuple[str, ...]) – Column names to project at scan time. When non-empty, only these columns are loaded from the file.
schema (tuple[ColumnSchema, ...]) – Optional user-declared schema applied at read time via column-level casts.
json_columns (tuple[JsonColumnSpec, ...]) – JSON decode specs applied at read time.
- property kind: SourceKind¶
Physical kind — always SourceKind.FILE.
- class loom.etl.declarative.TempSourceSpec(alias, temp_name)[source]¶
Bases:
object
Normalized internal representation of an intermediate (temp) ETL source.
Produced by _to_spec(). Consumed by the executor to retrieve data from CheckpointStore.
- Parameters:
alias (str) – Name matching the execute() parameter.
temp_name (str) – Logical name of the intermediate to retrieve, matching the corresponding IntoTemp.
- property kind: SourceKind¶
Physical kind — always SourceKind.TEMP.
- class loom.etl.declarative.FromTable(ref)[source]¶
Bases:
object
Declare a Delta table as an ETL source.
Accepts a dotted logical reference or an explicit TableRef. Predicates are added via where() using the standard operator DSL.
Example:
orders = FromTable("raw.orders").where( (col("year") == params.run_date.year) & (col("month") == params.run_date.month), )
- with_schema(schema)[source]¶
Return a new FromTable with a user-declared source schema.
The schema is applied at read time via column-level casts — each declared column is cast to its declared type. Extra columns present in the table but absent from schema pass through unchanged.
- Parameters:
schema (tuple[ColumnSchema, ...] | type[Any]) – Either a tuple[ColumnSchema, ...] or an annotated class (msgspec.Struct, dataclass, or plain Python class) whose fields define the column contract.
- Returns:
New FromTable with the schema applied at read time.
- Return type:
FromTable
Example:
orders = FromTable("raw.orders").with_schema(( ColumnSchema("id", LoomDtype.INT64, nullable=False), ColumnSchema("amount", LoomDtype.FLOAT64), )) # Equivalent with an annotated class: class OrderRow(msgspec.Struct): id: int amount: float orders = FromTable("raw.orders").with_schema(OrderRow)
- parse_json(column, contract)[source]¶
Return a new FromTable that decodes column from JSON at read time.
The named string column is decoded into a structured type using the Polars str.json_decode expression (or Spark from_json).
- Parameters:
column (str) – Name of the string column that contains the JSON payload.
contract (LoomDtype | ListType | ArrayType | StructType | DecimalType | DatetimeType | DurationType | CategoricalType | EnumType | type[Any]) –
Target type for the decoded column. Accepted forms:
Any LoomType instance (e.g. StructType(...), ListType(...)).
An annotated class (msgspec.Struct, dataclass, or plain Python class) — converted to StructType.
list[SomeClass] — converted to ListType.
- Returns:
New FromTable with the JSON decode applied at read time.
- Return type:
FromTable
Example:
class Payload(msgspec.Struct):
    event_type: str
    user_id: int

events = FromTable("raw.events").parse_json("payload", Payload)
events = FromTable("raw.events").parse_json("tags", list[str])
- columns(*cols)[source]¶
Return a new FromTable projecting only the listed columns.
The projection is pushed down to the Parquet row-group scanner — only the declared columns are read from storage, reducing I/O and memory pressure on wide tables.
Columns used in .where() predicates that are absent from cols are still applied at the filter level; the optimizer drops them from the output automatically.
- Parameters:
*cols (str) – Column names to project. Must be non-empty.
- Returns:
New FromTable with column projection applied at scan time.
- Raises:
ValueError – When called with no column names.
- Return type:
FromTable
Example:
orders = FromTable("raw.orders") \ .where(col("year") == params.run_date.year) \ .columns("id", "amount", "status")
- class loom.etl.declarative.FromFile(path, *, format)[source]¶
Bases:
object
Declare a file-based source (CSV, JSON, XLSX, Parquet).
The path supports {field_name} template placeholders that the executor resolves from the concrete params at runtime.
- Parameters:
Example:
report = FromFile("s3://raw/report_{run_date}.xlsx", format=Format.XLSX)
- classmethod alias(name, *, format)[source]¶
Declare a file source by logical alias resolved from storage config.
The physical URI is looked up in storage.files at runtime via the injected FileLocator. Use this form when the path is environment-specific and should not be hard-coded in the pipeline.
- Parameters:
- Returns:
New FromFile backed by a logical alias.
- Return type:
FromFile
Example:
events = FromFile.alias("events_raw", format=Format.CSV)
- with_schema(schema)[source]¶
Return a new FromFile with a user-declared source schema.
The schema is applied at read time — each declared column is cast to its declared type. Extra columns in the file not declared in schema pass through unchanged.
- Parameters:
schema (tuple[ColumnSchema, ...] | type[Any]) – Either a tuple[ColumnSchema, ...] or an annotated class (msgspec.Struct, dataclass, or plain Python class) whose fields define the column contract.
- Returns:
New FromFile with the schema applied at read time.
- Return type:
FromFile
Example:
class EventRow(msgspec.Struct):
    id: int
    ts: datetime.datetime

events = (
    FromFile("s3://raw/events.json", format=Format.JSON)
    .with_options(JsonReadOptions(infer_schema_length=None))
    .with_schema(EventRow)
)
- parse_json(column, contract)[source]¶
Return a new FromFile that decodes column from JSON at read time.
The named string column is decoded into a structured type using the Polars str.json_decode expression (or Spark from_json).
- Parameters:
column (str) – Name of the string column that contains the JSON payload.
contract (LoomDtype | ListType | ArrayType | StructType | DecimalType | DatetimeType | DurationType | CategoricalType | EnumType | type[Any]) –
Target type for the decoded column. Accepted forms:
Any LoomType instance.
An annotated class — converted to StructType.
list[SomeClass] — converted to ListType.
- Returns:
New FromFile with the JSON decode applied at read time.
- Return type:
FromFile
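Example (illustrative; mirrors the FromTable.parse_json example above — the file path and payload class are placeholders):
class Payload(msgspec.Struct):
    event_type: str
    user_id: int

events = FromFile("s3://raw/events.csv", format=Format.CSV).parse_json("payload", Payload)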
- with_options(options)[source]¶
Return a new FromFile with format-specific read options.
- Parameters:
options (CsvReadOptions | JsonReadOptions | ExcelReadOptions | ParquetReadOptions) – Format-specific read options — use CsvReadOptions, JsonReadOptions, ExcelReadOptions, or ParquetReadOptions.
- Returns:
New FromFile with the options applied at read time.
- Return type:
FromFile
Example:
report = FromFile("s3://erp/export.csv", format=Format.CSV) .with_options(CsvReadOptions(separator=";", has_header=False))
- columns(*cols)[source]¶
Return a new FromFile projecting only the listed columns.
Only the declared columns are loaded from the file — useful for wide CSVs or Parquet files where most columns are not needed.
- Parameters:
*cols (str) – Column names to project. Must be non-empty.
- Returns:
New FromFile with column projection applied at scan time.
- Raises:
ValueError – When called with no column names.
- Return type:
FromFile
Example:
report = FromFile("s3://raw/report.parquet", format=Format.PARQUET) \ .columns("order_id", "amount", "currency")
- class loom.etl.declarative.FromTemp(name)[source]¶
Bases:
object
Declare an intermediate result as an ETL source.
The name must match the temp_name of an IntoTemp target that appears before this step in the pipeline execution order. The compiler validates this forward reference at compile time.
The physical format is resolved automatically by CheckpointStore — Polars steps receive a lazy polars.LazyFrame (Arrow IPC), Spark steps receive a pyspark.sql.DataFrame (Parquet).
- Parameters:
name (str) – Logical name of the intermediate to consume — must match the corresponding IntoTemp.
Example:
normalized = FromTemp("normalized_orders")
- class loom.etl.declarative.Sources(**sources)[source]¶
Bases:
object
Group multiple source declarations under a single sources attribute.
Keyword arguments become the source aliases used in execute() parameter names.
- Parameters:
**sources (_SourceEntryType) – Mapping of alias → source declaration.
Example:
sources = Sources(
    orders=FromTable("raw.orders").where(...),
    customers=FromTable("raw.customers"),
)
- class loom.etl.declarative.SourceSet[source]¶
Bases:
Generic[ParamsT]
Reusable named group of sources that can be extended per step.
Declare shared reads once and compose them into individual steps via extended().
- Parameters:
**sources – Base source declarations.
Example:
class OrderSources(SourceSet[DailyOrdersParams]):
    orders = FromTable("raw.orders").where(...)
    customers = FromTable("raw.customers")

class MyStep(ETLStep[DailyOrdersParams]):
    sources = OrderSources.extended(
        brands=FromTable("erp.brands"),
    )
- classmethod extended(**extra)[source]¶
Return a new SourceSet with additional sources merged in.
- Parameters:
**extra (FromTable | FromFile | FromTemp) – Additional source declarations to add.
- Returns:
New SourceSet with the combined sources.
- Raises:
ValueError – If any key in extra conflicts with an existing alias.
- Return type:
SourceSet[ParamsT]
- class loom.etl.declarative.IntoTable(ref)[source]¶
Bases:
object
Declare a Delta table as the ETL step target.
The write mode is set by chaining one of: append(), replace(), replace_partitions(), replace_partition(), replace_where(), upsert().
Example:
# Full replace
target = IntoTable("staging.orders").replace()

# Replace only the partitions present in the batch
target = IntoTable("staging.orders").replace_partitions("year", "month")

# Replace a date-range for backfill
target = IntoTable("staging.orders").replace_where(
    col("date").between(params.start_date, params.end_date)
)
- append(*, schema=SchemaMode.STRICT)[source]¶
Write mode: append rows to the target table.
- Parameters:
schema (SchemaMode) – Schema evolution strategy. Defaults to STRICT.
- Returns:
New IntoTable with APPEND mode.
- Return type:
IntoTable
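Example (illustrative table name):
target = IntoTable("staging.orders").append()

# Allow additive schema evolution on append:
target = IntoTable("staging.orders").append(schema=SchemaMode.EVOLVE)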
- replace(*, schema=SchemaMode.STRICT)[source]¶
Write mode: full replace of the target table.
Overwrites all data in the table. Use replace_partitions() for partition-scoped overwrite or replace_where() for predicate-scoped.
- Parameters:
schema (SchemaMode) – Schema evolution strategy. Use OVERWRITE to replace the table schema entirely alongside the data.
- Returns:
New IntoTable with REPLACE mode.
- Return type:
IntoTable
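Example (illustrative table name):
target = IntoTable("staging.orders").replace()

# Replace the table schema alongside the data:
target = IntoTable("staging.orders").replace(schema=SchemaMode.OVERWRITE)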
- replace_partitions(*cols, schema=SchemaMode.STRICT)[source]¶
Write mode: replace the partitions present in the batch frame.
Collects the distinct partition values from the frame at write time and builds the replace predicate dynamically — no params required.
For replacing a specific partition whose values come from run params, use replace_partition().
- Parameters:
*cols (str) – Partition column names to collect from the frame.
schema (SchemaMode) – Schema evolution strategy.
- Returns:
New IntoTable with REPLACE_PARTITIONS mode.
- Raises:
ValueError – If no column names are provided.
- Return type:
IntoTable
Example:
target = IntoTable("staging.orders").replace_partitions("year", "month")
- replace_partition(*, schema=SchemaMode.STRICT, **partition)[source]¶
Write mode: replace a specific partition identified by exact column values from params.
Use when the partition to replace is known at pipeline design time and its values come from run params. The writer resolves each value against the concrete params at runtime and issues a Delta replaceWhere on the resulting equality predicate — no collect required.
For dynamic replacement (values inferred from the batch), use replace_partitions(). For arbitrary predicates, use replace_where().
- Parameters:
schema (SchemaMode) – Schema evolution strategy.
**partition (ParamExpr) – Partition column name → ParamExpr pairs (e.g. year=params.run_date.year).
- Returns:
New IntoTable with REPLACE_WHERE mode.
- Raises:
ValueError – If no column=value pairs are provided.
- Return type:
IntoTable
Example:
target = IntoTable("staging.orders").replace_partition( year=params.run_date.year, month=params.run_date.month, )
- replace_where(predicate, *, schema=SchemaMode.STRICT)[source]¶
Write mode: replace rows matching an explicit predicate.
The predicate is resolved against the run params and passed to Delta as replaceWhere. Only the matching data is overwritten — Delta uses partition pruning so only affected files are rewritten.
Typical use case: backfill / reprocessing a date range.
- Parameters:
predicate (PredicateNode) – Predicate built with the col() / params DSL.
schema (SchemaMode) – Schema evolution strategy.
- Returns:
New IntoTable with REPLACE_WHERE mode.
- Return type:
IntoTable
Example:
target = IntoTable("staging.orders").replace_where( col("date").between(params.start_date, params.end_date) )
- upsert(*, keys, partition_cols=(), exclude=(), include=(), schema=SchemaMode.STRICT)[source]¶
Write mode: merge rows using the given key columns (UPSERT / MERGE).
On first write the table is created from the frame. Subsequent writes issue a Delta MERGE: matched rows are updated, unmatched rows are inserted.
Declaring partition_cols is strongly recommended for large tables — it allows Delta to prune files at the log level before evaluating the join condition. Without it, every MERGE forces a full table scan.
- Parameters:
keys (tuple[str, ...]) – Columns that uniquely identify a row. Used in the MERGE ON join condition.
partition_cols (tuple[str, ...]) – Partition columns to include in the MERGE ON predicate and for Delta log pruning. Must be a subset of the frame columns.
exclude (tuple[str, ...]) – Columns to exclude from UPDATE SET on match. Keys and partition columns are always excluded. Mutually exclusive with include.
include (tuple[str, ...]) – Explicit allow-list of columns to update on match. Keys and partition columns are always excluded even if listed here. Mutually exclusive with exclude.
schema (SchemaMode) – Schema evolution strategy. Defaults to STRICT.
- Returns:
New IntoTable with UPSERT mode.
- Return type:
IntoTable
Example:
target = IntoTable("events.orders").upsert( keys=("order_id",), partition_cols=("year", "month"), exclude=("created_at",), )
- class loom.etl.declarative.IntoFile(path, *, format)[source]¶
Bases:
object
Declare a file as the ETL step target (CSV, JSON, XLSX, Parquet).
The path supports {field_name} template placeholders resolved from params at runtime.
- Parameters:
Example:
target = IntoFile("s3://exports/summary_{run_date}.xlsx", format=Format.XLSX)
- classmethod alias(name, *, format)[source]¶
Declare a file target by logical alias resolved from storage config.
The physical URI is looked up in storage.files at runtime via the injected FileLocator. Use this form when the path is environment-specific and should not be hard-coded in the pipeline.
- Parameters:
- Returns:
New IntoFile backed by a logical alias.
- Return type:
IntoFile
Example:
target = IntoFile.alias("exports_daily", format=Format.PARQUET)
- with_options(options)[source]¶
Return a new IntoFile with format-specific write options.
- Parameters:
options (CsvWriteOptions | ParquetWriteOptions | JsonWriteOptions) – Format-specific write options — use CsvWriteOptions, JsonWriteOptions, or ParquetWriteOptions.
- Returns:
New IntoFile with the options applied at write time.
- Return type:
IntoFile
Example:
target = IntoFile("s3://exports/report.csv", format=Format.CSV) .with_options(CsvWriteOptions(separator=";"))
- class loom.etl.declarative.IntoTemp(name, *, scope=CheckpointScope.RUN, append=False)[source]¶
Bases:
object
Declare an intermediate result that bypasses Delta and lives in tmp storage.
The physical format is chosen automatically by CheckpointStore based on the DataFrame type returned by execute():
Polars — Arrow IPC via sink_ipc() (streaming write, no collect) and scan_ipc() (lazy, memory-mapped read with predicate pushdown).
Spark — Parquet directory via df.write.parquet() and spark.read.parquet(). Cuts the lineage DAG; Photon-optimised.
Use FromTemp in a downstream step to consume the result.
- Parameters:
name (str) – Logical name identifying this intermediate. By default the name must be unique across the pipeline — the compiler raises if two steps write to the same name.
scope (CheckpointScope) – Lifetime scope. Defaults to RUN.
append (bool) – When True, multiple steps may write to this name; their outputs are concatenated and exposed as one logical intermediate (fan-in pattern). All writers for a given name must agree on the same append value — mixing True and False is a compile-time error.
Example:
# strict — only one step may write "normalized_orders"
target = IntoTemp("normalized_orders")

# fan-in — multiple partition steps write to the same intermediate
target = IntoTemp("order_parts", append=True)
- property scope: CheckpointScope¶
Lifetime scope.
- class loom.etl.declarative.IntoHistory(ref, *, keys, effective_date, mode='snapshot', track=None, overwrite=None, delete_policy='close', partition_scope=None, valid_from='valid_from', valid_to='valid_to', deleted_at='deleted_at', date_type='date', schema=SchemaMode.STRICT, allow_temporal_rerun=False)[source]¶
Bases:
object
Declare a Delta table as the ETL step SCD Type 2 history target.
Each run compares incoming data against the current open vectors in the target table. Only changed entity states generate new rows — unchanged entities produce no writes.
Column roles:
keys — entity identity. Never change; they are the MERGE join key.
track — change-triggering columns. A value change inserts a new row and closes the previous open vector. None means every non-key column is tracked.
overwrite — columns updated in-place on the open row when the entity is UNCHANGED. No new history row is created; the current open row is silently refreshed. Useful for mutable metadata that should not drive history.
Remaining — “passive” columns: carried forward into each new row but never updated nor tracked.
Multiple simultaneous open vectors are natively supported. Include the distinguishing dimension in track. For example, a player on loan to two clubs simultaneously uses track=("team_id", "role") — the vectors (player_id=P1, team_id=RM, role=OWNER) and (player_id=P1, team_id=GET, role=LOAN) are independent and coexist without conflict.
- Parameters:
ref (str | TableRef) – Logical table reference — str or TableRef.
keys (tuple[str, ...] | list[str]) – One or more column names that identify the entity. Must be non-empty and must not overlap with track.
effective_date (str | ParamExpr) – In "log" mode: name of the frame column carrying the event date/timestamp. In "snapshot" mode: a ParamExpr or column name resolved from run params.
mode (Literal['snapshot', 'log']) – "snapshot" (default) or "log".
track (tuple[str, ...] | list[str] | None) – Columns whose changes trigger a new history row. None means all non-key columns are tracked.
overwrite (tuple[str, ...] | list[str] | None) – Columns to update in-place on the open row when unchanged. Must not overlap with keys or track.
delete_policy (Literal['ignore', 'close', 'soft_delete']) – Action for absent keys in SNAPSHOT mode. Defaults to "close".
partition_scope (tuple[str, ...] | list[str] | None) – Partition columns to constrain Delta reads/writes. Strongly recommended for large tables.
valid_from (str) – Name of the period-start column in the Delta table. Defaults to "valid_from".
valid_to (str) – Name of the period-end column in the Delta table. Defaults to "valid_to".
date_type (Literal['date', 'timestamp']) – Precision for boundary columns. Defaults to "date".
schema (SchemaMode) – Schema evolution strategy. Defaults to STRICT.
allow_temporal_rerun (bool) – Allow re-weave when past-date corrections are loaded. Defaults to False.
deleted_at (str) – Name of the soft-delete audit column, written when delete_policy="soft_delete". Defaults to "deleted_at".
- Raises:
ValueError – If keys is empty.
ValueError – If track overlaps with keys.
ValueError – If overwrite overlaps with keys, track, or boundary columns.
ValueError – If valid_from and valid_to share the same name.
Example:
target = IntoHistory(
    "warehouse.dim_players",
    keys=("player_id",),
    track=("team_id", "contract_value"),
    effective_date=params.run_date,
    mode="snapshot",
    delete_policy="close",
    partition_scope=("season",),
)
- class loom.etl.declarative.SchemaMode(value)[source]¶
Bases:
StrEnum
Schema evolution strategy applied by the target writer before each write.
Values:
STRICT — fail on incompatible schema changes.
EVOLVE — allow additive evolution where supported.
OVERWRITE — replace table schema with incoming schema.
- class loom.etl.declarative.HistorifyInputMode(value)[source]¶
Bases:
StrEnum
Input semantics for the SCD2 writer.
Values:
SNAPSHOT — the frame is a full snapshot of the entity dimension. Keys absent from the snapshot are handled according to delete_policy.
LOG — the frame carries individual change events; each row has an effective_date column that determines when the change took effect.
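Example (illustrative; a LOG-mode history target whose frame carries a changed_at event column — the table and column names are placeholders):
target = IntoHistory(
    "warehouse.dim_contracts",
    keys=("contract_id",),
    effective_date="changed_at",
    mode="log",
)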
- class loom.etl.declarative.DeletePolicy(value)[source]¶
Bases:
StrEnum
Action applied to entity keys absent from an incoming snapshot.
Only meaningful when mode=SNAPSHOT; LOG mode ignores this setting.
Values:
IGNORE — leave open vectors open. Use for partial or incremental snapshots where absence does not imply deletion.
CLOSE — close the open vector by setting valid_to = effective_date - 1. Standard SCD2 behavior for full-dimension snapshots.
SOFT_DELETE — close the open vector and stamp a deleted_at column. Use when downstream systems require explicit deletion audit trails.
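Example (illustrative; a full-snapshot target that stamps deletions for audit — table and column names are placeholders):
target = IntoHistory(
    "warehouse.dim_players",
    keys=("player_id",),
    effective_date=params.run_date,
    mode="snapshot",
    delete_policy="soft_delete",
)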
- class loom.etl.declarative.HistoryDateType(value)[source]¶
Bases:
StrEnum
Column type used for valid_from / valid_to boundary columns.
Values:
DATE — calendar date precision. Suitable for daily SCD2 pipelines.
TIMESTAMP — microsecond precision. Use for event-driven or sub-daily update patterns.
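Example (illustrative; sub-daily event history with timestamp-precision boundaries, which also avoids the DATE-granularity ambiguity described under HistorifyDateCollisionError — names are placeholders):
target = IntoHistory(
    "warehouse.dim_prices",
    keys=("sku",),
    effective_date="changed_at",
    mode="log",
    date_type="timestamp",
)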
- class loom.etl.declarative.HistorifySpec(table_ref, keys, effective_date, mode, track=None, overwrite=None, delete_policy=DeletePolicy.CLOSE, partition_scope=None, valid_from='valid_from', valid_to='valid_to', deleted_at='deleted_at', date_type=HistoryDateType.DATE, schema_mode=SchemaMode.STRICT, allow_temporal_rerun=False)[source]¶
Bases:
object
Compiled SCD Type 2 write spec. Produced by IntoHistory._to_spec().
This frozen dataclass is the sole input to the backend engine. All configuration is resolved at declaration time — no runtime reflection occurs.
Both table_ref and schema_mode are required for duck-type detection by the write-policy dispatcher (_is_table_target_spec).
- Parameters:
table_ref (TableRef) – Logical Delta table reference.
keys (tuple[str, ...]) – Columns that uniquely identify the entity (e.g. ("player_id",)).
effective_date (str | ParamExpr) – Column name in LOG mode, or a ParamExpr in SNAPSHOT mode.
mode (HistorifyInputMode) – Input semantics — SNAPSHOT or LOG.
track (tuple[str, ...] | None) – Columns whose value change triggers a new history row. None means every non-key column is tracked.
overwrite (tuple[str, ...] | None) – Columns updated in-place on the current open row when the entity is UNCHANGED (no new history row created). Ignored in LOG mode. Must not overlap with keys or track.
delete_policy (DeletePolicy) – Action for absent keys in SNAPSHOT mode. Ignored in LOG mode.
partition_scope (tuple[str, ...] | None) – Partition columns used to limit Delta reads/writes. Strongly recommended for large tables.
valid_from (str) – Name of the period-start boundary column.
valid_to (str) – Name of the period-end boundary column. NULL in the table means the vector is currently open.
deleted_at (str) – Name of the soft-delete audit column. Only written when delete_policy=SOFT_DELETE.
date_type (HistoryDateType) – Precision of the valid_from / valid_to columns.
schema_mode (SchemaMode) – Schema evolution strategy.
allow_temporal_rerun (bool) – Allow re-weave when past-date data is loaded.
- class loom.etl.declarative.HistorifyRepairReport(affected_keys, dates_requiring_rerun, warnings)[source]¶
Bases:
object
Structured report produced by a successful re-weave operation.
Returned by the backend engine when allow_temporal_rerun=True and past-date data was corrected. Consumers may use this to schedule downstream re-runs for the affected date range.
- Parameters:
- exception loom.etl.declarative.HistorifyKeyConflictError(duplicates)[source]¶
Bases:
ValueError
Raised when the incoming snapshot contains duplicate keys + track combinations.
SCD2 semantics require that each entity state vector is unique within a snapshot. Duplicates indicate a data-modelling error: the distinguishing dimension must be included in keys or track.
If the same entity legitimately belongs to multiple simultaneous states (e.g. a player on loan to two clubs at once), include the distinguishing column in track — both vectors will then be tracked independently.
- Parameters:
duplicates (str) – Human-readable representation of the conflicting key tuples.
- Return type:
None
- exception loom.etl.declarative.HistorifyDateCollisionError(collisions)[source]¶
Bases:
ValueError
Raised in LOG mode when two events share the same (keys + track, effective_date).
At DATE granularity, two events on the same calendar day for the same entity state vector are ambiguous — relative ordering is lost and idempotency cannot be guaranteed.
To resolve: switch to date_type="timestamp" for sub-daily precision, or deduplicate events upstream before loading.
- Parameters:
collisions (str) – Human-readable description of the colliding key/date pairs.
- Return type:
None
- exception loom.etl.declarative.HistorifyTemporalConflictError(min_conflict_date, effective_date)[source]¶
Bases:
ValueError
Raised when the target contains future-open vectors and re-weave is disabled.
A temporal conflict occurs when valid_from > effective_date, meaning the incoming data must be inserted before an already-committed record. This requires explicit opt-in via allow_temporal_rerun=True.