Source code for loom.etl.backends._historify._common

"""Common helpers shared by all Historify backends."""

from __future__ import annotations

from datetime import timedelta
from typing import Any

from loom.etl.declarative.expr._params import ParamExpr
from loom.etl.declarative.target._history import HistorifyInputMode, HistorifySpec, HistoryDateType

_DATE_DELTA = timedelta(days=1)
_TS_DELTA = timedelta(microseconds=1)


[docs] def resolve_effective_date(spec: HistorifySpec, params_instance: object) -> Any: """Resolve effective date to a scalar (SNAPSHOT) or column name (LOG).""" if spec.mode is HistorifyInputMode.LOG: return spec.effective_date if isinstance(spec.effective_date, ParamExpr): return eval_param_expr(spec.effective_date, params_instance) return spec.effective_date
[docs] def eval_param_expr(expr: ParamExpr, params_instance: object) -> Any: """Walk the ParamExpr attribute path and return the resolved value.""" value: Any = params_instance for attr in expr.path: value = getattr(value, attr) return value
[docs] def resolve_track_cols(spec: HistorifySpec, frame_cols: list[str]) -> tuple[str, ...]: """Return tracked columns — explicit or all non-key, non-history columns.""" if spec.track is not None: return spec.track excluded = set(spec.keys) | {spec.valid_from, spec.valid_to} return tuple(c for c in frame_cols if c not in excluded)
[docs] def prev_period_value(eff_date: Any, spec: HistorifySpec) -> Any: """Return one unit before eff_date (one day or one microsecond).""" delta = _DATE_DELTA if spec.date_type is HistoryDateType.DATE else _TS_DELTA return eff_date - delta