Source code for loom.etl.observability.factory
"""Observability factory for ETL runtime observers."""
from __future__ import annotations
from loom.etl.observability.config import ObservabilityConfig
from loom.etl.observability.observers.otel import build_otel_observer
from loom.etl.observability.observers.protocol import ETLRunObserver
from loom.etl.observability.observers.structlog import StructlogRunObserver
from loom.etl.observability.recording._recorder import ExecutionRecordsObserver
from loom.etl.observability.sinks._protocol import ExecutionRecordWriter
from loom.etl.observability.sinks._table import TableExecutionRecordStore
[docs]
def make_observers(
config: ObservabilityConfig,
*,
record_writer: ExecutionRecordWriter | None = None,
) -> list[ETLRunObserver]:
"""Build runtime observers from observability config and optional record writer."""
observers = _build_event_observers(config)
recording = _build_recording_observer(config, record_writer)
if recording is not None:
observers.append(recording)
return observers
def _build_event_observers(config: ObservabilityConfig) -> list[ETLRunObserver]:
observers: list[ETLRunObserver] = []
if config.log:
observers.append(StructlogRunObserver(slow_step_threshold_ms=config.slow_step_threshold_ms))
if config.otel or config.otel_config is not None:
observers.append(build_otel_observer(config.otel_config))
return observers
def _build_recording_observer(
config: ObservabilityConfig,
record_writer: ExecutionRecordWriter | None,
) -> ExecutionRecordsObserver | None:
store_cfg = config.record_store
if store_cfg is None:
return None
if record_writer is None:
raise ValueError(
"make_observers: record_writer is required when observability.record_store is enabled."
)
store_cfg.validate()
store = TableExecutionRecordStore(writer=record_writer, database=store_cfg.database)
return ExecutionRecordsObserver(store)
__all__ = ["make_observers"]