loom.etl.observability
Observability public API for ETL runtime hooks and persisted records.
- class loom.etl.observability.CompositeObserver(observers)
Bases: object
Fan out lifecycle events to multiple observers safely.
- Parameters:
observers (Sequence[ETLRunObserver])
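The fan-out behaviour described above can be pictured with a minimal sketch. It assumes that "safely" means one failing observer does not stop the others from being notified; the page does not confirm this. `FanOutObserver` and its `_emit` helper are hypothetical names, not the library's implementation, and only two of the hooks are shown.

```python
import logging
from typing import Any, Sequence

logger = logging.getLogger(__name__)


class FanOutObserver:
    """Hypothetical stand-in for CompositeObserver (not the library's code)."""

    def __init__(self, observers: Sequence[Any]) -> None:
        self._observers = tuple(observers)

    def _emit(self, hook: str, *args: Any) -> None:
        # Call the same hook on every observer. An exception raised by one
        # observer is logged and swallowed so the remaining ones still run
        # (assumed meaning of "safely").
        for observer in self._observers:
            try:
                getattr(observer, hook)(*args)
            except Exception:
                logger.exception("observer %r failed in %s", observer, hook)

    def on_pipeline_start(self, plan: Any, _params: Any, ctx: Any) -> None:
        self._emit("on_pipeline_start", plan, _params, ctx)

    def on_pipeline_end(self, ctx: Any, status: Any, duration_ms: int) -> None:
        self._emit("on_pipeline_end", ctx, status, duration_ms)
```

Under this assumption, a faulty logging or tracing observer cannot abort the pipeline or hide events from the other observers.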
- class loom.etl.observability.ETLRunObserver(*args, **kwargs)
Bases: Protocol
Protocol for ETL lifecycle observability callbacks.
- on_pipeline_start(plan, _params, ctx)
Called before the first process of the pipeline executes.
- Parameters:
plan (PipelinePlan)
_params (Any)
ctx (RunContext)
- Return type:
None
- on_pipeline_end(ctx, status, duration_ms)
Called after pipeline completion or first unhandled error.
- Parameters:
ctx (RunContext)
status (RunStatus)
duration_ms (int)
- Return type:
None
- on_process_start(plan, ctx, process_run_id)
Called before the first step of a process executes.
- Parameters:
plan (ProcessPlan)
ctx (RunContext)
process_run_id (str)
- Return type:
None
- on_process_end(process_run_id, status, duration_ms)
Called after process completion or first unhandled error.
- on_step_start(plan, ctx, step_run_id)
Called before sources are read.
- Parameters:
plan (StepPlan)
ctx (RunContext)
step_run_id (str)
- Return type:
None
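Because ETLRunObserver is a Protocol, a custom observer presumably only needs to provide methods with matching signatures; no inheritance is required. The sketch below uses a local stand-in protocol (`RunObserverLike`) that copies the five hooks listed on this page, so it runs without the library installed. `DurationCollector` is a hypothetical example class.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class RunObserverLike(Protocol):
    """Local stand-in mirroring the hook signatures documented above."""

    def on_pipeline_start(self, plan: Any, _params: Any, ctx: Any) -> None: ...
    def on_pipeline_end(self, ctx: Any, status: Any, duration_ms: int) -> None: ...
    def on_process_start(self, plan: Any, ctx: Any, process_run_id: str) -> None: ...
    def on_process_end(self, process_run_id: str, status: Any, duration_ms: int) -> None: ...
    def on_step_start(self, plan: Any, ctx: Any, step_run_id: str) -> None: ...


class DurationCollector:
    """Hypothetical observer that keeps only end-of-run durations."""

    def __init__(self) -> None:
        self.durations_ms: dict[str, int] = {}

    def on_pipeline_start(self, plan: Any, _params: Any, ctx: Any) -> None:
        pass  # nothing to record at start

    def on_pipeline_end(self, ctx: Any, status: Any, duration_ms: int) -> None:
        self.durations_ms["pipeline"] = duration_ms

    def on_process_start(self, plan: Any, ctx: Any, process_run_id: str) -> None:
        pass

    def on_process_end(self, process_run_id: str, status: Any, duration_ms: int) -> None:
        # Key process durations by the run id passed to the hook.
        self.durations_ms[process_run_id] = duration_ms

    def on_step_start(self, plan: Any, ctx: Any, step_run_id: str) -> None:
        pass
```

With the real library, the same class would be checked against ETLRunObserver rather than the local stand-in.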
- class loom.etl.observability.EventName(value)
Bases: StrEnum
All lifecycle event names emitted by the ETL executor.
- class loom.etl.observability.ExecutionRecordStore(*args, **kwargs)
Bases: Protocol
Protocol for persisting execution records.
- write_record(record)
Persist one completed execution record.
- Parameters:
record (PipelineRunRecord | ProcessRunRecord | StepRunRecord)
- Return type:
None
- class loom.etl.observability.ExecutionRecordWriter(*args, **kwargs)
Bases: Protocol
Persist one execution record into a table destination.
- write_record(record, table_ref, /)
Write record to table_ref.
- Parameters:
record (PipelineRunRecord | ProcessRunRecord | StepRunRecord)
table_ref (TableRef)
- Return type:
None
- class loom.etl.observability.ExecutionRecordStoreConfig(root='', database='', storage_options=<factory>, writer=<factory>, delta_config=<factory>, commit=<factory>)
Bases: Struct
Configuration for persisted execution records.
- Parameters:
- validate()
Validate that exactly one destination mode is configured.
- Raises:
ValueError – If both or neither of root and database are set.
- Return type:
None
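The "exactly one destination mode" rule amounts to an exclusive-or on the two fields. The sketch below is a minimal illustration, assuming an empty string means "not set" (consistent with the `''` defaults in the signature). `RecordStoreConfigSketch` is a hypothetical stand-in that omits the other fields, and the error message is invented for illustration.

```python
from dataclasses import dataclass


@dataclass
class RecordStoreConfigSketch:
    """Hypothetical stand-in carrying only the two destination fields."""

    root: str = ""
    database: str = ""

    def validate(self) -> None:
        # Exactly one of root / database must be non-empty: both set and
        # neither set give equal truth values, which is the error case.
        if bool(self.root) == bool(self.database):
            raise ValueError("set exactly one of 'root' or 'database'")
```

For example, `RecordStoreConfigSketch(database="ops").validate()` passes, while leaving both fields empty or setting both raises ValueError.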
- class loom.etl.observability.ExecutionRecordsObserver(store)
Bases: object
Convert lifecycle callbacks into persisted execution records.
- Parameters:
store (ExecutionRecordStore) – Persistence backend implementing ExecutionRecordStore.
- loom.etl.observability.get_record_table_name(record_type)
Get table name for a given record type.
- Parameters:
record_type (type[PipelineRunRecord | ProcessRunRecord | StepRunRecord])
- Return type:
- class loom.etl.observability.NoopRunObserver
Bases: object
Observer implementation that intentionally does nothing.
- on_pipeline_start(_plan, _params, _ctx)
No-op hook.
- Parameters:
_plan (PipelinePlan)
_params (Any)
_ctx (RunContext)
- Return type:
None
- on_pipeline_end(_ctx, _status, _duration_ms)
No-op hook.
- Parameters:
_ctx (RunContext)
_status (RunStatus)
_duration_ms (int)
- Return type:
None
- on_process_start(_plan, _ctx, _process_run_id)
No-op hook.
- Parameters:
_plan (ProcessPlan)
_ctx (RunContext)
_process_run_id (str)
- Return type:
None
- on_step_start(_plan, _ctx, _step_run_id)
No-op hook.
- Parameters:
_plan (StepPlan)
_ctx (RunContext)
_step_run_id (str)
- Return type:
None
- class loom.etl.observability.ObservabilityConfig(log=True, otel=False, otel_config=None, record_store=None, slow_step_threshold_ms=None)
Bases: Struct
Observability config loaded from the observability YAML section.
- Parameters:
log (bool) – Enables structured runtime logs via StructlogRunObserver.
otel (bool) – Enables OpenTelemetry tracing via OtelRunObserver. Requires the etl-otel extra.
otel_config (OtelConfig | None) – Optional OTel SDK/exporter config. When set, OTel tracing is enabled even if otel=False.
record_store (ExecutionRecordStoreConfig | None) – Enables persisted execution records via ExecutionRecordsObserver.
slow_step_threshold_ms (int | None) – Optional slow-step warning threshold.
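The enablement rules in the parameter descriptions can be summarised as a small decision function. This is a sketch of the documented rules only: `enabled_observers` is a hypothetical helper, the returned order is an assumption, and the real wiring happens in the library (see make_observers).

```python
from typing import Any, Optional


def enabled_observers(
    log: bool = True,
    otel: bool = False,
    otel_config: Optional[Any] = None,
    record_store: Optional[Any] = None,
) -> list[str]:
    """Return the observer class names the config would enable."""
    names: list[str] = []
    if log:
        names.append("StructlogRunObserver")
    # Tracing is on when otel=True, or when an otel_config is supplied
    # even though otel=False.
    if otel or otel_config is not None:
        names.append("OtelRunObserver")
    if record_store is not None:
        names.append("ExecutionRecordsObserver")
    return names
```

Note that supplying `otel_config` alone is enough to turn tracing on, so leaving `otel=False` does not disable it in that case.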
- class loom.etl.observability.OtelConfig(service_name='loom-etl', tracer_name='loom.etl', tracer_version='', protocol='http/protobuf', endpoint='', insecure=True, headers=<factory>, resource_attributes=<factory>, span_attributes=<factory>, exporter_kwargs=<factory>, span_processor_kwargs=<factory>)
Bases: Struct
OpenTelemetry SDK/exporter configuration.
- Parameters:
service_name (str) – Resource attribute service.name.
tracer_name (str) – Tracer instrumentation name.
tracer_version (str) – Optional tracer instrumentation version.
protocol (str) – OTLP protocol (http/protobuf or grpc).
endpoint (str) – OTLP endpoint URI. When empty, uses global OTel runtime defaults.
insecure (bool) – Exporter transport mode when supported by protocol/exporter.
headers (dict[str, str]) – Exporter request headers (vendor auth/tags).
resource_attributes (dict[str, str]) – Additional OTel resource attributes.
span_attributes (dict[str, str]) – Static span attributes added to all ETL spans.
exporter_kwargs (dict[str, Any]) – Extra keyword args passed through to OTLP exporter.
span_processor_kwargs (dict[str, Any]) – Extra keyword args passed through to BatchSpanProcessor.
- validate()
Validate protocol field.
- Raises:
ValueError – If protocol is not supported.
- Return type:
None
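A minimal sketch of the protocol check, assuming the supported set is exactly the two values named in the parameter list (http/protobuf and grpc). `validate_protocol` and `SUPPORTED_PROTOCOLS` are hypothetical names, and the error message is invented for illustration.

```python
SUPPORTED_PROTOCOLS = ("http/protobuf", "grpc")


def validate_protocol(protocol: str) -> None:
    """Raise ValueError unless protocol is one of the supported OTLP values."""
    if protocol not in SUPPORTED_PROTOCOLS:
        raise ValueError(
            f"unsupported OTLP protocol {protocol!r}; "
            f"expected one of {SUPPORTED_PROTOCOLS}"
        )
```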
- class loom.etl.observability.PipelineRunRecord(event, run_id, correlation_id, attempt, pipeline, started_at, status, duration_ms, error, error_type=None, error_message=None, failed_step_run_id=None, failed_step=None)
Bases: object
Snapshot of a completed pipeline run.
- Parameters:
- class loom.etl.observability.ProcessRunRecord(event, run_id, correlation_id, attempt, process_run_id, process, started_at, status, duration_ms, error, error_type=None, error_message=None, failed_step_run_id=None, failed_step=None)
Bases: object
Snapshot of a completed process run.
- Parameters:
- class loom.etl.observability.RecordField(value)
Bases: StrEnum
Field names for execution records.
- class loom.etl.observability.RecordFrameTargetWriter(*args, **kwargs)
Bases: Protocol
Target-writer capability required to persist execution records.
- to_frame(records, /)
Convert execution records into backend frame type.
- Parameters:
records (Sequence[PipelineRunRecord | ProcessRunRecord | StepRunRecord])
- Return type:
- class loom.etl.observability.RunContext(run_id, correlation_id=None, attempt=1, last_attempt=True, process_run_id=None)
Bases: object
Execution context shared across all events of a pipeline attempt.
- class loom.etl.observability.RunStatus(value)
Bases: StrEnum
Terminal status of a pipeline, process, or step run.
- class loom.etl.observability.StepRunRecord(event, run_id, correlation_id, attempt, step_run_id, step, started_at, status, duration_ms, error, process_run_id=None, error_type=None, error_message=None)
Bases: object
Snapshot of a completed step run.
- Parameters:
- class loom.etl.observability.StructlogRunObserver(slow_step_threshold_ms=None)
Bases: object
Observer that emits structured lifecycle events through structlog.
- Parameters:
slow_step_threshold_ms (int | None) – Optional threshold to emit slow_step warnings.
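The optional threshold can be read as a simple predicate. The sketch assumes that None disables the check and that the comparison is strictly greater-than; neither detail is stated on this page. `is_slow_step` is a hypothetical helper name.

```python
from typing import Optional


def is_slow_step(duration_ms: int, threshold_ms: Optional[int]) -> bool:
    """Decide whether a step duration should trigger a slow_step warning."""
    # A threshold of None means the feature is off (assumption).
    if threshold_ms is None:
        return False
    # Strict comparison is an assumption; the library may use >=.
    return duration_ms > threshold_ms
```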
- class loom.etl.observability.TableExecutionRecordStore(writer, *, database='')
Bases: object
Persist execution records into backend tables.
- Parameters:
writer (ExecutionRecordWriter) – Backend-aware execution record writer.
database (str) – Optional database/schema prefix for table names.
- write_record(record)
Append record to the corresponding table.
- Parameters:
record (PipelineRunRecord | ProcessRunRecord | StepRunRecord)
- Return type:
None
- class loom.etl.observability.TargetExecutionRecordWriter(writer)
Bases: object
Persist execution records using target writer to_frame + append.
- Parameters:
writer (RecordFrameTargetWriter) – Backend target writer exposing to_frame and append.
- write_record(record, table_ref, /)
Append one execution record row into table_ref.
- Parameters:
record (PipelineRunRecord | ProcessRunRecord | StepRunRecord)
table_ref (TableRef)
- Return type:
None
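The to_frame + append flow can be sketched with an in-memory backend. Everything here is a stand-in: `InMemoryTargetWriter` models a frame as a list of dicts, the `append(frame, table_ref)` signature is an assumption (only `to_frame(records, /)` is documented on this page), and table references are plain strings rather than TableRef objects.

```python
from typing import Any, Mapping, Sequence


class InMemoryTargetWriter:
    """Hypothetical backend: frames are lists of dicts, tables live in a dict."""

    def __init__(self) -> None:
        self.tables: dict[str, list[dict[str, Any]]] = {}

    def to_frame(self, records: Sequence[Mapping[str, Any]], /) -> list[dict[str, Any]]:
        # Convert records into the backend "frame" type (here: list of dicts).
        return [dict(record) for record in records]

    def append(self, frame: list[dict[str, Any]], table_ref: str) -> None:
        # Assumed signature; appends rows without replacing existing ones.
        self.tables.setdefault(table_ref, []).extend(frame)


class RecordWriterSketch:
    """Hypothetical stand-in for TargetExecutionRecordWriter."""

    def __init__(self, writer: InMemoryTargetWriter) -> None:
        self._writer = writer

    def write_record(self, record: Mapping[str, Any], table_ref: str, /) -> None:
        # One record becomes a one-row frame, which is then appended.
        frame = self._writer.to_frame([record])
        self._writer.append(frame, table_ref)
```

Keeping frame conversion on the backend writer means the record writer itself needs no knowledge of the frame type.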
- loom.etl.observability.make_observers(config, *, record_writer=None)
Build runtime observers from observability config and optional record writer.
- Parameters:
config (ObservabilityConfig)
record_writer (ExecutionRecordWriter | None)
- Return type: