# Source code for loom.etl.observability.records

"""Execution observability records and lifecycle enums."""

from __future__ import annotations

import dataclasses
from dataclasses import dataclass
from datetime import datetime
from enum import StrEnum
from typing import Any, Final

from loom.etl.schema import ColumnSchema, DatetimeType, LoomDtype


[docs] class RunStatus(StrEnum): """Terminal status of a pipeline, process, or step run.""" SUCCESS = "success" FAILED = "failed"
[docs] class EventName(StrEnum): """All lifecycle event names emitted by the ETL executor.""" PIPELINE_START = "pipeline_start" PIPELINE_END = "pipeline_end" PROCESS_START = "process_start" PROCESS_END = "process_end" STEP_START = "step_start" STEP_END = "step_end" STEP_ERROR = "step_error"
[docs] class RecordField(StrEnum): """Field names for execution records.""" PIPELINE_RUNS = "pipeline_runs" PROCESS_RUNS = "process_runs" STEP_RUNS = "step_runs" EVENT = "event" RUN_ID = "run_id" CORRELATION_ID = "correlation_id" ATTEMPT = "attempt" STARTED_AT = "started_at" STATUS = "status" DURATION_MS = "duration_ms" ERROR = "error" ERROR_TYPE = "error_type" ERROR_MESSAGE = "error_message" FAILED_STEP_RUN_ID = "failed_step_run_id" FAILED_STEP = "failed_step" PIPELINE = "pipeline" PROCESS_RUN_ID = "process_run_id" PROCESS = "process" STEP_RUN_ID = "step_run_id" STEP = "step"
@dataclass(frozen=True)
class RunContext:
    """Immutable context carried by every event of a single pipeline attempt.

    Only ``run_id`` is required; the remaining fields default to a first,
    final attempt with no correlation or process run attached.
    """

    run_id: str
    correlation_id: str | None = None
    attempt: int = 1  # attempt counter; defaults to the first attempt
    last_attempt: bool = True
    process_run_id: str | None = None
@dataclass(frozen=True)
class PipelineRunRecord:
    """Immutable snapshot describing one finished pipeline run.

    Required fields come first; the trailing failure-detail fields default
    to ``None``.
    """

    # Lifecycle event that produced the record (not written to storage rows).
    event: EventName
    run_id: str
    correlation_id: str | None
    attempt: int
    pipeline: str
    started_at: datetime
    status: RunStatus
    duration_ms: int
    error: str | None
    # Optional failure details.
    error_type: str | None = None
    error_message: str | None = None
    failed_step_run_id: str | None = None
    failed_step: str | None = None

    def to_row(self) -> dict[str, Any]:
        """Return this record as a column-name -> value mapping for storage."""
        return _record_to_row(self)
@dataclass(frozen=True)
class ProcessRunRecord:
    """Immutable snapshot describing one finished process run.

    Required fields come first; the trailing failure-detail fields default
    to ``None``.
    """

    # Lifecycle event that produced the record (not written to storage rows).
    event: EventName
    run_id: str
    correlation_id: str | None
    attempt: int
    process_run_id: str
    process: str
    started_at: datetime
    status: RunStatus
    duration_ms: int
    error: str | None
    # Optional failure details.
    error_type: str | None = None
    error_message: str | None = None
    failed_step_run_id: str | None = None
    failed_step: str | None = None

    def to_row(self) -> dict[str, Any]:
        """Return this record as a column-name -> value mapping for storage."""
        return _record_to_row(self)
@dataclass(frozen=True)
class StepRunRecord:
    """Immutable snapshot describing one finished step run.

    Required fields come first; the owning process run and the error details
    are optional and default to ``None``.
    """

    # Lifecycle event that produced the record (not written to storage rows).
    event: EventName
    run_id: str
    correlation_id: str | None
    attempt: int
    step_run_id: str
    step: str
    started_at: datetime
    status: RunStatus
    duration_ms: int
    error: str | None
    # Optional link to the enclosing process run.
    process_run_id: str | None = None
    # Optional failure details.
    error_type: str | None = None
    error_message: str | None = None

    def to_row(self) -> dict[str, Any]:
        """Return this record as a column-name -> value mapping for storage."""
        return _record_to_row(self)
# Union of every concrete execution-record type handled by this module.
ExecutionRecord = PipelineRunRecord | ProcessRunRecord | StepRunRecord


def _record_to_row(record: ExecutionRecord) -> dict[str, Any]:
    """Flatten an execution record into a storage row mapping.

    The ``event`` field is dropped (it has no storage column) and ``status``
    is converted to its plain string value. Every other field is copied as-is,
    in dataclass field order, which matches the column order of the schema
    tuples below.

    Args:
        record: A ``PipelineRunRecord``, ``ProcessRunRecord`` or
            ``StepRunRecord`` instance.

    Returns:
        A new ``dict`` mapping column names to values.
    """
    # Shallow field walk: these records hold only scalars/immutables, so the
    # recursive deep copy performed by ``dataclasses.asdict`` is unnecessary.
    row = {
        field.name: getattr(record, field.name)
        for field in dataclasses.fields(record)
        if field.name != "event"
    }
    row["status"] = str(row["status"])
    return row


