# Source code for loom.etl.checkpoint._store

"""Checkpoint store for ETL pipeline intermediates.

:class:`CheckpointStore` handles the physical read/write of intermediate
results materialised by :class:`~loom.etl.IntoTemp` targets and consumed by
:class:`~loom.etl.FromTemp` sources.

Physical formats
----------------
The backend is selected at construction time and injected via the *backend*
parameter (a Polars backend is used when it is omitted):

* **Polars** :class:`polars.LazyFrame` — Arrow IPC written via ``sink_ipc()``
  (streaming, no in-memory collect) and read back via ``scan_ipc()`` (lazy,
  memory-mapped, predicate pushdown).
  Backend: :class:`~loom.etl.checkpoint._backends._polars._PolarsCheckpointBackend`.

* **Spark** ``pyspark.sql.DataFrame`` — Parquet directory written via
  ``df.write.parquet()`` and read via ``spark.read.parquet()``.  Cuts the
  lineage DAG; Photon-optimised.  Avoids competing with the shuffle memory
  pool that ``df.cache()`` would enter.
  Backend: :class:`~loom.etl.checkpoint._backends._spark._SparkCheckpointBackend`.

Path structure
--------------
RUN scope::

    {root}/runs/{run_id}/{name}.arrow   ← Polars
    {root}/runs/{run_id}/{name}/        ← Spark (Parquet dir)

CORRELATION scope::

    {root}/correlations/{correlation_id}/{name}.arrow
    {root}/correlations/{correlation_id}/{name}/

Cleanup
-------
* :meth:`cleanup_run` — removes the entire ``runs/{run_id}/`` tree.
  Called in the ``finally`` of every pipeline run.

* :meth:`cleanup_correlation` — removes ``correlations/{correlation_id}/``.
  Called by :class:`~loom.etl.ETLRunner` on successful last attempt, or
  manually via :meth:`~loom.etl.ETLRunner.cleanup_correlation`.
"""

from __future__ import annotations

import logging
from typing import Any, Protocol, runtime_checkable

from loom.etl.checkpoint._backends._polars import _PolarsCheckpointBackend
from loom.etl.checkpoint._cleaners import CheckpointCleaner, TempCleaner
from loom.etl.checkpoint._paths import correlation_scope_base, run_scope_base, scope_base
from loom.etl.checkpoint._scope import CheckpointScope

_log = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Backend Protocol — domain contract
# ---------------------------------------------------------------------------


@runtime_checkable
class _CheckpointBackend(Protocol):
    """Physical I/O contract for one DataFrame backend.

    Both :class:`~loom.etl.checkpoint._backends._polars._PolarsCheckpointBackend` and
    :class:`~loom.etl.checkpoint._backends._spark._SparkCheckpointBackend` satisfy this
    protocol.  ``CheckpointStore`` holds a single ``_CheckpointBackend`` chosen
    at construction time, keeping ``put`` / ``get`` free of backend branches.
    """

    def probe(self, name: str, base: str) -> Any | None:
        """Return the frame stored at (*name*, *base*), or ``None`` if absent."""
        ...

    def write(self, name: str, base: str, data: Any, *, append: bool) -> None:
        """Materialise *data* at (*name*, *base*)."""
        ...


# ---------------------------------------------------------------------------
# CheckpointStore
# ---------------------------------------------------------------------------


