loom.etl.checkpoint¶
Checkpoint storage API for ETL pipeline intermediates.
- loom.etl.checkpoint.CheckpointCleaner¶
alias of FsspecTempCleaner
- class loom.etl.checkpoint.CheckpointScope(value)[source]¶
Bases: StrEnum
Lifetime scope of a checkpoint (intermediate) result.
Declared on IntoTemp to control when the materialised intermediate is cleaned up by the executor.
Values:
RUN — Cleaned in the finally block of every pipeline run, regardless of success or failure. Safe default for ephemeral checkpoints within a single attempt.
CORRELATION — Survives a failed run so that the next retry attempt can reuse already-materialised intermediates. Cleaned automatically on success when last_attempt=True in loom.etl.observability.RunContext. On failure the executor emits a structured warning; the caller must invoke cleanup_correlation() explicitly after the final failed attempt.
Example:
target = IntoTemp("normalized_orders", scope=CheckpointScope.CORRELATION)
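The CORRELATION contract above can be sketched as a retry loop. This is an illustrative stand-in, not loom's executor: FakeStore, run_with_retries, and flaky are hypothetical names, and only the cleanup_correlation call mirrors the documented obligation.

```python
class FakeStore:
    """Stand-in recording cleanup calls; not loom's CheckpointStore."""
    def __init__(self):
        self.cleaned = []

    def cleanup_correlation(self, correlation_id):
        self.cleaned.append(correlation_id)


def run_with_retries(store, attempt_fn, correlation_id, max_attempts=3):
    for attempt in range(1, max_attempts + 1):
        try:
            return attempt_fn(attempt)
        except RuntimeError:
            if attempt == max_attempts:
                # Final failed attempt: the executor only warns, so the
                # caller must purge CORRELATION-scope intermediates itself.
                store.cleanup_correlation(correlation_id)
                raise


attempts = []

def flaky(attempt):
    attempts.append(attempt)
    if attempt < 3:
        raise RuntimeError("transient failure")
    return "ok"


store = FakeStore()
# Succeeds on the third attempt; intermediates written by attempts 1-2
# survived precisely because the scope is CORRELATION.
result = run_with_retries(store, flaky, correlation_id="job-42")
```

On the success path nothing is cleaned here, matching the docs: success-path cleanup happens automatically via RunContext, not in caller code.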
- class loom.etl.checkpoint.CheckpointStore(root, backend=None, cleaner=None)[source]¶
Bases: object
Physical store for intermediate ETL results.
The backend (Polars or Spark) is injected at construction time via the backend parameter.
put and get are then backend-agnostic — no runtime type switching.
- Parameters:
root (str) – Root cloud URI where intermediates are stored.
backend (_CheckpointBackend | None) – Physical I/O backend. Defaults to Polars checkpoint backend.
cleaner (TempCleaner | None) – Cleaner implementation used to delete checkpoint trees.
Example:
store = CheckpointStore(
    root="s3://my-bucket/loom-checkpoints",
    backend=_PolarsCheckpointBackend(storage_options={}),
)
store.put(
    "orders",
    run_id="abc",
    correlation_id=None,
    scope=CheckpointScope.RUN,
    data=polars_lazy_frame,
)
lf = store.get("orders", run_id="abc", correlation_id=None)
- put(name, *, run_id, correlation_id, scope, data, append=False)[source]¶
Materialise data as a named intermediate.
- Parameters:
name (str) – Logical name matching IntoTemp.
run_id (str) – UUID of the current pipeline run.
correlation_id (str | None) – Logical job ID grouping retries. Required when scope is CORRELATION.
scope (CheckpointScope) – Whether to keep across retries.
data (Any) – polars.LazyFrame or pyspark.sql.DataFrame.
append (bool) – When True, writes a new partition file alongside any previously written parts for this name (fan-in). When False (default), the write is exclusive — any prior content for this name is overwritten.
- Raises:
ValueError – When scope is CORRELATION and correlation_id is None.
TypeError – When data type does not match the configured backend.
- Return type:
None
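The append semantics can be illustrated with plain files. This is a sketch only: loom's real backends write via Polars or Spark, and the directory layout and part-file naming here are assumptions, not the store's actual format.

```python
import tempfile
from pathlib import Path


def put_part(root: Path, name: str, run_id: str, data: bytes, append: bool = False):
    """Write one partition file; an exclusive write clears prior parts."""
    target = root / run_id / name
    if not append and target.exists():
        # Exclusive write: any prior content for this name is overwritten.
        for part in target.glob("part-*"):
            part.unlink()
    target.mkdir(parents=True, exist_ok=True)
    # Fan-in: number the new part after any existing ones.
    n = len(list(target.glob("part-*")))
    (target / f"part-{n:05d}").write_bytes(data)


root = Path(tempfile.mkdtemp())
put_part(root, "orders", "abc", b"a")               # exclusive write
put_part(root, "orders", "abc", b"b", append=True)  # fan-in: second part
put_part(root, "orders", "abc", b"c", append=True)  # fan-in: third part
parts = sorted(p.name for p in (root / "abc" / "orders").glob("part-*"))
```

The fan-in pattern lets several producers (or loop iterations) contribute parts under one logical name, which a later get resolves as a single intermediate.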
- get(name, *, run_id, correlation_id)[source]¶
Load a previously materialised intermediate.
Tries the RUN-scope path first, then falls back to the CORRELATION-scope path. This allows FromTemp consumers to be scope-agnostic — they only declare the name.
Returns a polars.LazyFrame for Polars intermediates, or a pyspark.sql.DataFrame for Spark intermediates.
- Parameters:
name (str) – Logical name matching IntoTemp.
run_id (str) – UUID of the current pipeline run.
correlation_id (str | None) – Logical job ID grouping retries.
- Raises:
FileNotFoundError – When no intermediate exists at either path.
- Return type:
LazyFrame | DataFrame
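The two-path lookup reduces to a simple fallback chain. A minimal sketch, assuming a hypothetical on-disk layout (runs/&lt;run_id&gt;/&lt;name&gt; and correlations/&lt;correlation_id&gt;/&lt;name&gt;) that is illustrative only, not loom's actual paths:

```python
import tempfile
from pathlib import Path


def get_intermediate(root: Path, name: str, run_id: str, correlation_id):
    # RUN-scope path first, then the CORRELATION-scope path as fallback.
    candidates = [root / "runs" / run_id / name]
    if correlation_id is not None:
        candidates.append(root / "correlations" / correlation_id / name)
    for path in candidates:
        if path.exists():
            return path.read_bytes()
    raise FileNotFoundError(f"no intermediate {name!r} at either path")


root = Path(tempfile.mkdtemp())
corr = root / "correlations" / "job-42" / "orders"
corr.parent.mkdir(parents=True)
corr.write_bytes(b"payload")
# The RUN path is missing, so the lookup falls back to CORRELATION:
data = get_intermediate(root, "orders", run_id="abc", correlation_id="job-42")
```

This ordering is what makes consumers scope-agnostic: a retry that finds no fresh RUN-scope write transparently picks up the surviving CORRELATION-scope copy.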
- cleanup_run(run_id)[source]¶
Remove all RUN-scope intermediates for run_id.
Safe to call even when no intermediates were written.
- Parameters:
run_id (str) – UUID of the pipeline run to clean.
- Return type:
None
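The RUN-scope lifecycle described for CheckpointScope.RUN puts cleanup_run in a finally block, so intermediates are removed on success and failure alike. A sketch with a dict-backed stand-in (DictStore and run_pipeline are hypothetical, not loom's API):

```python
class DictStore:
    """Stand-in for CheckpointStore keyed by (run_id, name)."""
    def __init__(self):
        self.data = {}

    def put(self, name, run_id, payload):
        self.data[(run_id, name)] = payload

    def cleanup_run(self, run_id):
        # Safe even when nothing was written for this run.
        for key in [k for k in self.data if k[0] == run_id]:
            del self.data[key]


store = DictStore()


def run_pipeline(run_id, fail=False):
    try:
        store.put("normalized_orders", run_id, b"dummy")
        if fail:
            raise RuntimeError("step failed")
    finally:
        # Runs regardless of success or failure.
        store.cleanup_run(run_id)


run_pipeline("r1")
try:
    run_pipeline("r2", fail=True)
except RuntimeError:
    pass
```

After both runs the store holds nothing: the failed run's intermediates were reclaimed by the same finally path as the successful run's.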
- cleanup_correlation(correlation_id)[source]¶
Remove all CORRELATION-scope intermediates for correlation_id.
Call this after the final successful attempt, or to reclaim space after a permanently failed job.
- Parameters:
correlation_id (str) – Logical job ID whose intermediates to purge.
- Return type:
None
- class loom.etl.checkpoint.TempCleaner(*args, **kwargs)[source]¶
Bases: Protocol
Protocol for deleting checkpoint trees.
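A structural protocol like this can be satisfied by any class with the right method. loom ships FsspecTempCleaner; the local-filesystem variant below is a sketch, and the delete_tree method name is an assumption for illustration, not the protocol's documented signature.

```python
import shutil
import tempfile
from pathlib import Path
from typing import Protocol, runtime_checkable


@runtime_checkable
class TempCleaner(Protocol):
    # Method name assumed for this sketch; the real protocol may differ.
    def delete_tree(self, uri: str) -> None: ...


class LocalTempCleaner:
    """Deletes a checkpoint tree on the local filesystem."""
    def delete_tree(self, uri: str) -> None:
        # ignore_errors makes deletion safe when the tree is already gone.
        shutil.rmtree(uri, ignore_errors=True)


tree = Path(tempfile.mkdtemp()) / "checkpoints" / "abc"
tree.mkdir(parents=True)
(tree / "part-00000").write_bytes(b"x")

cleaner = LocalTempCleaner()
cleaner.delete_tree(str(tree))
```

Because the protocol is structural, LocalTempCleaner never inherits from TempCleaner; it conforms simply by implementing the method, which is what lets CheckpointStore accept any cleaner implementation via its cleaner parameter.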