Source code for loom.etl.observability.recording._recorder

"""Observer that persists completed lifecycle records into an execution record store."""

from __future__ import annotations

import threading
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Any

from loom.etl.compiler._plan import PipelinePlan, ProcessPlan, StepPlan
from loom.etl.observability.records import (
    EventName,
    PipelineRunRecord,
    ProcessRunRecord,
    RunContext,
    RunStatus,
    StepRunRecord,
)
from loom.etl.observability.sinks._protocol import ExecutionRecordStore


[docs] class ExecutionRecordsObserver: """Convert lifecycle callbacks into persisted execution records. Args: store: Persistence backend implementing :class:`ExecutionRecordStore`. """ def __init__(self, store: ExecutionRecordStore) -> None: self._store = store self._write_lock = threading.Lock() self._pipeline_ctx: dict[str, _EntityContext] = {} self._process_ctx: dict[str, _EntityContext] = {} self._step_ctx: dict[str, _EntityContext] = {} self._step_errors: dict[str, _ErrorDetails] = {} self._process_failures: dict[str, _FailureContext] = {} self._pipeline_failures: dict[str, _FailureContext] = {} def on_pipeline_start(self, plan: PipelinePlan, _params: Any, ctx: RunContext) -> None: with self._write_lock: self._pipeline_ctx[ctx.run_id] = _EntityContext( run_ctx=ctx, name=plan.pipeline_type.__name__, started_at=_now(), ) def on_pipeline_end(self, ctx: RunContext, status: RunStatus, duration_ms: int) -> None: with self._write_lock: entity_ctx = self._pipeline_ctx.pop(ctx.run_id, None) failure = self._pipeline_failures.pop(ctx.run_id, None) if entity_ctx is None: return self._store.write_record( _build_pipeline_record( entity_ctx, failure, status=status, duration_ms=duration_ms, ) ) def on_process_start(self, plan: ProcessPlan, ctx: RunContext, process_run_id: str) -> None: with self._write_lock: self._process_ctx[process_run_id] = _EntityContext( run_ctx=ctx, name=plan.process_type.__name__, started_at=_now(), ) def on_process_end(self, process_run_id: str, status: RunStatus, duration_ms: int) -> None: with self._write_lock: entity_ctx = self._process_ctx.pop(process_run_id, None) failure = self._process_failures.pop(process_run_id, None) if entity_ctx is None: return self._store.write_record( _build_process_record( entity_ctx, process_run_id, failure, status=status, duration_ms=duration_ms, ) ) def on_step_start(self, plan: StepPlan, ctx: RunContext, step_run_id: str) -> None: with self._write_lock: self._step_ctx[step_run_id] = _EntityContext( run_ctx=ctx, name=plan.step_type.__name__, started_at=_now(), ) def on_step_end(self, step_run_id: str, status: RunStatus, duration_ms: int) -> None: failure: _FailureContext | None = None with self._write_lock: ctx = self._step_ctx.pop(step_run_id, None) if ctx is None: return error = self._step_errors.pop(step_run_id, None) if status is RunStatus.FAILED and error is not None: failure = _FailureContext( step_run_id=step_run_id, step=ctx.name, error=error.error, error_type=error.error_type, error_message=error.error_message, ) if ctx.run_ctx.process_run_id is not None: self._process_failures[ctx.run_ctx.process_run_id] = failure self._pipeline_failures[ctx.run_ctx.run_id] = failure record = StepRunRecord( event=EventName.STEP_END, run_id=ctx.run_ctx.run_id, correlation_id=ctx.run_ctx.correlation_id, attempt=ctx.run_ctx.attempt, step_run_id=step_run_id, step=ctx.name, started_at=ctx.started_at, status=status, duration_ms=duration_ms, error=error.error if error else None, process_run_id=ctx.run_ctx.process_run_id, error_type=error.error_type if error else None, error_message=error.error_message if error else None, ) self._store.write_record(record) def on_step_error(self, step_run_id: str, exc: Exception) -> None: with self._write_lock: self._step_errors[step_run_id] = _ErrorDetails.from_exception(exc)
def _now() -> datetime: return datetime.now(tz=UTC) @dataclass(frozen=True) class _EntityContext: """Captured context at entity start.""" run_ctx: RunContext name: str started_at: datetime @dataclass(frozen=True) class _ErrorDetails: error: str error_type: str error_message: str @classmethod def from_exception(cls, exc: Exception) -> _ErrorDetails: return cls( error=repr(exc), error_type=type(exc).__name__, error_message=str(exc), ) @dataclass(frozen=True) class _FailureContext: step_run_id: str step: str error: str error_type: str error_message: str def _build_pipeline_record( entity_ctx: _EntityContext, failure: _FailureContext | None, *, status: RunStatus, duration_ms: int, ) -> PipelineRunRecord: """Build pipeline end record from captured context and optional failure.""" return PipelineRunRecord( event=EventName.PIPELINE_END, run_id=entity_ctx.run_ctx.run_id, correlation_id=entity_ctx.run_ctx.correlation_id, attempt=entity_ctx.run_ctx.attempt, pipeline=entity_ctx.name, started_at=entity_ctx.started_at, status=status, duration_ms=duration_ms, error=failure.error if failure else None, error_type=failure.error_type if failure else None, error_message=failure.error_message if failure else None, failed_step_run_id=failure.step_run_id if failure else None, failed_step=failure.step if failure else None, ) def _build_process_record( entity_ctx: _EntityContext, process_run_id: str, failure: _FailureContext | None, *, status: RunStatus, duration_ms: int, ) -> ProcessRunRecord: """Build process end record from captured context and optional failure.""" return ProcessRunRecord( event=EventName.PROCESS_END, run_id=entity_ctx.run_ctx.run_id, correlation_id=entity_ctx.run_ctx.correlation_id, attempt=entity_ctx.run_ctx.attempt, process_run_id=process_run_id, process=entity_ctx.name, started_at=entity_ctx.started_at, status=status, duration_ms=duration_ms, error=failure.error if failure else None, error_type=failure.error_type if failure else None, error_message=failure.error_message if failure else None, failed_step_run_id=failure.step_run_id if failure else None, failed_step=failure.step if failure else None, ) __all__ = ["ExecutionRecordsObserver"]