class CheckpointStore:
    """Physical store for intermediate ETL results.

    The backend (Polars or Spark) is injected at construction time via the
    *backend* parameter.  ``put`` and ``get`` are then backend-agnostic — no
    runtime type switching.

    Args:
        root: Root cloud URI where intermediates are stored.  Trailing ``/``
            characters are stripped.
        backend: Physical I/O backend.  Defaults to
            ``_PolarsCheckpointBackend({})``.
        cleaner: Cleaner implementation used to delete checkpoint trees.
            Defaults to ``CheckpointCleaner()``.

    Example::

        store = CheckpointStore(
            root="s3://my-bucket/loom-checkpoints",
            backend=_PolarsCheckpointBackend(storage_options={}),
        )
        store.put("orders", run_id="abc", correlation_id=None,
                  scope=CheckpointScope.RUN, data=polars_lazy_frame)
        lf = store.get("orders", run_id="abc", correlation_id=None)
    """

    def __init__(
        self,
        root: str,
        backend: _CheckpointBackend | None = None,
        cleaner: TempCleaner | None = None,
    ) -> None:
        self._root = root.rstrip("/")
        self._cleaner = cleaner if cleaner is not None else CheckpointCleaner()
        self._backend = backend if backend is not None else _PolarsCheckpointBackend({})

    # ------------------------------------------------------------------
    # Public write / read
    # ------------------------------------------------------------------

    def put(
        self,
        name: str,
        *,
        run_id: str,
        correlation_id: str | None,
        scope: CheckpointScope,
        data: Any,
        append: bool = False,
    ) -> None:
        """Materialise *data* as a named intermediate.

        Args:
            name: Logical name matching :class:`~loom.etl.IntoTemp`.
            run_id: UUID of the current pipeline run.
            correlation_id: Logical job ID grouping retries.  Required
                (non-empty) when *scope* is :attr:`~CheckpointScope.CORRELATION`.
            scope: Whether to keep across retries.
            data: ``polars.LazyFrame`` or ``pyspark.sql.DataFrame``.
            append: When ``True``, writes a new partition file alongside any
                previously written parts for this name (fan-in).  When
                ``False`` (default), the write is exclusive — any prior
                content for this name is overwritten.

        Raises:
            ValueError: When *scope* is ``CORRELATION`` and *correlation_id*
                is ``None`` or empty.
            TypeError: When *data* type does not match the configured
                backend.  This method performs no type check itself; any such
                error comes from the backend's ``write``.
        """
        if scope is CheckpointScope.CORRELATION and not correlation_id:
            raise ValueError(
                f"IntoTemp({name!r}, scope=CORRELATION) requires a correlation_id. "
                "Pass correlation_id= to ETLRunner.run()."
            )
        _log.debug("checkpoint put name=%r scope=%s append=%s", name, scope, append)
        base = scope_base(
            self._root,
            run_id=run_id,
            correlation_id=correlation_id,
            scope=scope,
        )
        self._backend.write(name, base, data, append=append)

    def get(
        self,
        name: str,
        *,
        run_id: str,
        correlation_id: str | None,
    ) -> Any:
        """Load a previously materialised intermediate.

        Tries the RUN-scope path first, then falls back to the
        CORRELATION-scope path.  This allows :class:`~loom.etl.FromTemp`
        consumers to be scope-agnostic — they only declare the name.

        Returns whatever the backend's ``probe`` yields: a
        :class:`polars.LazyFrame` for Polars intermediates, or a
        ``pyspark.sql.DataFrame`` for Spark intermediates.

        Args:
            name: Logical name matching :class:`~loom.etl.IntoTemp`.
            run_id: UUID of the pipeline run.
            correlation_id: Logical job ID.  Checked only when the RUN path is
                absent and this value is non-empty.

        Raises:
            FileNotFoundError: When no intermediate exists at either path.
        """
        run_base = run_scope_base(self._root, run_id)
        result = self._backend.probe(name, run_base)
        if result is not None:
            _log.debug("checkpoint get name=%r scope=run", name)
            return result

        # Treat an empty correlation_id exactly like None, matching ``put``,
        # which refuses to write a CORRELATION intermediate without one.
        if correlation_id:
            corr_base = correlation_scope_base(self._root, correlation_id)
            result = self._backend.probe(name, corr_base)
            if result is not None:
                _log.debug("checkpoint get name=%r scope=correlation", name)
                return result

        raise FileNotFoundError(
            f"Intermediate {name!r} not found. "
            f"Check that IntoTemp({name!r}) ran before FromTemp({name!r})."
        )

    # ------------------------------------------------------------------
    # Cleanup
    # ------------------------------------------------------------------

    def cleanup_run(self, run_id: str) -> None:
        """Remove all RUN-scope intermediates for *run_id*.

        Safe to call even when no intermediates were written.  An empty
        *run_id* is ignored with a warning and nothing is deleted.

        Args:
            run_id: UUID of the pipeline run to clean.
        """
        if not run_id:
            # Guard the recursive delete: an empty id may resolve to the parent
            # directory shared by all runs (depends on run_scope_base).  Skip
            # instead of raising, since this is called from a ``finally``.
            _log.warning("checkpoint cleanup run skipped: empty run_id")
            return
        path = run_scope_base(self._root, run_id)
        _log.debug("checkpoint cleanup run path=%s", path)
        self._cleaner.delete_tree(path)

    def cleanup_correlation(self, correlation_id: str) -> None:
        """Remove all CORRELATION-scope intermediates for *correlation_id*.

        Call this after the final successful attempt, or to reclaim space
        after a permanently failed job.  An empty *correlation_id* is ignored
        with a warning and nothing is deleted.

        Args:
            correlation_id: Logical job ID whose intermediates to purge.
        """
        if not correlation_id:
            # Guard the recursive delete: an empty id may resolve to the parent
            # directory shared by all correlations (depends on
            # correlation_scope_base).
            _log.warning("checkpoint cleanup correlation skipped: empty correlation_id")
            return
        path = correlation_scope_base(self._root, correlation_id)
        _log.debug("checkpoint cleanup correlation path=%s", path)
        self._cleaner.delete_tree(path)