Source code for loom.etl.observability.observers.structlog

"""Structured log observer for ETL lifecycle events."""

from __future__ import annotations

from typing import Any

import structlog

from loom.etl.observability.observers._labels import (
    source_label,
    target_label,
    write_mode_label,
)
from loom.etl.observability.records import EventName, RunContext, RunStatus

_log: Any = structlog.get_logger("loom.etl")


[docs] class StructlogRunObserver: """Observer that emits structured lifecycle events through structlog. Args: slow_step_threshold_ms: Optional threshold to emit ``slow_step`` warnings. """ def __init__(self, slow_step_threshold_ms: int | None = None) -> None: self._slow_ms = slow_step_threshold_ms def on_pipeline_start(self, plan: Any, _params: Any, ctx: RunContext) -> None: _log.info( EventName.PIPELINE_START, pipeline=plan.pipeline_type.__name__, run_id=ctx.run_id, correlation_id=ctx.correlation_id, attempt=ctx.attempt, ) def on_pipeline_end(self, ctx: RunContext, status: RunStatus, duration_ms: int) -> None: _log.info( EventName.PIPELINE_END, run_id=ctx.run_id, correlation_id=ctx.correlation_id, attempt=ctx.attempt, status=status, duration_ms=duration_ms, ) def on_process_start(self, plan: Any, ctx: RunContext, process_run_id: str) -> None: _log.info( EventName.PROCESS_START, process=plan.process_type.__name__, steps=len(plan.nodes), run_id=ctx.run_id, process_run_id=process_run_id, ) def on_process_end(self, process_run_id: str, status: RunStatus, duration_ms: int) -> None: _log.info( EventName.PROCESS_END, process_run_id=process_run_id, status=status, duration_ms=duration_ms, ) def on_step_start(self, plan: Any, ctx: RunContext, step_run_id: str) -> None: sources = [source_label(binding.spec) for binding in plan.source_bindings] _log.info( EventName.STEP_START, step=plan.step_type.__name__, sources=sources, target=target_label(plan.target_binding.spec), write_mode=write_mode_label(plan.target_binding.spec), run_id=ctx.run_id, step_run_id=step_run_id, ) def on_step_end(self, step_run_id: str, status: RunStatus, duration_ms: int) -> None: _log.info( EventName.STEP_END, step_run_id=step_run_id, status=status, duration_ms=duration_ms, ) if self._slow_ms is not None and duration_ms > self._slow_ms: _log.warning( "slow_step", step_run_id=step_run_id, duration_ms=duration_ms, threshold_ms=self._slow_ms, ) def on_step_error(self, step_run_id: str, exc: Exception) -> None: _log.error( EventName.STEP_ERROR, step_run_id=step_run_id, error=repr(exc), )
__all__ = ["StructlogRunObserver", "source_label", "target_label", "write_mode_label"]