_PIPELINE_RECORD_SCHEMA: tuple[ColumnSchema, ...] = (
    ColumnSchema("run_id", LoomDtype.UTF8, nullable=False),
    ColumnSchema("correlation_id", LoomDtype.UTF8, nullable=True),
    ColumnSchema("attempt", LoomDtype.INT64, nullable=False),
    ColumnSchema("pipeline", LoomDtype.UTF8, nullable=False),
    ColumnSchema("started_at", DatetimeType("us", "UTC"), nullable=False),
    ColumnSchema("status", LoomDtype.UTF8, nullable=False),
    ColumnSchema("duration_ms", LoomDtype.INT64, nullable=False),
    ColumnSchema("error", LoomDtype.UTF8, nullable=True),
    ColumnSchema("error_type", LoomDtype.UTF8, nullable=True),
    ColumnSchema("error_message", LoomDtype.UTF8, nullable=True),
    ColumnSchema("failed_step_run_id", LoomDtype.UTF8, nullable=True),
    ColumnSchema("failed_step", LoomDtype.UTF8, nullable=True),
)

_PROCESS_RECORD_SCHEMA: tuple[ColumnSchema, ...] = (
    ColumnSchema("run_id", LoomDtype.UTF8, nullable=False),
    ColumnSchema("correlation_id", LoomDtype.UTF8, nullable=True),
    ColumnSchema("attempt", LoomDtype.INT64, nullable=False),
    ColumnSchema("process_run_id", LoomDtype.UTF8, nullable=False),
    ColumnSchema("process", LoomDtype.UTF8, nullable=False),
    ColumnSchema("started_at", DatetimeType("us", "UTC"), nullable=False),
    ColumnSchema("status", LoomDtype.UTF8, nullable=False),
    ColumnSchema("duration_ms", LoomDtype.INT64, nullable=False),
    ColumnSchema("error", LoomDtype.UTF8, nullable=True),
    ColumnSchema("error_type", LoomDtype.UTF8, nullable=True),
    ColumnSchema("error_message", LoomDtype.UTF8, nullable=True),
    ColumnSchema("failed_step_run_id", LoomDtype.UTF8, nullable=True),
    ColumnSchema("failed_step", LoomDtype.UTF8, nullable=True),
)

_STEP_RECORD_SCHEMA: tuple[ColumnSchema, ...] = (
    ColumnSchema("run_id", LoomDtype.UTF8, nullable=False),
    ColumnSchema("correlation_id", LoomDtype.UTF8, nullable=True),
    ColumnSchema("attempt", LoomDtype.INT64, nullable=False),
    ColumnSchema("step_run_id", LoomDtype.UTF8, nullable=False),
    ColumnSchema("step", LoomDtype.UTF8, nullable=False),
    ColumnSchema("started_at", DatetimeType("us", "UTC"), nullable=False),
    ColumnSchema("status", LoomDtype.UTF8, nullable=False),
    ColumnSchema("duration_ms", LoomDtype.INT64, nullable=False),
    ColumnSchema("error", LoomDtype.UTF8, nullable=True),
    ColumnSchema("process_run_id", LoomDtype.UTF8, nullable=True),
    ColumnSchema("error_type", LoomDtype.UTF8, nullable=True),
    ColumnSchema("error_message", LoomDtype.UTF8, nullable=True),
)

_RECORD_SCHEMA_MAP: Final[dict[type[ExecutionRecord], tuple[ColumnSchema, ...]]] = {
    PipelineRunRecord: _PIPELINE_RECORD_SCHEMA,
    ProcessRunRecord: _PROCESS_RECORD_SCHEMA,
    StepRunRecord: _STEP_RECORD_SCHEMA,
}


def get_record_schema(record_type: type[ExecutionRecord]) -> tuple[ColumnSchema, ...]:
    """Return the canonical :class:`~loom.etl.schema.ColumnSchema` tuple for *record_type*.

    Args:
        record_type: One of ``PipelineRunRecord``, ``ProcessRunRecord``, or
            ``StepRunRecord``.

    Returns:
        Ordered tuple of column schemas that describes the storage
        representation produced by the record's ``to_row()`` method.

    Raises:
        KeyError: If *record_type* has no registered schema.
    """
    try:
        return _RECORD_SCHEMA_MAP[record_type]
    except KeyError:
        raise KeyError(f"No schema registered for record type {record_type!r}") from None


# Storage table name for each record type.
_TABLE_MAP: Final[dict[type[ExecutionRecord], str]] = {
    PipelineRunRecord: RecordField.PIPELINE_RUNS,
    ProcessRunRecord: RecordField.PROCESS_RUNS,
    StepRunRecord: RecordField.STEP_RUNS,
}
def get_record_table_name(record_type: type[ExecutionRecord]) -> str:
    """Return the storage table name for *record_type*.

    Args:
        record_type: One of ``PipelineRunRecord``, ``ProcessRunRecord``, or
            ``StepRunRecord``.

    Returns:
        The table name registered for the record type.

    Raises:
        KeyError: If *record_type* has no registered table name.
    """
    try:
        return _TABLE_MAP[record_type]
    except KeyError:
        raise KeyError(f"No table name registered for record type {record_type!r}") from None
# Public API of this module (sorted case-insensitively).
__all__ = [
    "EventName",
    "ExecutionRecord",
    "get_record_schema",
    "get_record_table_name",
    "PipelineRunRecord",
    "ProcessRunRecord",
    "RecordField",
    "RunContext",
    "RunStatus",
    "StepRunRecord",
]