Source code for loom.etl.storage._config

"""Unified ETL storage configuration.

Defines one canonical ``storage:`` model for both Spark and Polars runtimes.
Logical table names remain stable in pipelines while this config resolves each
name to either a catalog reference or a physical Delta path.
"""

from __future__ import annotations

from collections.abc import Mapping
from enum import StrEnum
from typing import TYPE_CHECKING, Any, Literal
from urllib.parse import urlparse

if TYPE_CHECKING:
    from loom.etl.storage._file_locator import MappingFileLocator

import msgspec
from deltalake import CommitProperties, WriterProperties

from loom.etl.storage._locator import MappingLocator, PrefixLocator, TableLocation, TableLocator


[docs] class StorageEngine(StrEnum): """Execution engine guardrail declared in the storage config.""" POLARS = "polars" SPARK = "spark"
[docs] class MissingTablePolicy(StrEnum): """Policy used when a TABLE target does not exist at write time.""" SCHEMA_MODE = "schema_mode" CREATE = "create"
[docs] class CatalogConnection(msgspec.Struct, frozen=True): """Catalog connection settings. Args: provider: Catalog provider. Currently ``"unity"``. workspace: Databricks workspace URL for Unity Catalog. token: Databricks access token. """ provider: Literal["unity"] = "unity" workspace: str = "" token: str = ""
[docs] class TablePathConfig(msgspec.Struct, frozen=True): """Physical Delta path settings for table storage. Args: uri: Root Delta path/URI. storage_options: Object-store credentials/options. writer: Delta writer properties. delta_config: Delta table properties. commit: Delta commit metadata. """ uri: str = "" storage_options: dict[str, str] = {} writer: dict[str, Any] = {} delta_config: dict[str, str | None] = {} commit: dict[str, Any] = {}
[docs] def validate(self, *, context: str) -> None: """Validate path config and delta-rs option dictionaries.""" if not self.uri.strip(): raise ValueError(f"storage.{context}.uri must be a non-empty string") _validate_writer(self.writer, context=f"{context}.writer") _validate_commit(self.commit, context=f"{context}.commit")
[docs] def to_location(self) -> TableLocation: """Convert to :class:`TableLocation`.""" return TableLocation( uri=self.uri, storage_options=self.storage_options, writer=self.writer, delta_config=self.delta_config, commit=self.commit, )
[docs] class FilePathConfig(msgspec.Struct, frozen=True): """Physical path settings for FILE sources/targets. Args: uri: Output/input file URI template. storage_options: Object-store credentials/options. """ uri: str = "" storage_options: dict[str, str] = {}
[docs] def validate(self, *, context: str) -> None: """Validate file path config.""" if not self.uri.strip(): raise ValueError(f"storage.{context}.uri must be a non-empty string")
[docs] class StorageDefaults(msgspec.Struct, frozen=True): """Default resolution settings used when no per-name override is declared.""" table_path: TablePathConfig | None = None
[docs] class TableRoute(msgspec.Struct, frozen=True): """Route one logical table name to catalog ref or physical path. Args: name: Logical table name used by the pipeline (e.g. ``"sys.customers"``). ref: Catalog reference. Supported forms: * ``"catalog.schema.table"`` * ``"schema.table"`` (uses ``catalog`` or ``default``) catalog: Catalog connection key used with 2-part refs. path: Physical Delta path route. """ name: str ref: str = "" catalog: str = "" path: TablePathConfig | None = None
[docs] def validate(self, *, catalogs: dict[str, CatalogConnection], context: str) -> None: """Validate route syntax and exclusivity constraints.""" if not self.name.strip(): raise ValueError(f"storage.{context}.name must be a non-empty string") has_ref = bool(self.ref.strip()) has_path = self.path is not None if has_ref == has_path: raise ValueError( f"storage.{context} must define exactly one destination: 'ref' or 'path'" ) if has_ref: _validate_ref(self.ref, context=f"storage.{context}.ref") if self.catalog and self.catalog not in catalogs: raise ValueError( f"storage.{context}.catalog={self.catalog!r} is not defined in storage.catalogs" ) if self.path is not None: self.path.validate(context=f"{context}.path")
[docs] class FileRoute(msgspec.Struct, frozen=True): """Route one logical file name to a physical file URI. Args: name: Logical file name used by the pipeline. path: Physical path configuration. """ name: str path: FilePathConfig
[docs] def validate(self, *, context: str) -> None: """Validate file route fields.""" if not self.name.strip(): raise ValueError(f"storage.{context}.name must be a non-empty string") self.path.validate(context=f"{context}.path")
[docs] class StorageConfig(msgspec.Struct, frozen=True): """Canonical storage configuration used by ETL runner/factory. Args: engine: Engine guardrail. missing_table_policy: First-run policy for missing destination tables. catalogs: Catalog connection map. defaults: Default path settings. tables: Per-logical-table routes. files: Per-logical-file routes. tmp_root: Root URI for intermediate storage. tmp_storage_options: Credentials/options for intermediate storage. """ engine: Literal["polars", "spark"] = "polars" missing_table_policy: MissingTablePolicy = MissingTablePolicy.SCHEMA_MODE catalogs: dict[str, CatalogConnection] = {} defaults: StorageDefaults = StorageDefaults() tables: tuple[TableRoute, ...] = () files: tuple[FileRoute, ...] = () tmp_root: str = "" tmp_storage_options: dict[str, str] = {}
[docs] def validate(self) -> None: """Validate structural constraints and option dictionaries.""" _ensure_unique_names( (route.name for route in self.tables), context="storage.tables", ) _ensure_unique_names( (route.name for route in self.files), context="storage.files", ) if self.defaults.table_path is not None: self.defaults.table_path.validate(context="defaults.table_path") for idx, table_route in enumerate(self.tables): table_route.validate(catalogs=self.catalogs, context=f"tables[{idx}]") for idx, file_route in enumerate(self.files): file_route.validate(context=f"files[{idx}]") if self.tmp_root and not _is_cloud_uri(self.tmp_root): raise ValueError( "storage.tmp_root must be a cloud URI (s3://, gs://, abfss://, ...). " "Local checkpoint roots are not supported." ) self._validate_polars_uc_credentials()
@property def checkpoint_root(self) -> str: """Checkpoint root URI (canonical alias of ``tmp_root``).""" return self.tmp_root @property def checkpoint_storage_options(self) -> dict[str, str]: """Checkpoint storage options (canonical alias of ``tmp_storage_options``).""" return self.tmp_storage_options
[docs] def has_catalog_routes(self) -> bool: """Return ``True`` when at least one table route uses ``ref``.""" return any(bool(route.ref.strip()) for route in self.tables)
[docs] def to_file_locator(self) -> MappingFileLocator | None: """Build a :class:`~loom.etl.storage._file_locator.MappingFileLocator` from the declared ``storage.files`` routes, or ``None`` when no file routes are configured. Returns: Locator mapping each ``files[].name`` to its physical :class:`~loom.etl.storage._file_locator.FileLocation`, or ``None`` when ``storage.files`` is empty. """ from loom.etl.storage._file_locator import FileLocation, MappingFileLocator if not self.files: return None mapping = { route.name: FileLocation( uri_template=route.path.uri, storage_options=route.path.storage_options, ) for route in self.files } return MappingFileLocator(mapping)
[docs] def has_path_routes(self) -> bool: """Return ``True`` when defaults or per-table routes use path mode.""" if self.defaults.table_path is not None: return True return any(route.path is not None for route in self.tables)
[docs] def to_path_locator(self) -> TableLocator: """Build a :class:`TableLocator` from path defaults and per-table path routes. Raises: ValueError: If no path defaults/routes are configured. """ mapping = { route.name: route.path.to_location() for route in self.tables if route.path is not None } default_location = ( self.defaults.table_path.to_location() if self.defaults.table_path is not None else None ) if not mapping and default_location is None: raise ValueError( "storage.to_path_locator: no path routes configured. " "Define storage.defaults.table_path or storage.tables[].path." ) if not mapping and default_location is not None: return PrefixLocator( root=default_location.uri, storage_options=default_location.storage_options or None, writer=default_location.writer or None, delta_config=default_location.delta_config or None, commit=default_location.commit or None, ) return MappingLocator(mapping=mapping, default=default_location)
def _validate_polars_uc_credentials(self) -> None: if self.engine != StorageEngine.POLARS.value: return for idx, table_route in enumerate(self.tables): if not table_route.ref.strip(): continue catalog_key = _resolve_polars_catalog_key(table_route, self.catalogs) if not catalog_key: raise ValueError( "Polars UC routes require credentials in storage.catalogs. " f"Missing mapping for storage.tables[{idx}] (ref={table_route.ref!r}). " "Set tables[].catalog or define matching storage.catalogs entry." ) connection = self.catalogs[catalog_key] if not connection.workspace.strip() or not connection.token.strip(): raise ValueError( f"storage.catalogs[{catalog_key!r}] must define non-empty workspace and token " "for Polars UC routes." )
def convert_storage_config(raw: dict[str, Any]) -> StorageConfig: """Convert a resolved plain dict into :class:`StorageConfig`.""" return msgspec.convert(raw, StorageConfig) def _validate_ref(ref: str, *, context: str) -> None: parts = tuple(part for part in ref.split(".") if part) if len(parts) not in (2, 3): raise ValueError(f"{context} must be 'schema.table' or 'catalog.schema.table', got {ref!r}") def _ensure_unique_names(names: Any, *, context: str) -> None: seen: set[str] = set() for name in names: if name in seen: raise ValueError(f"{context} contains duplicate name {name!r}") seen.add(name) def _resolve_polars_catalog_key( route: TableRoute, catalogs: Mapping[str, CatalogConnection], ) -> str: explicit_key = route.catalog.strip() if explicit_key: return explicit_key parts = tuple(part for part in route.ref.split(".") if part) if len(parts) == 2: return "default" if "default" in catalogs else "" if len(parts) == 3: catalog_from_ref = parts[0] return catalog_from_ref if catalog_from_ref in catalogs else "" return "" def _is_cloud_uri(path: str) -> bool: cloud_schemes = frozenset({"s3", "gs", "gcs", "abfss", "abfs", "az", "r2"}) parsed = urlparse(path.strip()) if parsed.scheme.lower() not in cloud_schemes: return False # Cloud roots must include authority/bucket/container to avoid accepting # incomplete URIs like "s3://". return bool(parsed.netloc) # --------------------------------------------------------------------------- # Internal validators — construct delta-rs objects to fail fast at startup # --------------------------------------------------------------------------- def _validate_writer(writer: dict[str, Any], *, context: str) -> None: if not writer: return try: WriterProperties(**writer) except TypeError as exc: raise TypeError(f"Invalid key in storage.{context}: {exc}") from exc def _validate_commit(commit: dict[str, Any], *, context: str) -> None: if not commit: return try: CommitProperties(**commit) except TypeError as exc: raise TypeError(f"Invalid key in storage.{context}: {exc}") from exc