# Source code for loom.etl.declarative.target._history._builder

"""SCD Type 2 history target builder."""

from __future__ import annotations

from typing import Literal

from loom.etl.declarative.expr._params import ParamExpr
from loom.etl.declarative.expr._refs import TableRef
from loom.etl.declarative.target._history._enums import (
    DeletePolicy,
    HistorifyInputMode,
    HistoryDateType,
)
from loom.etl.declarative.target._history._spec import HistorifySpec
from loom.etl.declarative.target._schema_mode import SchemaMode


class IntoHistory:
    """Declare a Delta table as the ETL step SCD Type 2 history target.

    Each run compares incoming data against the current open vectors in the
    target table. Only changed entity states generate new rows — unchanged
    entities produce no writes.

    **Column roles:**

    * ``keys`` — entity identity. Never change; they are the MERGE join key.
    * ``track`` — change-triggering columns. A value change inserts a new row
      and closes the previous open vector. ``None`` means every non-key
      column is tracked.
    * ``overwrite`` — columns updated in-place on the open row when the
      entity is UNCHANGED. No new history row is created; the current open
      row is silently refreshed. Useful for mutable metadata that should not
      drive history.
    * *Remaining* — "passive" columns: carried forward into each new row but
      never updated nor tracked.

    **Multiple simultaneous open vectors** are natively supported. Include the
    distinguishing dimension in ``track``. For example, a player on loan to
    two clubs simultaneously uses ``track=("team_id", "role")`` — the vectors
    ``(player_id=P1, team_id=RM, role=OWNER)`` and
    ``(player_id=P1, team_id=GET, role=LOAN)`` are independent and coexist
    without conflict.

    Column-list arguments (``keys``, ``track``, ``overwrite``,
    ``partition_scope``) accept a tuple or list of names; a bare ``str`` is
    treated as a single column name rather than being split into characters.

    Args:
        ref: Logical table reference — ``str`` or :class:`~loom.etl.TableRef`.
        keys: One or more column names that identify the entity. Must be
            non-empty and must not overlap with ``track``.
        effective_date: In ``"log"`` mode: name of the frame column carrying
            the event date/timestamp; it must not equal ``valid_from`` or
            ``valid_to``. In ``"snapshot"`` mode: a
            :class:`~loom.etl.ParamExpr` or column name resolved from run
            params.
        mode: ``"snapshot"`` (default) or ``"log"``.
        track: Columns whose changes trigger a new history row. ``None``
            means all non-key columns are tracked.
        overwrite: Columns to update in-place on the open row when unchanged.
            Must not overlap with ``keys``, ``track``, or the boundary
            columns (``valid_from``, ``valid_to``, ``deleted_at``).
        delete_policy: Action for absent keys in SNAPSHOT mode. Defaults to
            ``"close"``.
        partition_scope: Partition columns to constrain Delta reads/writes.
            Strongly recommended for large tables.
        valid_from: Name of the period-start column in the Delta table.
            Defaults to ``"valid_from"``.
        valid_to: Name of the period-end column in the Delta table. Defaults
            to ``"valid_to"``.
        deleted_at: Name of the soft-delete audit column in the Delta table.
            Defaults to ``"deleted_at"``.
        date_type: Precision for boundary columns. Defaults to ``"date"``.
        schema: Schema evolution strategy. Defaults to
            :attr:`~SchemaMode.STRICT`.
        allow_temporal_rerun: Allow re-weave when past-date corrections are
            loaded. Defaults to ``False``.

    Raises:
        ValueError: If ``keys`` is empty.
        ValueError: If ``track`` overlaps with ``keys``.
        ValueError: If ``overwrite`` overlaps with ``keys``, ``track``, or
            boundary columns.
        ValueError: If ``valid_from``, ``valid_to`` and ``deleted_at`` are
            not pairwise distinct.
        ValueError: In ``"log"`` mode, if ``effective_date`` equals
            ``valid_from`` or ``valid_to``.

    Example::

        target = IntoHistory(
            "warehouse.dim_players",
            keys=("player_id",),
            track=("team_id", "contract_value"),
            effective_date=params.run_date,
            mode="snapshot",
            delete_policy="close",
            partition_scope=("season",),
        )
    """

    __slots__ = ("_spec",)

    def __init__(
        self,
        ref: str | TableRef,
        *,
        keys: str | tuple[str, ...] | list[str],
        effective_date: str | ParamExpr,
        mode: Literal["snapshot", "log"] = "snapshot",
        track: str | tuple[str, ...] | list[str] | None = None,
        overwrite: str | tuple[str, ...] | list[str] | None = None,
        delete_policy: Literal["ignore", "close", "soft_delete"] = "close",
        partition_scope: str | tuple[str, ...] | list[str] | None = None,
        valid_from: str = "valid_from",
        valid_to: str = "valid_to",
        deleted_at: str = "deleted_at",
        date_type: Literal["date", "timestamp"] = "date",
        schema: SchemaMode = SchemaMode.STRICT,
        allow_temporal_rerun: bool = False,
    ) -> None:
        # Normalise column lists first so validation sees real column names
        # (a bare "player_id" must not become ("p", "l", "a", ...)).
        keys_t = _as_columns(keys)
        track_t = _as_optional_columns(track)
        overwrite_t = _as_optional_columns(overwrite)

        _validate_history_args(keys_t, track_t, overwrite_t, valid_from, valid_to, deleted_at)
        _validate_log_effective_date(effective_date, valid_from, valid_to, mode)

        table_ref = TableRef(ref) if isinstance(ref, str) else ref
        self._spec = HistorifySpec(
            table_ref=table_ref,
            keys=keys_t,
            effective_date=effective_date,
            mode=HistorifyInputMode(mode),
            track=track_t,
            overwrite=overwrite_t,
            delete_policy=DeletePolicy(delete_policy),
            partition_scope=_as_optional_columns(partition_scope),
            valid_from=valid_from,
            valid_to=valid_to,
            deleted_at=deleted_at,
            date_type=HistoryDateType(date_type),
            schema_mode=schema,
            allow_temporal_rerun=allow_temporal_rerun,
        )

    def _to_spec(self) -> HistorifySpec:
        """Return the compiled :class:`HistorifySpec`.

        Returns:
            Frozen :class:`HistorifySpec` with all declared configuration.
        """
        return self._spec

    def __repr__(self) -> str:
        spec = self._spec
        return f"IntoHistory({spec.table_ref.ref!r}, keys={spec.keys!r}, mode={spec.mode!r})"


