Source code for loom.etl.observability.sinks._table
"""Table-based execution record store using backend writer adapters."""
from __future__ import annotations
from loom.etl.declarative.expr._refs import TableRef
from loom.etl.observability.records import (
ExecutionRecord,
get_record_table_name,
)
from loom.etl.observability.sinks._protocol import ExecutionRecordWriter
[docs]
class TableExecutionRecordStore:
"""Persist execution records into backend tables.
Args:
writer: Backend-aware execution record writer.
database: Optional database/schema prefix for table names.
"""
def __init__(self, writer: ExecutionRecordWriter, *, database: str = "") -> None:
self._writer = writer
self._database = database.strip()
[docs]
def write_record(self, record: ExecutionRecord) -> None:
"""Append *record* to the corresponding table."""
try:
table_name = get_record_table_name(type(record))
except KeyError as exc:
raise TypeError(
f"TableExecutionRecordStore: unrecognised record type {type(record)!r}"
) from exc
self._writer.write_record(record, _table_ref(self._database, table_name))
def _table_ref(database: str, table_name: str) -> TableRef:
if database:
return TableRef(f"{database}.{table_name}")
return TableRef(table_name)
__all__ = ["TableExecutionRecordStore"]