Source code for loom.etl.backends._historify._ops

"""HistorifyBackend Protocol — backend-specific primitives for SCD Type 2 algorithms."""

from __future__ import annotations

from typing import Any, Protocol, TypeVar

from loom.etl.declarative.target._history import HistorifySpec

F = TypeVar("F")


class HistorifyBackend(Protocol[F]):
    """Structural interface for the frame primitives SCD2Transform relies on.

    ``F`` stands for the backend's own frame type: each method receives
    and/or returns frames of that type.  Every member is a stub; concrete
    backends supply the behavior.
    """

    # -- introspection -----------------------------------------------------
    # NOTE(review): `columns` carries no docstring upstream; from its
    # signature it presumably lists the column names of `frame` -- confirm
    # against a concrete backend.
    def columns(self, frame: F) -> list[str]: ...

    def history_dtype(self, spec: HistorifySpec) -> Any:
        """Give the backend-native dtype of the valid_from / valid_to columns."""
        ...

    # -- row filters and set-style combinators ------------------------------
    # NOTE(review): none of these is documented upstream.  Their names
    # suggest null / equality filters on `col`, key-based anti/semi joins on
    # `on`, and a concatenation of `frames` -- treat that as an assumption
    # until checked against an implementation.
    def filter_null(self, frame: F, col: str) -> F: ...

    def filter_not_null(self, frame: F, col: str) -> F: ...

    def filter_eq(self, frame: F, col: str, value: Any, dtype: Any) -> F: ...

    def filter_ne(self, frame: F, col: str, value: Any, dtype: Any) -> F: ...

    def anti_join(self, left: F, right: F, on: list[str]) -> F: ...

    def semi_join(self, left: F, right: F, on: list[str]) -> F: ...

    def union(self, frames: list[F]) -> F: ...

    # -- column-level edits --------------------------------------------------
    def stamp_col(self, frame: F, name: str, value: Any, dtype: Any) -> F:
        """Set column ``name`` to a literal ``value``, adding or overwriting it."""
        ...

    def null_col(self, frame: F, name: str, dtype: Any) -> F:
        """Attach an all-null column ``name`` of the given ``dtype``."""
        ...

    def rename(self, frame: F, rename_map: dict[str, str]) -> F:
        """Rename columns as described by ``rename_map``."""
        ...

    def drop(self, frame: F, cols: list[str]) -> F:
        """Remove the columns listed in ``cols``."""
        ...

    def dedup_last(self, frame: F, subset: list[str]) -> F:
        """Deduplicate on ``subset``, retaining the last occurrence."""
        ...

    # -- SCD2-specific steps -------------------------------------------------
    def apply_overwrite_cols(
        self,
        unchanged: F,
        incoming: F,
        join_key: list[str],
        overwrite: tuple[str, ...],
    ) -> F:
        """Refresh the overwrite columns of unchanged rows from ``incoming``.

        The ``overwrite`` columns are dropped from ``unchanged`` and the
        fresh values are then joined back in from ``incoming`` on
        ``join_key``.  The open row is updated in-place, so no additional
        history row is produced.
        """
        ...

    def rollback_same_day_run(
        self,
        frame: F,
        spec: HistorifySpec,
        eff_date: Any,
        join_key: list[str],
    ) -> F:
        """Revert an earlier run made on the same eff_date (SNAPSHOT idempotency)."""
        ...

    def build_log_boundaries(self, frame: F, spec: HistorifySpec) -> F:
        """Derive valid_from / valid_to from a sorted event frame (LOG mode)."""
        ...

    # -- delete handling and validation --------------------------------------
    # NOTE(review): these four have no upstream docstrings; only their
    # signatures are known here.  The two `assert_*` methods return None and
    # presumably signal violations by raising -- confirm with a backend.
    def apply_delete_policy(self, deleted: F, spec: HistorifySpec, eff_date: Any) -> F: ...

    def ensure_soft_delete_col(self, result: F, spec: HistorifySpec) -> F: ...

    def assert_unique_keys(self, frame: F, keys: list[str]) -> None: ...

    def assert_no_date_collisions(
        self,
        frame: F,
        keys: list[str],
        eff_col: str,
        spec: HistorifySpec,
    ) -> None: ...

    def temporal_conflict_min_date(
        self,
        existing: F,
        spec: HistorifySpec,
        eff_date: Any,
    ) -> Any | None:
        """Return the smallest conflicting valid_from, or None without a conflict."""
        ...