def _as_columns(value: str | tuple[str, ...] | list[str]) -> tuple[str, ...]:
    """Normalise a column-name argument into a tuple of column names.

    A bare ``str`` is treated as one column name. ``tuple("player_id")`` would
    otherwise split the name into single characters — an easy mistake when a
    one-element tuple is written without its trailing comma, e.g.
    ``keys=("player_id")``. An empty string maps to ``()``, matching what
    ``tuple("")`` produced before, so empty-key validation still triggers.

    Args:
        value: One column name, or a tuple/list of column names.

    Returns:
        Tuple of column names.
    """
    if isinstance(value, str):
        return (value,) if value else ()
    return tuple(value)


def _as_optional_columns(
    value: str | tuple[str, ...] | list[str] | None,
) -> tuple[str, ...] | None:
    """Like :func:`_as_columns`, but pass ``None`` through unchanged.

    Args:
        value: One column name, a tuple/list of column names, or ``None``.

    Returns:
        Tuple of column names, or ``None`` when ``value`` is ``None``.
    """
    return None if value is None else _as_columns(value)
def _validate_history_args(
    keys: tuple[str, ...],
    track: tuple[str, ...] | None,
    overwrite: tuple[str, ...] | None,
    valid_from: str,
    valid_to: str,
    deleted_at: str,
) -> None:
    """Enforce the column-role invariants of ``IntoHistory``.

    Constraints are checked in this order and the first violation is raised:

    1. ``keys`` is non-empty.
    2. ``track`` (if given) shares no column with ``keys``.
    3. ``overwrite`` (if given) shares no column with ``keys``, then with
       ``track`` (if given), then with the three boundary columns.
    4. ``valid_from``, ``valid_to`` and ``deleted_at`` are pairwise distinct.

    Args:
        keys: Entity identity columns.
        track: Change-triggering columns, or ``None``.
        overwrite: In-place overwrite columns, or ``None``.
        valid_from: Period-start column name.
        valid_to: Period-end column name.
        deleted_at: Soft-delete audit column name.

    Raises:
        ValueError: On the first violated constraint.
    """
    if not keys:
        raise ValueError("IntoHistory: 'keys' must contain at least one column name.")

    if track is not None:
        shared = set(keys).intersection(track)
        if shared:
            raise ValueError(
                "IntoHistory: columns cannot appear in both 'keys' and 'track'. "
                f"Overlap: {sorted(shared)}"
            )

    boundaries = (valid_from, valid_to, deleted_at)

    if overwrite is not None:
        overwrite_cols = set(overwrite)
        # (message prefix, columns ``overwrite`` must not touch), in check order.
        forbidden: list[tuple[str, tuple[str, ...]]] = [
            ("IntoHistory: 'overwrite' cannot overlap with 'keys'. ", keys),
        ]
        if track is not None:
            forbidden.append(("IntoHistory: 'overwrite' cannot overlap with 'track'. ", track))
        forbidden.append(("IntoHistory: 'overwrite' cannot contain boundary columns. ", boundaries))

        for prefix, columns in forbidden:
            clash = overwrite_cols.intersection(columns)
            if clash:
                raise ValueError(f"{prefix}Overlap: {sorted(clash)}")

    # Any repeated name collapses the set below three members.
    if len(set(boundaries)) != 3:
        raise ValueError(
            "IntoHistory: 'valid_from', 'valid_to' and 'deleted_at' must be distinct. "
            f"Got valid_from={valid_from!r}, valid_to={valid_to!r}, deleted_at={deleted_at!r}."
        )


def _validate_log_effective_date(
    effective_date: str | object,
    valid_from: str,
    valid_to: str,
    mode: str,
) -> None:
    """Reject a LOG-mode ``effective_date`` that reuses a boundary column name.

    In LOG mode the engine drops the ``effective_date`` column once
    ``valid_from`` / ``valid_to`` have been computed, so sharing a name with
    either boundary would silently erase the freshly built column. Any other
    ``mode`` value is not checked here.

    Args:
        effective_date: Column name or ParamExpr declared as the event date;
            it is compared via its ``str()`` form.
        valid_from: Period-start column name.
        valid_to: Period-end column name.
        mode: ``"log"`` or ``"snapshot"``.

    Raises:
        ValueError: When LOG mode ``effective_date`` conflicts with a
            boundary column.
    """
    if mode == "log":
        event_col = str(effective_date)
        if event_col == valid_from or event_col == valid_to:
            raise ValueError(
                f"IntoHistory (LOG mode): effective_date={event_col!r} must not match "
                f"valid_from={valid_from!r} or valid_to={valid_to!r}. "
                "The engine drops the effective_date column after building history boundaries; "
                "using the same name would silently erase the computed column."
            )


__all__ = ["IntoHistory"]