Source code for loom.etl.checkpoint._cleaners
"""Checkpoint cleaners."""
from __future__ import annotations
import logging
from collections.abc import Mapping
from typing import Protocol, runtime_checkable
import fsspec.core
# Module-level logger, named after this module via ``__name__``.
_log = logging.getLogger(__name__)
_CLOUD_SCHEMES = frozenset({"s3", "gs", "gcs", "abfss", "abfs", "az", "r2"})
def _is_cloud_path(path: str) -> bool:
"""Return ``True`` when *path* uses a supported cloud URI scheme."""
if "://" not in path:
return False
scheme, _ = path.split("://", 1)
return scheme.lower() in _CLOUD_SCHEMES
[docs]
@runtime_checkable
class TempCleaner(Protocol):
    """Structural interface for objects able to remove a checkpoint tree.

    The protocol is ``runtime_checkable``, so ``isinstance`` can be used to
    test whether an object provides ``delete_tree``.
    """

    def delete_tree(self, path: str) -> None:
        """Recursively delete everything located at *path*."""
[docs]
class FsspecTempCleaner:
    """Remove checkpoint trees through an fsspec filesystem (s3, gs, abfss, etc).

    The filesystem is resolved per call from the path's URL, using the
    storage options supplied at construction time.
    """

    def __init__(self, storage_options: Mapping[str, str] | None = None) -> None:
        # Keep a private copy so later mutation of the caller's mapping
        # cannot affect this cleaner.
        self._storage_options = dict(storage_options) if storage_options else {}

    def delete_tree(self, path: str) -> None:
        """Recursively delete *path* if it exists.

        Best-effort: any exception raised while resolving the filesystem,
        probing the path, or removing it is caught and reported as a
        ``WARNING`` log record instead of propagating.
        """
        try:
            filesystem, target = fsspec.core.url_to_fs(path, **self._storage_options)
            if not filesystem.exists(target):
                # Nothing to clean up.
                return
            _log.debug("checkpoint cleanup path=%s", path)
            filesystem.rm(target, recursive=True)
        except Exception as exc:
            _log.warning("checkpoint cleanup skipped path=%s reason=%s", path, exc)
# Backward-compatible alias for public imports: ``CheckpointCleaner`` is bound
# to the very same class object as ``FsspecTempCleaner``.
CheckpointCleaner = FsspecTempCleaner
def _join_path(base: str, *parts: str) -> str:
"""Join *base* and *parts* into a single path, preserving cloud URI schemes."""
root = base.rstrip("/")
suffix = "/".join(part.strip("/") for part in parts if part)
if not root:
return f"/{suffix}" if suffix else "/"
return f"{root}/{suffix}" if suffix else root