loom.etl.checkpoint

Checkpoint storage API for ETL pipeline intermediates.

loom.etl.checkpoint.CheckpointCleaner

alias of FsspecTempCleaner

class loom.etl.checkpoint.CheckpointScope(value)

Bases: StrEnum

Lifetime scope of a checkpoint (intermediate) result.

Declared on IntoTemp to control when the materialised intermediate is cleaned up by the executor.

Values:

  • RUN: cleaned in the finally block of every pipeline run, regardless of success or failure. This is the safe default for ephemeral checkpoints within a single attempt.

  • CORRELATION: survives a failed run so that the next retry attempt can reuse already-materialised intermediates. It is cleaned automatically on success when last_attempt=True in loom.etl.observability.RunContext. On failure the executor emits a structured warning instead, and the caller must invoke CheckpointStore.cleanup_correlation() explicitly after the final failed attempt.

Example:

target = IntoTemp("normalized_orders", scope=CheckpointScope.CORRELATION)
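The cleanup rules for the two scopes can be summarised as a single decision function. This is a minimal sketch, not the executor's code: `Scope` is a local stand-in for CheckpointScope whose member values are assumptions (it uses `(str, Enum)` so it also runs before Python 3.11), `should_clean` is a hypothetical helper, and the CORRELATION rule is read literally from the description above.

```python
from enum import Enum


class Scope(str, Enum):
    """Stand-in for CheckpointScope; the member values here are assumptions."""

    RUN = "run"
    CORRELATION = "correlation"


def should_clean(scope: Scope, *, succeeded: bool, last_attempt: bool) -> bool:
    """Would the executor delete an intermediate of this scope when a run ends?"""
    if scope is Scope.RUN:
        # RUN: removed in the executor's finally block, success or failure.
        return True
    # CORRELATION: kept after a failure so a retry can reuse it; removed
    # automatically only on success with last_attempt=True.
    return succeeded and last_attempt


# A failed attempt keeps CORRELATION data but always drops RUN data.
print(should_clean(Scope.CORRELATION, succeeded=False, last_attempt=False))  # False
print(should_clean(Scope.RUN, succeeded=False, last_attempt=False))  # True
```

A final failed attempt (succeeded=False, last_attempt=True) also yields False here: that is exactly the case in which the caller has to purge the data with cleanup_correlation().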

class loom.etl.checkpoint.CheckpointStore(root, backend=None, cleaner=None)

Bases: object

Physical store for intermediate ETL results.

The backend (Polars or Spark) is injected at construction time via the backend parameter. put and get are then backend-agnostic: neither branches on the runtime type of the data to pick a backend.
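Constructor injection is what removes the type switching: the store holds one backend object and simply delegates to it. A minimal sketch of that pattern, assuming a hypothetical two-method backend interface (`write`/`read`) and a toy dict-based backend; the real `_CheckpointBackend` is private and its methods are not documented here.

```python
from typing import Any, Dict, Protocol


class Backend(Protocol):
    """Hypothetical backend interface; not the real _CheckpointBackend."""

    def write(self, path: str, data: Any) -> None: ...

    def read(self, path: str) -> Any: ...


class DictBackend:
    """Toy backend that keeps 'files' in a dict instead of cloud storage."""

    def __init__(self) -> None:
        self.files: Dict[str, Any] = {}

    def write(self, path: str, data: Any) -> None:
        self.files[path] = data

    def read(self, path: str) -> Any:
        return self.files[path]


class MiniStore:
    """Backend-agnostic store: put/get never inspect the type of the data."""

    def __init__(self, root: str, backend: Backend) -> None:
        self.root = root.rstrip("/")
        self.backend = backend  # chosen once, at construction time

    def put(self, name: str, data: Any) -> None:
        self.backend.write(f"{self.root}/{name}", data)

    def get(self, name: str) -> Any:
        return self.backend.read(f"{self.root}/{name}")


store = MiniStore("mem://checkpoints", DictBackend())
store.put("orders", [1, 2, 3])
print(store.get("orders"))  # [1, 2, 3]
```

Swapping engines then means passing a different backend object, with no change to the store's own code.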

Parameters:
  • root (str) – Root cloud URI where intermediates are stored.

  • backend (_CheckpointBackend | None) – Physical I/O backend. Defaults to the Polars checkpoint backend.

  • cleaner (TempCleaner | None) – Cleaner implementation used to delete checkpoint trees.

Example:

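import polars as pl

from loom.etl.checkpoint import CheckpointScope, CheckpointStore

# _PolarsCheckpointBackend is private; its import path is not part of this reference.
store = CheckpointStore(
    root="s3://my-bucket/loom-checkpoints",
    backend=_PolarsCheckpointBackend(storage_options={}),
)
polars_lazy_frame = pl.LazyFrame({"order_id": [1, 2, 3]})
store.put("orders", run_id="abc", correlation_id=None,
          scope=CheckpointScope.RUN, data=polars_lazy_frame)
lf = store.get("orders", run_id="abc", correlation_id=None)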

put(name, *, run_id, correlation_id, scope, data, append=False)

Materialise data as a named intermediate.

Parameters:
  • name (str) – Logical name matching the one declared on IntoTemp.

  • run_id (str) – UUID of the current pipeline run.

  • correlation_id (str | None) – Logical job ID grouping retries. Required when scope is CORRELATION.

  • scope (CheckpointScope) – Lifetime scope: RUN discards the intermediate when the run ends, CORRELATION keeps it across retries.

  • data (Any) – Data to write: a polars.LazyFrame or a pyspark.sql.DataFrame, matching the configured backend.

  • append (bool) – When True, writes a new partition file alongside any previously written parts for this name (fan-in). When False (default), the write is exclusive — any prior content for this name is overwritten.

Raises:
  • ValueError – When scope is CORRELATION and correlation_id is None.

  • TypeError – When the type of data does not match the configured backend.

Return type:

None
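The validation and append rules of put can be modelled with an in-memory dict of part lists. This is a sketch under stated assumptions: `PartStore` and its keying by (scope, owning ID, name) are hypothetical, scope is passed as a plain string rather than a CheckpointScope, and the backend type check is omitted.

```python
from collections import defaultdict
from typing import Any, DefaultDict, List, Optional, Tuple

# (scope, owning run_id or correlation_id, name): a hypothetical keying scheme.
Key = Tuple[str, str, str]


class PartStore:
    """In-memory model of put's validation and append/overwrite rules."""

    def __init__(self) -> None:
        self.parts: DefaultDict[Key, List[Any]] = defaultdict(list)

    def put(self, name: str, *, run_id: str, correlation_id: Optional[str],
            scope: str, data: Any, append: bool = False) -> None:
        if scope == "correlation":
            if correlation_id is None:
                raise ValueError("correlation_id is required for CORRELATION scope")
            owner = correlation_id
        else:
            owner = run_id
        key = (scope, owner, name)
        if not append:
            # Exclusive write: drop any parts previously written for this name.
            self.parts[key] = []
        # Each call contributes one part; with append=True parts accumulate (fan-in).
        self.parts[key].append(data)


parts_store = PartStore()
parts_store.put("orders", run_id="r1", correlation_id=None, scope="run", data="part-a")
parts_store.put("orders", run_id="r1", correlation_id=None, scope="run",
                data="part-b", append=True)
print(parts_store.parts[("run", "r1", "orders")])  # ['part-a', 'part-b']
parts_store.put("orders", run_id="r1", correlation_id=None, scope="run", data="part-c")
print(parts_store.parts[("run", "r1", "orders")])  # ['part-c']
```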

get(name, *, run_id, correlation_id)

Load a previously materialised intermediate.

Tries the RUN-scope path first, then falls back to the CORRELATION-scope path. This allows FromTemp consumers to be scope-agnostic: they declare only the name.

Returns a polars.LazyFrame for Polars intermediates, or a pyspark.sql.DataFrame for Spark intermediates.

Parameters:
  • name (str) – Logical name matching the one declared on IntoTemp.

  • run_id (str) – UUID of the pipeline run.

  • correlation_id (str | None) – Logical job ID. Checked only when the RUN-scope path is absent.

Raises:

FileNotFoundError – When no intermediate exists at either path.

Return type:

Any
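The RUN-first lookup with a CORRELATION fallback can be sketched over a dict of paths. The `<root>/run/<run_id>/<name>` and `<root>/correlation/<correlation_id>/<name>` layout and the `get_with_fallback` helper are assumptions for illustration; the actual storage layout is not documented in this reference.

```python
from typing import Any, Dict, Optional


def get_with_fallback(files: Dict[str, Any], root: str, name: str, *,
                      run_id: str, correlation_id: Optional[str]) -> Any:
    """Resolve an intermediate RUN-first, then CORRELATION (hypothetical layout)."""
    run_path = f"{root}/run/{run_id}/{name}"
    if run_path in files:
        return files[run_path]
    if correlation_id is not None:
        corr_path = f"{root}/correlation/{correlation_id}/{name}"
        if corr_path in files:
            return files[corr_path]
    raise FileNotFoundError(f"no intermediate {name!r} at either path")


files = {"s3://bucket/ckpt/correlation/job-7/orders": "from attempt 1"}
# Retry attempt r2 finds nothing under its RUN path and falls back.
print(get_with_fallback(files, "s3://bucket/ckpt", "orders",
                        run_id="r2", correlation_id="job-7"))  # from attempt 1
```

Because the consumer passes both IDs and lets the lookup decide, it never needs to know which scope the producer chose.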

cleanup_run(run_id)

Remove all RUN-scope intermediates for run_id.

Safe to call even when no intermediates were written.

Parameters:

run_id (str) – UUID of the pipeline run to clean.

Return type:

None
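Object stores such as S3 have a flat key namespace, so removing a run's "directory" amounts to deleting every key under a prefix. A minimal sketch of an idempotent cleanup_run over a dict of keys, assuming a hypothetical `<root>/run/<run_id>/` layout; it is not the store's actual implementation.

```python
from typing import Dict


def cleanup_run(objects: Dict[str, bytes], root: str, run_id: str) -> None:
    """Delete every RUN-scope object of run_id from a flat key namespace."""
    # Trailing slash so run "abc" does not also match run "abcd".
    prefix = f"{root}/run/{run_id}/"
    for key in [k for k in objects if k.startswith(prefix)]:
        del objects[key]
    # When nothing matches, the loop does nothing: safe to call repeatedly.


objects = {
    "ckpt/run/abc/orders/part-0.parquet": b"",
    "ckpt/run/abcd/orders/part-0.parquet": b"",
    "ckpt/correlation/job-7/orders/part-0.parquet": b"",
}
cleanup_run(objects, "ckpt", "abc")
cleanup_run(objects, "ckpt", "abc")  # second call is a no-op
print(sorted(objects))
```

Only the `abc` run's keys disappear; the neighbouring run and the CORRELATION-scope keys survive.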

cleanup_correlation(correlation_id)

Remove all CORRELATION-scope intermediates for correlation_id.

Call this after the final successful attempt, or to reclaim space after a permanently failed job.

Parameters:

correlation_id (str) – Logical job ID whose intermediates to purge.

Return type:

None
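On the caller side, the advice above amounts to purging once the job is finished, whichever way it ends. A sketch of such a retry loop, assuming a hypothetical `run_with_retries` helper; in practice the `cleanup_correlation` callback would presumably wrap `store.cleanup_correlation(correlation_id)` for a CheckpointStore `store`.

```python
from typing import Callable


def run_with_retries(attempt: Callable[[int], None], *, max_attempts: int,
                     cleanup_correlation: Callable[[], None]) -> bool:
    """Run attempt(n) until it succeeds or attempts run out (hypothetical helper)."""
    try:
        for n in range(1, max_attempts + 1):
            try:
                attempt(n)
                return True
            except Exception:
                # Keep CORRELATION intermediates so the next attempt can reuse them.
                continue
        return False
    finally:
        # Reached after the final successful attempt or the final failed one.
        cleanup_correlation()


purged = []


def flaky(n: int) -> None:
    if n < 3:
        raise RuntimeError(f"attempt {n} failed")


ok = run_with_retries(flaky, max_attempts=3,
                      cleanup_correlation=lambda: purged.append("job-7"))
print(ok, purged)  # True ['job-7']
```

Placing the purge in a finally block guarantees it runs exactly once per job, after the loop has stopped retrying.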

class loom.etl.checkpoint.TempCleaner(*args, **kwargs)

Bases: Protocol

Protocol for deleting checkpoint trees.

delete_tree(path)

Delete path recursively.

Parameters:

path (str) – Root of the tree to delete.

Return type:

None
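Because TempCleaner is a Protocol, any object with a matching delete_tree(path) method satisfies it structurally, without inheriting from it. A sketch using a local copy of the protocol and a hypothetical recording test double; passing such an object as the cleaner argument of CheckpointStore is a plausible way to test cleanup, assuming the store only calls delete_tree on it.

```python
from typing import List, Protocol


class TempCleaner(Protocol):
    """Local copy of the documented protocol shape."""

    def delete_tree(self, path: str) -> None: ...


class RecordingCleaner:
    """Test double: records paths instead of deleting anything."""

    def __init__(self) -> None:
        self.deleted: List[str] = []

    def delete_tree(self, path: str) -> None:
        self.deleted.append(path)


def purge(cleaner: TempCleaner, paths: List[str]) -> None:
    """Hypothetical consumer typed against the protocol, not a concrete class."""
    for path in paths:
        cleaner.delete_tree(path)


recorder = RecordingCleaner()
purge(recorder, ["s3://bucket/ckpt/run/abc"])
print(recorder.deleted)  # ['s3://bucket/ckpt/run/abc']
```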

class loom.etl.checkpoint.FsspecTempCleaner(storage_options=None)

Bases: object

Delete checkpoint trees via fsspec (s3, gs, abfss, etc.).

Parameters:

storage_options (Mapping[str, str] | None)

delete_tree(path)

Remove path and its contents from cloud storage.

Failure is non-fatal: a WARNING is logged instead of an exception being raised.

Parameters:

path (str) – Cloud URI of the tree to remove.

Return type:

None
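The non-fatal contract can be illustrated with a local-disk stand-in. A real fsspec-based version would presumably resolve the URL to a filesystem (fsspec offers `fsspec.core.url_to_fs(url, **storage_options)` for this) and call that filesystem's `rm(path, recursive=True)`; this sketch swaps in `shutil.rmtree` so it runs without cloud credentials, and `LocalTempCleaner` is a hypothetical name.

```python
import logging
import shutil
import tempfile
from pathlib import Path

logger = logging.getLogger("checkpoint_sketch")


class LocalTempCleaner:
    """Local-disk stand-in that mirrors FsspecTempCleaner's non-fatal contract."""

    def delete_tree(self, path: str) -> None:
        try:
            shutil.rmtree(path)
        except OSError as exc:
            # Cleanup must not fail the pipeline: log a WARNING and carry on.
            logger.warning("could not delete checkpoint tree %s: %s", path, exc)


tree = Path(tempfile.mkdtemp()) / "run" / "abc"
tree.mkdir(parents=True)
(tree / "part-0.parquet").write_bytes(b"")

cleaner = LocalTempCleaner()
cleaner.delete_tree(str(tree))
cleaner.delete_tree(str(tree))  # already gone: logs a warning, does not raise
print(tree.exists())  # False
```

Swallowing the error is deliberate: a failed cleanup only leaks storage, whereas raising from a finally block could mask the pipeline's real outcome.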