# Source code for loom.etl.storage.routing

"""Routing primitives for logical table references.

This module centralizes:

1. resolved target models (catalog/path),
2. route resolver implementations,
3. resolver factory from ``StorageConfig``,
4. route-aware catalog adapter used at compile-time validation.
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Protocol, runtime_checkable

from loom.etl.declarative.expr._refs import TableRef
from loom.etl.runtime.contracts import TableDiscovery
from loom.etl.schema._schema import ColumnSchema
from loom.etl.storage._config import StorageConfig
from loom.etl.storage._locator import PrefixLocator, TableLocation, TableLocator


@dataclass(frozen=True)
class CatalogTarget:
    """Catalog-mode destination produced by a route resolver.

    Pairs the logical table reference that was asked for with the catalog
    table reference it resolved to. Instances are immutable and hashable.
    """

    # Table reference exactly as requested, before any routing.
    logical_ref: TableRef
    # Catalog table reference the logical ref was routed to.
    catalog_ref: TableRef
@dataclass(frozen=True)
class PathTarget:
    """Path-mode destination produced by a route resolver.

    Pairs the logical table reference that was asked for with the physical
    ``TableLocation`` it resolved to. Instances are immutable and hashable.
    """

    # Table reference exactly as requested, before any routing.
    logical_ref: TableRef
    # Physical location the logical ref was routed to.
    location: TableLocation
# Union of the two destination kinds a route resolver can return. Consumers
# (e.g. ``RoutedCatalog``) tell them apart with ``isinstance`` checks.
# NOTE: this line is evaluated at import time (``from __future__ import
# annotations`` does not defer it), so ``|`` on classes needs Python 3.10+.
ResolvedTarget = CatalogTarget | PathTarget
@runtime_checkable
class TableRouteResolver(Protocol):
    """Structural interface for objects that map logical refs to targets.

    Any object with a compatible ``resolve`` method satisfies this protocol.
    Because it is runtime-checkable, ``isinstance`` works against it. Such
    checks only confirm that a ``resolve`` attribute exists, not its signature.
    """

    def resolve(self, logical_ref: TableRef) -> ResolvedTarget:
        """Map *logical_ref* to either a catalog or a path destination."""
        ...
class CatalogRouteResolver:
    """Route every logical table to catalog mode.

    Args:
        default_catalog: Catalog name passed to ``TableRef.qualify`` for every
            resolved ref. Surrounding whitespace is trimmed once, at
            construction time.
    """

    def __init__(self, default_catalog: str = "") -> None:
        trimmed = default_catalog.strip()
        self._default_catalog = trimmed

    def resolve(self, logical_ref: TableRef) -> CatalogTarget:
        """Return a ``CatalogTarget`` holding ``logical_ref.qualify(default_catalog)``."""
        qualified = logical_ref.qualify(self._default_catalog)
        return CatalogTarget(logical_ref=logical_ref, catalog_ref=qualified)
class PathRouteResolver:
    """Route every logical table to path mode via a single ``TableLocator``.

    Args:
        locator: Object whose ``locate(logical_ref)`` yields the physical
            ``TableLocation`` for a logical ref.
    """

    def __init__(self, locator: TableLocator) -> None:
        self._locator = locator

    def resolve(self, logical_ref: TableRef) -> PathTarget:
        """Return a ``PathTarget`` whose location comes from the locator."""
        location = self._locator.locate(logical_ref)
        return PathTarget(logical_ref=logical_ref, location=location)
class FixedCatalogRouteResolver:
    """Always route to one explicit catalog table, whatever ref is asked for.

    Args:
        catalog_ref: Catalog table reference returned for every resolution.
    """

    def __init__(self, catalog_ref: TableRef) -> None:
        self._catalog_ref = catalog_ref

    def resolve(self, logical_ref: TableRef) -> CatalogTarget:
        """Pair *logical_ref* with the fixed catalog table reference."""
        target_ref = self._catalog_ref
        return CatalogTarget(logical_ref=logical_ref, catalog_ref=target_ref)
class FixedPathRouteResolver:
    """Always route to one explicit physical location, whatever ref is asked for.

    Args:
        location: ``TableLocation`` returned for every resolution.
    """

    def __init__(self, location: TableLocation) -> None:
        self._location = location

    def resolve(self, logical_ref: TableRef) -> PathTarget:
        """Pair *logical_ref* with the fixed physical location."""
        fixed_location = self._location
        return PathTarget(logical_ref=logical_ref, location=fixed_location)
class CompositeRouteResolver:
    """Resolve using table-specific overrides over one default resolver.

    Overrides are looked up by the logical ref string (``logical_ref.ref``).
    Any ref without an override falls back to *default*.

    Args:
        default: Resolver used when no override matches.
        overrides: Mapping of logical ref string -> resolver. It is copied at
            construction, so later mutation of the caller's dict cannot
            silently change routing.
    """

    def __init__(
        self,
        *,
        default: TableRouteResolver,
        overrides: dict[str, TableRouteResolver] | None = None,
    ) -> None:
        self._default = default
        # Defensive copy: the routing table is frozen once the resolver is built.
        self._overrides: dict[str, TableRouteResolver] = dict(overrides or {})

    def resolve(self, logical_ref: TableRef) -> ResolvedTarget:
        """Delegate to the override registered for *logical_ref*, else the default."""
        resolver = self._overrides.get(logical_ref.ref, self._default)
        return resolver.resolve(logical_ref)
@dataclass(frozen=True)
class _MissingRouteResolver:
    """Fallback resolver that rejects every logical ref.

    ``_default_resolver`` returns it when no default table path is configured
    and the engine is not ``"spark"``.
    """

    def resolve(self, logical_ref: TableRef) -> ResolvedTarget:
        """Always raise ``KeyError`` naming *logical_ref* and the config keys to set."""
        message = (
            f"No storage route configured for logical table {logical_ref.ref!r}. "
            "Define storage.defaults.table_path or add an explicit storage.tables entry."
        )
        raise KeyError(message)
def build_table_resolver(config: StorageConfig) -> TableRouteResolver:
    """Build route resolver from ``StorageConfig`` defaults + table overrides.

    Each entry of ``config.tables`` becomes an override keyed by its ``name``:

    * ``route.path`` set -> fixed physical location (takes precedence).
    * otherwise a non-blank ``route.ref`` -> fixed catalog table. The ref is
      prefixed with ``route.catalog`` when that is non-empty and the ref has
      exactly two dot-separated parts.
    * entries with neither are skipped, so those tables use the default resolver.

    Returns:
        The default resolver itself when there are no overrides, otherwise a
        ``CompositeRouteResolver`` layering the overrides over it.
    """
    default_resolver = _default_resolver(config)
    overrides: dict[str, TableRouteResolver] = {}
    for route in config.tables:
        if route.path is not None:
            overrides[route.name] = FixedPathRouteResolver(route.path.to_location())
            continue
        # Use the stripped ref for both the blank check and qualification, so a
        # padded value such as " sales.orders " cannot leak whitespace into the
        # catalog TableRef.
        ref = route.ref.strip()
        if ref:
            catalog_ref = _qualify_route_ref(ref, route.catalog)
            overrides[route.name] = FixedCatalogRouteResolver(TableRef(catalog_ref))
    if not overrides:
        return default_resolver
    return CompositeRouteResolver(default=default_resolver, overrides=overrides)
class RoutedCatalog:
    """Dispatch ``TableDiscovery`` calls by table route (catalog vs path).

    Every call first resolves *ref* through the configured resolver:

    * ``CatalogTarget`` -> forwarded to the *catalog* backend with the resolved
      ``catalog_ref``.
    * any other target (path mode) -> forwarded to the *path* backend with the
      original logical ref. Raises ``RuntimeError`` when no path backend was
      supplied.

    Args:
        resolver: Maps logical refs to catalog or path targets.
        catalog: Discovery backend for catalog-routed tables.
        path: Optional discovery backend for path-routed tables.
    """

    def __init__(
        self,
        resolver: TableRouteResolver,
        *,
        catalog: TableDiscovery,
        path: TableDiscovery | None = None,
    ) -> None:
        self._resolver = resolver
        self._catalog = catalog
        self._path = path

    def exists(self, ref: TableRef) -> bool:
        """Return the routed backend's ``exists`` answer for *ref*."""
        backend, routed_ref = self._dispatch(ref)
        return backend.exists(routed_ref)

    def columns(self, ref: TableRef) -> tuple[str, ...]:
        """Return the routed backend's column names for *ref*."""
        backend, routed_ref = self._dispatch(ref)
        return backend.columns(routed_ref)

    def schema(self, ref: TableRef) -> tuple[ColumnSchema, ...] | None:
        """Return the routed backend's schema for *ref* (may be ``None``)."""
        backend, routed_ref = self._dispatch(ref)
        return backend.schema(routed_ref)

    def update_schema(self, ref: TableRef, schema: tuple[ColumnSchema, ...]) -> None:
        """Forward a schema update for *ref* to the routed backend."""
        backend, routed_ref = self._dispatch(ref)
        backend.update_schema(routed_ref, schema)

    def _dispatch(self, ref: TableRef) -> tuple[TableDiscovery, TableRef]:
        """Resolve *ref* and return ``(backend, ref_to_pass_to_backend)``."""
        target = self._resolver.resolve(ref)
        if isinstance(target, CatalogTarget):
            return self._catalog, target.catalog_ref
        # Path-mode backends receive the logical ref, not the resolved location.
        return self._require_path_catalog(ref), ref

    def _require_path_catalog(self, ref: TableRef) -> TableDiscovery:
        """Return the path backend, raising ``RuntimeError`` if none is configured."""
        if self._path is None:
            raise RuntimeError(
                f"Table {ref.ref!r} resolved to path mode but no path catalog is configured."
            )
        return self._path
