Source code for loom.etl.observability.observers.noop

"""No-op observer implementation."""

from __future__ import annotations

from typing import Any

from loom.etl.compiler._plan import PipelinePlan, ProcessPlan, StepPlan
from loom.etl.observability.records import RunContext, RunStatus


[docs] class NoopRunObserver: """Observer implementation that intentionally does nothing."""
[docs] def on_pipeline_start(self, _plan: PipelinePlan, _params: Any, _ctx: RunContext) -> None: """No-op hook."""
[docs] def on_pipeline_end(self, _ctx: RunContext, _status: RunStatus, _duration_ms: int) -> None: """No-op hook."""
[docs] def on_process_start(self, _plan: ProcessPlan, _ctx: RunContext, _process_run_id: str) -> None: """No-op hook."""
[docs] def on_process_end(self, _process_run_id: str, _status: RunStatus, _duration_ms: int) -> None: """No-op hook."""
[docs] def on_step_start(self, _plan: StepPlan, _ctx: RunContext, _step_run_id: str) -> None: """No-op hook."""
[docs] def on_step_end(self, _step_run_id: str, _status: RunStatus, _duration_ms: int) -> None: """No-op hook."""
[docs] def on_step_error(self, _step_run_id: str, _exc: Exception) -> None: """No-op hook."""
__all__ = ["NoopRunObserver"]