loom.etl.observability

Observability public API for ETL runtime hooks and persisted records.

class loom.etl.observability.CompositeObserver(observers)[source]

Bases: object

Fan out lifecycle events to multiple observers safely.

Parameters:

observers (Sequence[ETLRunObserver])
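
The fan-out idea can be sketched as follows. This is an illustration, not the library's implementation: it assumes "safely" means an exception raised by one observer is caught so the remaining observers still receive the event. `FanOutObserver`, `Recorder`, `Broken`, and the status string "success" are hypothetical stand-ins, and only one hook is shown.

```python
from typing import Any, Sequence


class Recorder:
    """Hypothetical observer that remembers the events it saw."""

    def __init__(self) -> None:
        self.events: list[tuple[str, str, int]] = []

    def on_step_end(self, step_run_id: str, status: str, duration_ms: int) -> None:
        self.events.append((step_run_id, status, duration_ms))


class Broken:
    """Hypothetical observer whose hook always raises."""

    def on_step_end(self, step_run_id: str, status: str, duration_ms: int) -> None:
        raise RuntimeError("observer failure")


class FanOutObserver:
    """Stand-in for the fan-out pattern; not the real CompositeObserver."""

    def __init__(self, observers: Sequence[Any]) -> None:
        self._observers = list(observers)
        self.failures: list[Exception] = []

    def on_step_end(self, step_run_id: str, status: str, duration_ms: int) -> None:
        for observer in self._observers:
            try:
                observer.on_step_end(step_run_id, status, duration_ms)
            except Exception as exc:  # assumption: observer errors are isolated
                self.failures.append(exc)


recorder = Recorder()
composite = FanOutObserver([Broken(), recorder])
composite.on_step_end("step-1", "success", 12)
```

In this sketch the failing observer does not prevent `recorder` from receiving the event; whether the real class logs, collects, or re-raises such errors is not stated in this reference.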

class loom.etl.observability.ETLRunObserver(*args, **kwargs)[source]

Bases: Protocol

Protocol for ETL lifecycle observability callbacks.

on_pipeline_start(plan, _params, ctx)[source]

Called before the first process of the pipeline executes.

Return type:

None

on_pipeline_end(ctx, status, duration_ms)[source]

Called after pipeline completion or first unhandled error.

Return type:

None

on_process_start(plan, ctx, process_run_id)[source]

Called before the first step of a process executes.

Return type:

None

on_process_end(process_run_id, status, duration_ms)[source]

Called after process completion or first unhandled error.

Return type:

None

on_step_start(plan, ctx, step_run_id)[source]

Called before sources are read.

Return type:

None

on_step_end(step_run_id, status, duration_ms)[source]

Called after write completes or on failure.

Return type:

None

on_step_error(step_run_id, exc)[source]

Called before on_step_end when a step fails.

Return type:

None
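
A custom observer only needs to provide the seven hook methods listed above. The sketch below is hedged: the argument types are not given in this reference, so `Any` is used, and `CallLogObserver`, `run_failing_step`, and the status string "failed" are hypothetical. The driver mimics the documented ordering for a failed step, where on_step_error is called before on_step_end.

```python
from typing import Any


class CallLogObserver:
    """Hypothetical observer exposing the seven documented hook names."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def on_pipeline_start(self, plan: Any, _params: Any, ctx: Any) -> None:
        self.calls.append("pipeline_start")

    def on_pipeline_end(self, ctx: Any, status: Any, duration_ms: int) -> None:
        self.calls.append(f"pipeline_end:{status}")

    def on_process_start(self, plan: Any, ctx: Any, process_run_id: str) -> None:
        self.calls.append(f"process_start:{process_run_id}")

    def on_process_end(self, process_run_id: str, status: Any, duration_ms: int) -> None:
        self.calls.append(f"process_end:{process_run_id}:{status}")

    def on_step_start(self, plan: Any, ctx: Any, step_run_id: str) -> None:
        self.calls.append(f"step_start:{step_run_id}")

    def on_step_end(self, step_run_id: str, status: Any, duration_ms: int) -> None:
        self.calls.append(f"step_end:{step_run_id}:{status}")

    def on_step_error(self, step_run_id: str, exc: Exception) -> None:
        self.calls.append(f"step_error:{step_run_id}:{type(exc).__name__}")


def run_failing_step(observer: CallLogObserver, step_run_id: str) -> None:
    """Hypothetical driver that mimics the documented order for a failed step."""
    observer.on_step_start(None, None, step_run_id)
    try:
        raise ValueError("bad input")  # simulated step failure
    except ValueError as exc:
        observer.on_step_error(step_run_id, exc)  # error hook fires first
        observer.on_step_end(step_run_id, "failed", 0)  # then the end hook


observer = CallLogObserver()
run_failing_step(observer, "s1")
```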

class loom.etl.observability.EventName(value)[source]

Bases: StrEnum

All lifecycle event names emitted by the ETL executor.

class loom.etl.observability.ExecutionRecordStore(*args, **kwargs)[source]

Bases: Protocol

Protocol for persisting execution records.

write_record(record)[source]

Persist one completed execution record.

Parameters:

record (PipelineRunRecord | ProcessRunRecord | StepRunRecord)

Return type:

None
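
Because the store is a Protocol, any object with a matching write_record method should satisfy it structurally. A minimal sketch, assuming nothing beyond the single documented method: `RecordStoreLike` is a local stand-in for the real protocol and `InMemoryRecordStore` is a hypothetical test double.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class RecordStoreLike(Protocol):
    """Local stand-in mirroring the documented single-method protocol."""

    def write_record(self, record: Any) -> None: ...


class InMemoryRecordStore:
    """Hypothetical store that keeps records in a list, e.g. for tests."""

    def __init__(self) -> None:
        self.records: list[Any] = []

    def write_record(self, record: Any) -> None:
        self.records.append(record)


store = InMemoryRecordStore()
store.write_record({"run_id": "r1"})  # a plain dict stands in for a record
```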

class loom.etl.observability.ExecutionRecordWriter(*args, **kwargs)[source]

Bases: Protocol

Persist one execution record into a table destination.

write_record(record, table_ref, /)[source]

Write record to table_ref.

Return type:

None

class loom.etl.observability.ExecutionRecordStoreConfig(root='', database='', storage_options=<factory>, writer=<factory>, delta_config=<factory>, commit=<factory>)[source]

Bases: Struct

Configuration for persisted execution records.

Parameters:
  • root (str) – Path/URI destination for path-based table mode.

  • database (str) – Database/schema destination for catalog mode.

  • storage_options (dict[str, str]) – Cloud credentials for path mode.

  • writer (dict[str, Any]) – Writer options for path mode.

  • delta_config (dict[str, str | None]) – Delta table properties for path mode.

  • commit (dict[str, Any]) – Commit metadata for path mode.

validate()[source]

Validate that exactly one destination mode is configured.

Raises:

ValueError – If both or neither of root and database are set.

Return type:

None
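
The "exactly one destination mode" rule can be sketched as below. This is an assumption-laden illustration: it treats a non-empty string as "set", uses a plain dataclass instead of the real Struct base, and `StoreConfigSketch` and the example path are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class StoreConfigSketch:
    """Stand-in carrying only the two destination fields."""

    root: str = ""
    database: str = ""

    def validate(self) -> None:
        has_root = bool(self.root)
        has_database = bool(self.database)
        # Both set or both empty: neither case selects a single mode.
        if has_root == has_database:
            raise ValueError("set exactly one of 'root' or 'database'")


def is_valid(config: StoreConfigSketch) -> bool:
    """Helper for the usage lines below."""
    try:
        config.validate()
    except ValueError:
        return False
    return True


path_mode = is_valid(StoreConfigSketch(root="s3://bucket/records"))
catalog_mode = is_valid(StoreConfigSketch(database="ops"))
both = is_valid(StoreConfigSketch(root="s3://bucket/records", database="ops"))
neither = is_valid(StoreConfigSketch())
```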

class loom.etl.observability.ExecutionRecordsObserver(store)[source]

Bases: object

Convert lifecycle callbacks into persisted execution records.

Parameters:

store (ExecutionRecordStore) – Persistence backend implementing ExecutionRecordStore.
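
One way to picture the conversion from callbacks to records, shown for steps only. This is a sketch under stated assumptions, not the library's logic: it assumes the start hook stashes a timestamp and the end hook writes the record. `ListStore`, `StepRecordsSketch`, the row keys, and the status string are hypothetical.

```python
from datetime import datetime, timezone
from typing import Any


class ListStore:
    """Hypothetical store exposing the documented write_record method."""

    def __init__(self) -> None:
        self.records: list[dict[str, Any]] = []

    def write_record(self, record: dict[str, Any]) -> None:
        self.records.append(record)


class StepRecordsSketch:
    """Stand-in that turns step callbacks into one stored row per step."""

    def __init__(self, store: ListStore) -> None:
        self._store = store
        self._started_at: dict[str, datetime] = {}

    def on_step_start(self, plan: Any, ctx: Any, step_run_id: str) -> None:
        # Remember when the step began so the end hook can build a full row.
        self._started_at[step_run_id] = datetime.now(timezone.utc)

    def on_step_end(self, step_run_id: str, status: str, duration_ms: int) -> None:
        self._store.write_record(
            {
                "step_run_id": step_run_id,
                "started_at": self._started_at.pop(step_run_id),
                "status": status,
                "duration_ms": duration_ms,
            }
        )


store = ListStore()
observer = StepRecordsSketch(store)
observer.on_step_start(None, None, "s1")
observer.on_step_end("s1", "success", 15)
```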

loom.etl.observability.get_record_table_name(record_type)[source]

Return the table name for a given record type.

Parameters:

record_type (type[PipelineRunRecord | ProcessRunRecord | StepRunRecord])

Return type:

str

class loom.etl.observability.NoopRunObserver[source]

Bases: object

Observer implementation that intentionally does nothing.

on_pipeline_start(_plan, _params, _ctx)[source]

No-op hook.

Return type:

None

on_pipeline_end(_ctx, _status, _duration_ms)[source]

No-op hook.

Return type:

None

on_process_start(_plan, _ctx, _process_run_id)[source]

No-op hook.

Return type:

None

on_process_end(_process_run_id, _status, _duration_ms)[source]

No-op hook.

Return type:

None

on_step_start(_plan, _ctx, _step_run_id)[source]

No-op hook.

Return type:

None

on_step_end(_step_run_id, _status, _duration_ms)[source]

No-op hook.

Return type:

None

on_step_error(_step_run_id, _exc)[source]

No-op hook.

Return type:

None

class loom.etl.observability.ObservabilityConfig(log=True, otel=False, otel_config=None, record_store=None, slow_step_threshold_ms=None)[source]

Bases: Struct

Observability config loaded from the observability YAML section.

Parameters:
  • log (bool) – Enables structured runtime logs via StructlogRunObserver.

  • otel (bool) – Enables OpenTelemetry tracing via OtelRunObserver. Requires the etl-otel extra.

  • otel_config (OtelConfig | None) – Optional OTel SDK/exporter config. When set, OTel tracing is enabled even if otel=False.

  • record_store (ExecutionRecordStoreConfig | None) – Enables persisted execution records via ExecutionRecordsObserver.

  • slow_step_threshold_ms (int | None) – Optional slow-step warning threshold.
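
The parameter descriptions above imply which observers a given config turns on. The sketch below restates that reading as a small function; `enabled_observer_names` is hypothetical, the returned order is an assumption, and the real factory may apply further rules not described here.

```python
from typing import Any, Optional


def enabled_observer_names(
    log: bool = True,
    otel: bool = False,
    otel_config: Optional[Any] = None,
    record_store: Optional[Any] = None,
) -> list[str]:
    """Return the observer class names implied by the documented flags."""
    names: list[str] = []
    if log:
        names.append("StructlogRunObserver")
    # Documented rule: a non-None otel_config enables tracing even if otel=False.
    if otel or otel_config is not None:
        names.append("OtelRunObserver")
    if record_store is not None:
        names.append("ExecutionRecordsObserver")
    return names


defaults = enabled_observer_names()
config_only = enabled_observer_names(log=False, otel_config={"endpoint": ""})
with_records = enabled_observer_names(record_store={"database": "ops"})
```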

class loom.etl.observability.OtelConfig(service_name='loom-etl', tracer_name='loom.etl', tracer_version='', protocol='http/protobuf', endpoint='', insecure=True, headers=<factory>, resource_attributes=<factory>, span_attributes=<factory>, exporter_kwargs=<factory>, span_processor_kwargs=<factory>)[source]

Bases: Struct

OpenTelemetry SDK/exporter configuration.

Parameters:
  • service_name (str) – Resource attribute service.name.

  • tracer_name (str) – Tracer instrumentation name.

  • tracer_version (str) – Optional tracer instrumentation version.

  • protocol (str) – OTLP protocol (http/protobuf or grpc).

  • endpoint (str) – OTLP endpoint URI. When empty, uses global OTel runtime defaults.

  • insecure (bool) – Exporter transport mode when supported by protocol/exporter.

  • headers (dict[str, str]) – Exporter request headers (vendor auth/tags).

  • resource_attributes (dict[str, str]) – Additional OTel resource attributes.

  • span_attributes (dict[str, str]) – Static span attributes added to all ETL spans.

  • exporter_kwargs (dict[str, Any]) – Extra keyword args passed through to OTLP exporter.

  • span_processor_kwargs (dict[str, Any]) – Extra keyword args passed through to BatchSpanProcessor.

validate()[source]

Validate protocol field.

Raises:

ValueError – If protocol is not supported.

Return type:

None
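
A minimal sketch of the protocol check, assuming the two values named in the protocol description (http/protobuf and grpc) are the complete supported set. `validate_protocol` and `SUPPORTED_PROTOCOLS` are hypothetical names.

```python
SUPPORTED_PROTOCOLS = ("http/protobuf", "grpc")  # assumption: the full set


def validate_protocol(protocol: str) -> None:
    """Raise ValueError for any protocol outside the assumed supported set."""
    if protocol not in SUPPORTED_PROTOCOLS:
        raise ValueError(
            f"unsupported OTLP protocol {protocol!r}; "
            f"expected one of {SUPPORTED_PROTOCOLS}"
        )


validate_protocol("grpc")  # accepted, returns None

try:
    validate_protocol("http/json")
    rejected = False
except ValueError:
    rejected = True
```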

class loom.etl.observability.PipelineRunRecord(event, run_id, correlation_id, attempt, pipeline, started_at, status, duration_ms, error, error_type=None, error_message=None, failed_step_run_id=None, failed_step=None)[source]

Bases: object

Snapshot of a completed pipeline run.

to_row()[source]

Convert record into a storage row mapping.

Return type:

dict[str, Any]

class loom.etl.observability.ProcessRunRecord(event, run_id, correlation_id, attempt, process_run_id, process, started_at, status, duration_ms, error, error_type=None, error_message=None, failed_step_run_id=None, failed_step=None)[source]

Bases: object

Snapshot of a completed process run.

Parameters:
  • event (EventName)

  • run_id (str)

  • correlation_id (str | None)

  • attempt (int)

  • process_run_id (str)

  • process (str)

  • started_at (datetime)

  • status (RunStatus)

  • duration_ms (int)

  • error (str | None)

  • error_type (str | None)

  • error_message (str | None)

  • failed_step_run_id (str | None)

  • failed_step (str | None)

to_row()[source]

Convert record into a storage row mapping.

Return type:

dict[str, Any]
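
To make the record shape concrete, here is a stand-in with the same field names and defaults as the signature above. It is a sketch under assumptions: plain strings replace EventName and RunStatus because their members are not listed here, the row keys are assumed to equal the field names, and the example values are invented for illustration.

```python
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Any, Optional


@dataclass(frozen=True)
class ProcessRunRecordSketch:
    """Stand-in mirroring the documented ProcessRunRecord fields."""

    event: str  # EventName in the real class
    run_id: str
    correlation_id: Optional[str]
    attempt: int
    process_run_id: str
    process: str
    started_at: datetime
    status: str  # RunStatus in the real class
    duration_ms: int
    error: Optional[str]
    error_type: Optional[str] = None
    error_message: Optional[str] = None
    failed_step_run_id: Optional[str] = None
    failed_step: Optional[str] = None

    def to_row(self) -> dict[str, Any]:
        # Assumption: one row key per field, values passed through unchanged.
        return asdict(self)


record = ProcessRunRecordSketch(
    event="process_end",  # hypothetical event name
    run_id="run-42",
    correlation_id=None,
    attempt=1,
    process_run_id="proc-1",
    process="load_orders",
    started_at=datetime(2024, 1, 1, tzinfo=timezone.utc),
    status="success",  # hypothetical status value
    duration_ms=250,
    error=None,
)
row = record.to_row()
```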

class loom.etl.observability.RecordField(value)[source]

Bases: StrEnum

Field names for execution records.

class loom.etl.observability.RecordFrameTargetWriter(*args, **kwargs)[source]

Bases: Protocol

Target-writer capability required to persist execution records.

to_frame(records, /)[source]

Convert execution records into backend frame type.

Parameters:

records (Sequence[PipelineRunRecord | ProcessRunRecord | StepRunRecord])

Return type:

Any

append(frame, table_ref, params_instance, /, *, streaming=False)[source]

Append backend frame into destination table.

Return type:

None

class loom.etl.observability.RunContext(run_id, correlation_id=None, attempt=1, last_attempt=True, process_run_id=None)[source]

Bases: object

Execution context shared across all events of a pipeline attempt.

Parameters:
  • run_id (str)

  • correlation_id (str | None)

  • attempt (int)

  • last_attempt (bool)

  • process_run_id (str | None)
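
A sketch of how such a context might be carried through a run, using the documented field names and defaults. Immutability and deriving a process-scoped copy with `dataclasses.replace` are assumptions made for illustration; `RunContextSketch` and the example ids are hypothetical.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class RunContextSketch:
    """Stand-in with the documented RunContext fields and defaults."""

    run_id: str
    correlation_id: Optional[str] = None
    attempt: int = 1
    last_attempt: bool = True
    process_run_id: Optional[str] = None


pipeline_ctx = RunContextSketch(run_id="run-42", correlation_id="req-7")
# Derive a copy for one process; the pipeline-level context is left unchanged.
process_ctx = replace(pipeline_ctx, process_run_id="proc-1")
```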

class loom.etl.observability.RunStatus(value)[source]

Bases: StrEnum

Terminal status of a pipeline, process, or step run.

class loom.etl.observability.StepRunRecord(event, run_id, correlation_id, attempt, step_run_id, step, started_at, status, duration_ms, error, process_run_id=None, error_type=None, error_message=None)[source]

Bases: object

Snapshot of a completed step run.

to_row()[source]

Convert record into a storage row mapping.

Return type:

dict[str, Any]

class loom.etl.observability.StructlogRunObserver(slow_step_threshold_ms=None)[source]

Bases: object

Observer that emits structured lifecycle events through structlog.

Parameters:

slow_step_threshold_ms (int | None) – Optional threshold to emit slow_step warnings.
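
The threshold check might look like the sketch below. Two assumptions are made: a value of None disables the warning, and a step counts as slow only when its duration strictly exceeds the threshold. `is_slow_step` is a hypothetical helper, and no structlog call is shown.

```python
from typing import Optional


def is_slow_step(duration_ms: int, slow_step_threshold_ms: Optional[int]) -> bool:
    """Return True when a slow_step warning would be warranted."""
    if slow_step_threshold_ms is None:
        return False  # assumption: no threshold means no warning
    return duration_ms > slow_step_threshold_ms  # assumption: strict comparison


disabled = is_slow_step(10_000, None)
under = is_slow_step(400, 500)
over = is_slow_step(750, 500)
```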

class loom.etl.observability.TableExecutionRecordStore(writer, *, database='')[source]

Bases: object

Persist execution records into backend tables.

Parameters:
  • writer (ExecutionRecordWriter) – Backend-aware execution record writer.

  • database (str) – Optional database/schema prefix for table names.

write_record(record)[source]

Append record to the corresponding table.

Parameters:

record (PipelineRunRecord | ProcessRunRecord | StepRunRecord)

Return type:

None

class loom.etl.observability.TargetExecutionRecordWriter(writer)[source]

Bases: object

Persist execution records using the target writer's to_frame and append methods.

Parameters:

writer (RecordFrameTargetWriter) – Backend target writer exposing to_frame and append.

write_record(record, table_ref, /)[source]

Append one execution record row into table_ref.

Return type:

None
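
The layering described in this module (store, then record writer, then target writer) can be sketched end to end. Everything below is a stand-in under stated assumptions: the table name "step_runs", the dot-joined database prefix, a plain string as table_ref, and None for params_instance are all hypothetical, and the "frame" is simply a list of row dicts.

```python
from typing import Any, Sequence


class StepRow:
    """Hypothetical record stand-in with a to_row method."""

    def __init__(self, step_run_id: str, status: str) -> None:
        self.step_run_id = step_run_id
        self.status = status

    def to_row(self) -> dict[str, Any]:
        return {"step_run_id": self.step_run_id, "status": self.status}


class ListTargetWriter:
    """Hypothetical backend exposing to_frame and append."""

    def __init__(self) -> None:
        self.tables: dict[str, list[dict[str, Any]]] = {}

    def to_frame(self, records: Sequence[StepRow], /) -> list[dict[str, Any]]:
        # The "frame" is just a list of row dicts in this sketch.
        return [record.to_row() for record in records]

    def append(self, frame: list[dict[str, Any]], table_ref: str,
               params_instance: Any, /, *, streaming: bool = False) -> None:
        self.tables.setdefault(table_ref, []).extend(frame)


class RecordWriterSketch:
    """Stand-in for the writer layer: one record becomes a one-row frame."""

    def __init__(self, writer: ListTargetWriter) -> None:
        self._writer = writer

    def write_record(self, record: StepRow, table_ref: str, /) -> None:
        frame = self._writer.to_frame([record])
        self._writer.append(frame, table_ref, None)  # params_instance unknown here


class TableStoreSketch:
    """Stand-in for the store layer: resolves the table, then delegates."""

    TABLE_NAMES = {StepRow: "step_runs"}  # hypothetical table name

    def __init__(self, writer: RecordWriterSketch, *, database: str = "") -> None:
        self._writer = writer
        self._database = database

    def write_record(self, record: StepRow) -> None:
        name = self.TABLE_NAMES[type(record)]
        # Assumption: the database prefix is joined with a dot.
        table_ref = f"{self._database}.{name}" if self._database else name
        self._writer.write_record(record, table_ref)


backend = ListTargetWriter()
store = TableStoreSketch(RecordWriterSketch(backend), database="ops")
store.write_record(StepRow("s1", "success"))
```

Splitting the work this way keeps table naming in the store and frame construction in the backend, so either side can be swapped without touching the other.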

loom.etl.observability.make_observers(config, *, record_writer=None)[source]

Build runtime observers from observability config and optional record writer.

Return type:

list[ETLRunObserver]