def _default_resolver(config: StorageConfig) -> TableRouteResolver:
    """Pick the resolver used for tables without an explicit route.

    Precedence:
    1. ``config.defaults.table_path`` set -> path mode under that root.
    2. ``config.engine == "spark"`` -> catalog mode.
    3. otherwise -> a resolver that raises for every table.
    """
    table_path = config.defaults.table_path
    if table_path is None:
        if config.engine == "spark":
            return CatalogRouteResolver(default_catalog=_default_catalog_key(config))
        return _MissingRouteResolver()
    # Empty option containers are normalized to ``None`` for the locator.
    locator = PrefixLocator(
        root=table_path.uri,
        storage_options=table_path.storage_options or None,
        writer=table_path.writer or None,
        delta_config=table_path.delta_config or None,
        commit=table_path.commit or None,
    )
    return PathRouteResolver(locator)


def _default_catalog_key(config: StorageConfig) -> str:
    """Return ``"default"`` when a catalog with that key is configured, else ``""``."""
    if "default" in config.catalogs:
        return "default"
    return ""


def _qualify_route_ref(ref: str, catalog_key: str) -> str:
    """Prefix *ref* with *catalog_key* only when it is a two-part ``schema.table`` ref.

    Refs with one part or with three or more parts are returned unchanged, as
    is any ref when *catalog_key* is empty.
    """
    if catalog_key and ref.count(".") == 1:
        return f"{catalog_key}.{ref}"
    return ref


__all__ = [
    "CatalogTarget",
    "PathTarget",
    "ResolvedTarget",
    "TableRouteResolver",
    "CatalogRouteResolver",
    "PathRouteResolver",
    "FixedCatalogRouteResolver",
    "FixedPathRouteResolver",
    "CompositeRouteResolver",
    "build_table_resolver",
    "RoutedCatalog",
]