Source code for loom.etl.backends._historify._transform
"""Backend-agnostic SCD2 transform delegating frame operations to a HistorifyBackend."""
from __future__ import annotations
from typing import Any, TypeVar
from loom.etl.backends._historify._common import (
resolve_effective_date,
resolve_track_cols,
)
from loom.etl.backends._historify._log import apply_log
from loom.etl.backends._historify._ops import HistorifyBackend
from loom.etl.backends._historify._snapshot import apply_snapshot
from loom.etl.declarative.target._history import (
HistorifyInputMode,
HistorifySpec,
HistorifyTemporalConflictError,
)
# Generic frame type. It is bound by whichever HistorifyBackend[F] is passed in,
# so ``frame``, ``existing`` and the returned result all share one concrete
# backend frame class.
F = TypeVar("F")
[docs]
def scd2_transform(
    ops: HistorifyBackend[F],
    frame: F,
    existing: F | None,
    spec: HistorifySpec,
    params_instance: object,
) -> F:
    """Run the SCD Type 2 pipeline over ``frame`` and return the result.

    The join key is the spec's business keys followed by the tracked columns
    resolved against ``frame``'s columns. The incoming frame is validated
    before any merging. If there is no existing target, the initial history
    is built directly. Otherwise the temporal guard runs first, and then the
    frame is merged via the snapshot or log strategy according to
    ``spec.mode``.

    Args:
        ops: Backend-specific frame operations.
        frame: Incoming data frame.
        existing: Current target frame, or ``None`` on the first run.
        spec: Compiled HistorifySpec.
        params_instance: Runtime params used to resolve ParamExpr values.

    Returns:
        Transformed frame ready to be written to Delta.

    Raises:
        HistorifyKeyConflictError: Duplicate entity state vectors.
        HistorifyDateCollisionError: Same-date collisions in LOG mode.
        HistorifyTemporalConflictError: Future-open records, re-weave off.
    """
    frame_columns = ops.columns(frame)
    tracked = resolve_track_cols(spec, frame_columns)
    join_key = [*spec.keys, *tracked]
    effective = resolve_effective_date(spec, params_instance)

    # Validate the incoming batch before touching any existing history.
    ops.assert_unique_keys(frame, join_key)
    if spec.mode is HistorifyInputMode.LOG:
        ops.assert_no_date_collisions(
            frame, join_key, str(spec.effective_date), spec
        )

    if existing is None:
        # Nothing to merge against yet: build the initial history directly.
        return _first_run(ops, frame, spec, join_key, effective)

    _temporal_guard(ops, existing, spec, effective)

    if spec.mode is HistorifyInputMode.SNAPSHOT:
        return apply_snapshot(ops, frame, existing, spec, join_key, effective)
    # Any non-snapshot mode is handled by the log strategy.
    return apply_log(ops, frame, existing, spec, join_key)
def _first_run(
    ops: HistorifyBackend[F],
    frame: F,
    spec: HistorifySpec,
    join_key: list[str],
    eff_date: Any,
) -> F:
    """Create the initial history frame when no target frame exists yet.

    In SNAPSHOT mode, every incoming row is opened at ``eff_date``. The
    ``spec.valid_from`` column is stamped with ``eff_date`` and the
    ``spec.valid_to`` column with ``None``, both using the backend's history
    dtype. The backend's soft-delete column is then ensured. Every other mode
    delegates to ``ops.build_log_boundaries``.

    Args:
        ops: Backend-specific frame operations.
        frame: Incoming data frame.
        spec: Compiled HistorifySpec.
        join_key: Business keys plus tracked columns (not used by this function).
        eff_date: Resolved effective date used as the SNAPSHOT ``valid_from``.

    Returns:
        The first-run history frame.
    """
    if spec.mode is not HistorifyInputMode.SNAPSHOT:
        return ops.build_log_boundaries(frame, spec)

    history_type = ops.history_dtype(spec)
    # Open every row at eff_date and leave its end boundary null.
    opened = ops.stamp_col(frame, spec.valid_from, eff_date, history_type)
    bounded = ops.stamp_col(opened, spec.valid_to, None, history_type)
    return ops.ensure_soft_delete_col(bounded, spec)
def _temporal_guard(
ops: HistorifyBackend[F],
existing: F,
spec: HistorifySpec,
eff_date: Any,
) -> None:
"""Raise if future-open records exist and re-weave is disabled."""
if spec.allow_temporal_rerun or spec.mode is HistorifyInputMode.LOG:
return
min_conflict = ops.temporal_conflict_min_date(existing, spec, eff_date)
if min_conflict is None:
return
raise HistorifyTemporalConflictError(min_conflict, eff_date)