Source code for loom.etl.observability.observers.protocol
"""Observer protocol for ETL lifecycle hooks."""
from __future__ import annotations
from typing import Any, Protocol, runtime_checkable
from loom.etl.compiler._plan import PipelinePlan, ProcessPlan, StepPlan
from loom.etl.observability.records import RunContext, RunStatus
[docs]
@runtime_checkable
class ETLRunObserver(Protocol):
"""Protocol for ETL lifecycle observability callbacks."""
[docs]
def on_pipeline_start(self, plan: PipelinePlan, _params: Any, ctx: RunContext) -> None:
"""Called before the first process of the pipeline executes."""
[docs]
def on_pipeline_end(self, ctx: RunContext, status: RunStatus, duration_ms: int) -> None:
"""Called after pipeline completion or first unhandled error."""
[docs]
def on_process_start(self, plan: ProcessPlan, ctx: RunContext, process_run_id: str) -> None:
"""Called before the first step of a process executes."""
[docs]
def on_process_end(self, process_run_id: str, status: RunStatus, duration_ms: int) -> None:
"""Called after process completion or first unhandled error."""
[docs]
def on_step_start(self, plan: StepPlan, ctx: RunContext, step_run_id: str) -> None:
"""Called before sources are read."""
[docs]
def on_step_end(self, step_run_id: str, status: RunStatus, duration_ms: int) -> None:
"""Called after write completes or on failure."""
[docs]
def on_step_error(self, step_run_id: str, exc: Exception) -> None:
"""Called before ``on_step_end`` when a step fails."""
__all__ = ["ETLRunObserver"]