# Source code for loom.etl.observability.config
"""YAML-loadable observability configuration."""
from __future__ import annotations
from typing import Any
import msgspec
# [docs] (Sphinx viewcode link marker; not part of the module code)
class ExecutionRecordStoreConfig(msgspec.Struct, frozen=True):
    """Destination settings for persisted execution records.

    Two destination modes exist: path mode (``root``) and catalog mode
    (``database``).  :meth:`validate` checks that exactly one of them is set.

    Args:
        root: Path/URI destination used in path-based table mode.
        database: Database/schema destination used in catalog mode.
        storage_options: Cloud credentials (path mode).
        writer: Writer options (path mode).
        delta_config: Delta table properties (path mode).
        commit: Commit metadata (path mode).
    """

    # Destination selectors; an empty (or whitespace-only) string means "unset".
    root: str = ""
    database: str = ""

    # Path-mode options.  The empty ``{}`` defaults rely on msgspec's support
    # for empty mutable collections as Struct defaults.
    storage_options: dict[str, str] = {}
    writer: dict[str, Any] = {}
    delta_config: dict[str, str | None] = {}
    commit: dict[str, Any] = {}

    def validate(self) -> None:
        """Check that one, and only one, destination mode is configured.

        A destination counts as configured when its value is non-empty after
        stripping surrounding whitespace.

        Raises:
            ValueError: If ``root`` and ``database`` are both set, or if
                neither of them is set.
        """
        configured = [
            field_name
            for field_name in ("root", "database")
            if getattr(self, field_name).strip()
        ]
        # Zero configured destinations and two configured destinations are
        # both rejected with the same message.
        if len(configured) != 1:
            raise ValueError(
                "observability.record_store requires exactly one destination: "
                "'root' (path mode) or 'database' (catalog mode)."
            )
# [docs] (Sphinx viewcode link marker; not part of the module code)
class OtelConfig(msgspec.Struct, frozen=True):
    """Settings for the OpenTelemetry SDK and OTLP exporter.

    Args:
        service_name: Value of the ``service.name`` resource attribute.
        tracer_name: Instrumentation name given to the tracer.
        tracer_version: Optional instrumentation version for the tracer.
        protocol: OTLP protocol, either ``http/protobuf`` or ``grpc``.
        endpoint: OTLP endpoint URI.  When empty, the global OTel runtime
            defaults are used.
        insecure: Exporter transport mode, where the protocol/exporter
            supports it.
        headers: Request headers sent by the exporter (vendor auth/tags).
        resource_attributes: Extra OTel resource attributes.
        span_attributes: Static attributes added to every ETL span.
        exporter_kwargs: Extra keyword arguments forwarded to the OTLP exporter.
        span_processor_kwargs: Extra keyword arguments forwarded to
            ``BatchSpanProcessor``.
    """

    # Tracer / resource identity.
    service_name: str = "loom-etl"
    tracer_name: str = "loom.etl"
    tracer_version: str = ""

    # Exporter transport.
    protocol: str = "http/protobuf"
    endpoint: str = ""
    insecure: bool = True
    headers: dict[str, str] = {}

    # Attributes and pass-through options.
    resource_attributes: dict[str, str] = {}
    span_attributes: dict[str, str] = {}
    exporter_kwargs: dict[str, Any] = {}
    span_processor_kwargs: dict[str, Any] = {}

    def validate(self) -> None:
        """Check that ``protocol`` names a supported OTLP protocol.

        Raises:
            ValueError: If ``protocol`` is neither ``http/protobuf`` nor
                ``grpc``.
        """
        supported_protocols = {"http/protobuf", "grpc"}
        if self.protocol in supported_protocols:
            return
        raise ValueError(
            "observability.otel_config.protocol must be either 'http/protobuf' or 'grpc'."
        )
# [docs] (Sphinx viewcode link marker; not part of the module code)
class ObservabilityConfig(msgspec.Struct, frozen=True):
    """Settings read from the ``observability`` section of the YAML config.

    Args:
        log: Turns on structured runtime logs via
            :class:`StructlogRunObserver`.
        otel: Turns on OpenTelemetry tracing via :class:`OtelRunObserver`.
            Requires the ``etl-otel`` extra.
        otel_config: Optional OTel SDK/exporter settings.  When set, OTel
            tracing is enabled even if ``otel=False``.
        record_store: Turns on persisted execution records via
            :class:`ExecutionRecordsObserver`.
        slow_step_threshold_ms: Optional threshold for slow-step warnings.
    """

    # Simple on/off switches.
    log: bool = True
    otel: bool = False

    # Optional nested sections; ``None`` means the section was not provided.
    otel_config: OtelConfig | None = None
    record_store: ExecutionRecordStoreConfig | None = None

    # Optional slow-step warning threshold; ``None`` when not provided.
    slow_step_threshold_ms: int | None = None
# Public names exported by this module.
__all__ = [
    "ExecutionRecordStoreConfig",
    "ObservabilityConfig",
    "OtelConfig",
]