Source code for loom.etl.observability.observers.composite

"""Composite observer with per-observer error isolation."""

from __future__ import annotations

from collections.abc import Sequence
from typing import Any

import structlog

from loom.etl.observability.observers.protocol import ETLRunObserver
from loom.etl.observability.records import RunContext, RunStatus

_log: Any = structlog.get_logger("loom.etl.observer")


[docs] class CompositeObserver: """Fan out lifecycle events to multiple observers safely.""" def __init__(self, observers: Sequence[ETLRunObserver]) -> None: self._observers = tuple(observers) def on_pipeline_start(self, plan: Any, _params: Any, ctx: RunContext) -> None: for observer in self._observers: _safe(observer.on_pipeline_start, plan, _params, ctx) def on_pipeline_end(self, ctx: RunContext, status: RunStatus, duration_ms: int) -> None: for observer in self._observers: _safe(observer.on_pipeline_end, ctx, status, duration_ms) def on_process_start(self, plan: Any, ctx: RunContext, process_run_id: str) -> None: for observer in self._observers: _safe(observer.on_process_start, plan, ctx, process_run_id) def on_process_end(self, process_run_id: str, status: RunStatus, duration_ms: int) -> None: for observer in self._observers: _safe(observer.on_process_end, process_run_id, status, duration_ms) def on_step_start(self, plan: Any, ctx: RunContext, step_run_id: str) -> None: for observer in self._observers: _safe(observer.on_step_start, plan, ctx, step_run_id) def on_step_end(self, step_run_id: str, status: RunStatus, duration_ms: int) -> None: for observer in self._observers: _safe(observer.on_step_end, step_run_id, status, duration_ms) def on_step_error(self, step_run_id: str, exc: Exception) -> None: for observer in self._observers: _safe(observer.on_step_error, step_run_id, exc)
def _safe(fn: Any, *args: Any) -> None: try: fn(*args) except Exception as exc: # noqa: BLE001 - callbacks are user code; failures are isolated. _log.error( "observer_error", observer=type(fn.__self__).__name__ if hasattr(fn, "__self__") else repr(fn), error=repr(exc), ) __all__ = ["CompositeObserver"]