loom.etl.storage

Storage config, location mapping, and routing.

class loom.etl.storage.FileLocation(uri_template, storage_options=<factory>)

Bases: object

Physical storage address for one file route.

Parameters:
  • uri_template (str) – Full URI or URI template. Supports {field_name} placeholders consistent with FromFile path templates.

  • storage_options (dict[str, str]) – Cloud credentials / connection settings passed verbatim to the underlying I/O layer.
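The sketch below shows one way a {field_name} template could be filled in. It assumes the placeholders follow Python str.format semantics. FileLocationSketch and expand_uri are hypothetical stand-ins, not part of loom.etl.storage. The real expansion happens in the I/O layer and may behave differently.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass(frozen=True)
class FileLocationSketch:
    """Hypothetical stand-in with the same two fields as FileLocation."""

    uri_template: str
    storage_options: dict[str, str] = field(default_factory=dict)


def expand_uri(location: FileLocationSketch, **fields: str) -> str:
    """Fill {field_name} placeholders (assumed str.format semantics)."""
    # A missing field raises KeyError rather than leaving "{...}" behind.
    return location.uri_template.format(**fields)


loc = FileLocationSketch(
    uri_template="s3://exports/daily/{run_date}/part.parquet",
    storage_options={"AWS_REGION": "eu-west-1"},
)
print(expand_uri(loc, run_date="2024-01-31"))
# s3://exports/daily/2024-01-31/part.parquet
```

In this sketch, storage_options travel alongside the template untouched, matching the "passed verbatim" wording above.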

class loom.etl.storage.FileLocator(*args, **kwargs)

Bases: Protocol

Protocol for resolving a logical file alias to a physical FileLocation.

Implement this to support custom file routing strategies.

Example:

from loom.etl.storage import FileLocation


class MyFileLocator:
    def locate(self, name: str) -> FileLocation:
        # Map each alias to its own prefix in one bucket.
        return FileLocation(uri_template=f"s3://my-bucket/{name}/")

locate(name)

Resolve name to its physical storage location.

Parameters:

name (str) – Logical file alias declared via alias().

Returns:

FileLocation with full URI template and credentials.

Raises:

KeyError – When name is not registered.

Return type:

FileLocation

class loom.etl.storage.MappingFileLocator(mapping)

Bases: object

Resolve file aliases via an explicit alias → FileLocation mapping.

Built automatically by to_file_locator() from the storage.files configuration block.

Parameters:

mapping (dict[str, FileLocation]) – alias → FileLocation dict.

Raises:

KeyError – On locate() when the alias is not in mapping.

Example:

from loom.etl.storage import FileLocation, MappingFileLocator

locator = MappingFileLocator(
    mapping={
        "events_raw": FileLocation(
            uri_template="s3://raw/events/",
            storage_options={"AWS_REGION": "eu-west-1"},
        ),
        "exports_daily": FileLocation(
            uri_template="s3://exports/daily/",
        ),
    }
)

locate(name)

Resolve name from the mapping.

Parameters:

name (str) – Logical file alias.

Returns:

FileLocation for the alias.

Raises:

KeyError – When name is not registered. The error message lists available aliases to aid debugging.

Return type:

FileLocation
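Below is a minimal sketch of the documented lookup. It uses hypothetical stand-in classes so it runs without Loom installed. The exact wording of the real KeyError message is an assumption; this reference only states that the message lists the available aliases.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass(frozen=True)
class FileLocationSketch:
    uri_template: str
    storage_options: dict[str, str] = field(default_factory=dict)


class MappingFileLocatorSketch:
    """Hypothetical re-implementation of the documented lookup."""

    def __init__(self, mapping: dict[str, FileLocationSketch]) -> None:
        self._mapping = dict(mapping)

    def locate(self, name: str) -> FileLocationSketch:
        try:
            return self._mapping[name]
        except KeyError:
            # List the registered aliases so typos are easy to spot.
            available = ", ".join(sorted(self._mapping)) or "<none>"
            raise KeyError(
                f"Unknown file alias {name!r}. Available aliases: {available}"
            ) from None


locator = MappingFileLocatorSketch(
    {"events_raw": FileLocationSketch("s3://raw/events/")}
)
print(locator.locate("events_raw").uri_template)  # s3://raw/events/
```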

class loom.etl.storage.StorageEngine(value)

Bases: StrEnum

Execution engine guardrail declared in the storage config.
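StorageConfig.engine accepts "polars" or "spark". The sketch below is a hypothetical illustration of what such a guardrail might check. EngineSketch, its member names, and check_engine are assumptions, not Loom's API.

```python
from enum import Enum


class EngineSketch(str, Enum):
    """Hypothetical enum; values match StorageConfig.engine's literals."""

    POLARS = "polars"
    SPARK = "spark"


def check_engine(declared: str, runtime: EngineSketch) -> None:
    """Assumed guardrail: refuse to run against a different engine."""
    # EngineSketch(...) also rejects unknown engine names with ValueError.
    if EngineSketch(declared) is not runtime:
        raise RuntimeError(
            f"storage config declares engine={declared!r}, "
            f"but the runner uses {runtime.value!r}"
        )


check_engine("polars", EngineSketch.POLARS)  # matching engines: no error
```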

class loom.etl.storage.StorageConfig(engine='polars', missing_table_policy=MissingTablePolicy.SCHEMA_MODE, catalogs=<factory>, defaults=StorageDefaults(table_path=None), tables=(), files=(), tmp_root='', tmp_storage_options=<factory>)

Bases: Struct

Canonical storage configuration used by the ETL runner and factory.

Parameters:
  • engine (Literal['polars', 'spark']) – Engine guardrail.

  • missing_table_policy (MissingTablePolicy) – First-run policy for missing destination tables.

  • catalogs (dict[str, CatalogConnection]) – Catalog connection map.

  • defaults (StorageDefaults) – Default path settings.

  • tables (tuple[TableRoute, ...]) – Per-logical-table routes.

  • files (tuple[FileRoute, ...]) – Per-logical-file routes.

  • tmp_root (str) – Root URI for intermediate storage.

  • tmp_storage_options (dict[str, str]) – Credentials/options for intermediate storage.

validate()

Validate structural constraints and option dictionaries.

Return type:

None

property checkpoint_root: str

Checkpoint root URI (canonical alias of tmp_root).

property checkpoint_storage_options: dict[str, str]

Checkpoint storage options (canonical alias of tmp_storage_options).

has_catalog_routes()

Return True when at least one table route uses ref.

Return type:

bool

to_file_locator()

Build a MappingFileLocator from the declared storage.files routes, or None when no file routes are configured.

Returns:

Locator mapping each files[].name to its physical FileLocation, or None when storage.files is empty.

Return type:

MappingFileLocator | None
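In rough terms, to_file_locator() turns each FileRoute into a FileLocation keyed by the route's name. This sketch uses hypothetical stand-in dataclasses and returns the plain mapping that MappingFileLocator would wrap. Copying path.uri into uri_template is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass(frozen=True)
class FilePathConfigSketch:
    uri: str = ""
    storage_options: dict[str, str] = field(default_factory=dict)


@dataclass(frozen=True)
class FileRouteSketch:
    name: str
    path: FilePathConfigSketch


@dataclass(frozen=True)
class FileLocationSketch:
    uri_template: str
    storage_options: dict[str, str] = field(default_factory=dict)


def to_file_mapping(
    files: tuple[FileRouteSketch, ...],
) -> dict[str, FileLocationSketch] | None:
    """Hypothetical core of to_file_locator(): alias -> location."""
    # No storage.files entries: nothing to route, so return None.
    if not files:
        return None
    # Each files[].name becomes a key. In this sketch a duplicate name
    # silently overwrites the earlier one; Loom may reject duplicates instead.
    return {
        route.name: FileLocationSketch(
            uri_template=route.path.uri,
            storage_options=dict(route.path.storage_options),
        )
        for route in files
    }


routes = (
    FileRouteSketch(
        name="events_raw",
        path=FilePathConfigSketch(
            uri="s3://raw/events/", storage_options={"AWS_REGION": "eu-west-1"}
        ),
    ),
)
mapping = to_file_mapping(routes)
```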

has_path_routes()

Return True when defaults or per-table routes use path mode.

Return type:

bool
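Taken together, has_catalog_routes() and has_path_routes() show which routing modes a config uses. The sketch below is a simplified, hypothetical model. RouteSketch stores the path as a bare URI rather than a TablePathConfig, and default_path_uri stands in for defaults.table_path.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class RouteSketch:
    """Hypothetical table route: a catalog ref or a path URI."""

    name: str
    ref: str = ""
    path_uri: Optional[str] = None


def has_catalog_routes(tables: tuple[RouteSketch, ...]) -> bool:
    # True as soon as any table route sets ref.
    return any(route.ref for route in tables)


def has_path_routes(
    default_path_uri: Optional[str], tables: tuple[RouteSketch, ...]
) -> bool:
    # Path mode applies if defaults.table_path is set or any route has a path.
    return default_path_uri is not None or any(
        route.path_uri is not None for route in tables
    )


tables = (
    RouteSketch("sys.customers", ref="main.sys.customers"),
    RouteSketch("raw.orders", path_uri="s3://raw/orders/"),
)
print(has_catalog_routes(tables), has_path_routes(None, tables))  # True True
```

Under this reading, to_path_locator() would presumably raise ValueError exactly when has_path_routes() returns False.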

to_path_locator()

Build a TableLocator from path defaults and per-table path routes.

Raises:

ValueError – If no path defaults/routes are configured.

Return type:

TableLocator

class loom.etl.storage.StorageDefaults(table_path=None)

Bases: Struct

Default resolution settings used when no per-name override is declared.

Parameters:

table_path (TablePathConfig | None)

class loom.etl.storage.CatalogConnection(provider='unity', workspace='', token='')

Bases: Struct

Catalog connection settings.

Parameters:
  • provider (Literal['unity']) – Catalog provider. Only "unity" is currently supported.

  • workspace (str) – Databricks workspace URL for Unity Catalog.

  • token (str) – Databricks access token.

class loom.etl.storage.TablePathConfig(uri='', storage_options=<factory>, writer=<factory>, delta_config=<factory>, commit=<factory>)

Bases: Struct

Physical Delta path settings for table storage.

Parameters:
  • uri (str) – Root Delta path/URI.

  • storage_options (dict[str, str]) – Object-store credentials/options.

  • writer (dict[str, Any]) – Delta writer properties.

  • delta_config (dict[str, str | None]) – Delta table properties.

  • commit (dict[str, Any]) – Delta commit metadata.

validate(*, context)

Validate path config and delta-rs option dictionaries.

Parameters:

context (str)

Return type:

None

to_location()

Convert to TableLocation.

Return type:

TableLocation

class loom.etl.storage.TableRoute(name, ref='', catalog='', path=None)

Bases: Struct

Route one logical table name to a catalog ref or a physical path.

Parameters:
  • name (str) – Logical table name used by the pipeline (e.g. "sys.customers").

  • ref (str) – Catalog reference. Supported forms:
      ◦ "catalog.schema.table"
      ◦ "schema.table" (uses catalog or the default)

  • catalog (str) – Catalog connection key used with 2-part refs.

  • path (TablePathConfig | None) – Physical Delta path route.

validate(*, catalogs, context)

Validate route syntax and exclusivity constraints.

Parameters:
  • catalogs – Declared catalog connections.

  • context (str)

Return type:

None
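This reference does not list the exact constraints validate() enforces. One plausible reading of "syntax and exclusivity" is sketched below: exactly one of ref or path is set, catalog appears only alongside a 2-part ref, and catalog names a declared connection. validate_route is hypothetical, and Loom's actual rules may differ.

```python
from __future__ import annotations

from typing import Mapping


def validate_route(
    name: str,
    *,
    ref: str = "",
    catalog: str = "",
    has_path: bool = False,
    catalogs: Mapping[str, object],
) -> None:
    """Hypothetical checks modelled on the TableRoute docs."""
    # Exclusivity: a route is either a catalog ref or a physical path.
    if bool(ref) == has_path:
        raise ValueError(f"{name}: set exactly one of ref or path")
    # catalog is documented as being used with 2-part refs.
    if catalog and ref.count(".") != 1:
        raise ValueError(f"{name}: catalog requires a 'schema.table' ref")
    # An explicit catalog key must name a declared connection.
    if catalog and catalog not in catalogs:
        raise ValueError(f"{name}: unknown catalog connection {catalog!r}")
    # Syntax: refs have two or three dotted parts.
    if ref and ref.count(".") not in (1, 2):
        raise ValueError(f"{name}: ref must be 'schema.table' or 'catalog.schema.table'")


validate_route("sys.customers", ref="sys.customers", catalog="prod",
               catalogs={"prod": object()})  # valid: no error
```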

class loom.etl.storage.FilePathConfig(uri='', storage_options=<factory>)

Bases: Struct

Physical path settings for FILE sources/targets.

Parameters:
  • uri (str) – Output/input file URI template.

  • storage_options (dict[str, str]) – Object-store credentials/options.

validate(*, context)

Validate file path config.

Parameters:

context (str)

Return type:

None

class loom.etl.storage.FileRoute(name, path)

Bases: Struct

Route one logical file name to a physical file URI.

Parameters:
  • name (str) – Logical file name used by the pipeline.

  • path (FilePathConfig) – Physical path configuration.

validate(*, context)

Validate file route fields.

Parameters:

context (str)

Return type:

None

class loom.etl.storage.MissingTablePolicy(value)

Bases: StrEnum

Policy used when a TABLE target does not exist at write time.

class loom.etl.storage.TableLocation(uri, storage_options=<factory>, writer=<factory>, delta_config=<factory>, commit=<factory>)

Bases: object

Physical storage address and write-time configuration for one Delta table.

All dict fields are passed verbatim to the corresponding delta-rs parameter; Loom does not validate or restrict their contents.

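Parameters:
  • uri (str) – Full table URI.

  • storage_options – Object-store credentials/options.

  • writer – Delta writer properties.

  • delta_config – Delta table properties.

  • commit – Delta commit metadata.

class loom.etl.storage.TableLocator(*args, **kwargs)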

Bases: Protocol

Protocol for resolving a logical TableRef to a physical TableLocation.

Implement this to support custom storage topologies: per-env routing, secret-manager-backed credentials, or Unity Catalog external tables.

Example:

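from loom.etl.storage import TableLocation


class MyLocator:
    # "TableRef" is quoted so the snippet runs without importing it.
    def locate(self, ref: "TableRef") -> TableLocation:
        # "raw.orders" -> "s3://my-bucket/raw/orders/"
        return TableLocation(uri=f"s3://my-bucket/{ref.ref.replace('.', '/')}/")

locate(ref)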

Resolve ref to its physical storage location.

Parameters:

ref (TableRef) – Logical table reference (e.g. TableRef("raw.orders")).

Returns:

TableLocation with full URI and write-time configuration.

Return type:

TableLocation

class loom.etl.storage.PrefixLocator(root, storage_options=None, writer=None, delta_config=None, commit=None)

Bases: object

Resolve all table refs under one root URI.

Dots in the ref are converted to /, so "raw.orders" under "s3://my-lake/" resolves to "s3://my-lake/raw/orders".

Works equally for flat refs ("orders" → "s3://my-lake/orders") and layered refs ("raw.orders" → "s3://my-lake/raw/orders").

All keyword arguments are forwarded verbatim to every TableLocation produced by this locator.

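Parameters:
  • root (str | Path) – Root URI or local path under which every ref is resolved.

  • storage_options, writer, delta_config, commit – Optional settings forwarded verbatim to every TableLocation.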

Example:

from pathlib import Path

from loom.etl.storage import PrefixLocator

# Minimal: credentials from environment variables
locator = PrefixLocator(root="s3://my-lake/")

# From a pathlib.Path (converted to str internally)
locator = PrefixLocator(root=Path("data/delta"))

# With explicit credentials and compression
locator = PrefixLocator(
    root="s3://my-lake/",
    storage_options={"AWS_REGION": "eu-west-1"},
    writer={"compression": "SNAPPY"},
)

locate(ref)

Resolve ref by appending its slash-separated path to the root.

Parameters:

ref (TableRef) – Logical table reference.

Returns:

TableLocation with the full URI and shared configuration.

Return type:

TableLocation
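The dot-to-slash rule can be sketched as a pure function. prefix_uri is a hypothetical helper. Stripping a trailing slash from root before joining is an assumption, chosen so that "s3://my-lake/" yields the "s3://my-lake/raw/orders" form documented above. The shared storage_options and writer settings are left out for brevity.

```python
from __future__ import annotations

from pathlib import Path


def prefix_uri(root: str | Path, ref: str) -> str:
    """Hypothetical PrefixLocator-style join of a root and a dotted ref."""
    # Path roots are turned into strings, as the PrefixLocator docs describe.
    base = str(root).rstrip("/")
    # Dots become slashes: "raw.orders" -> "raw/orders".
    return f"{base}/{ref.replace('.', '/')}"


print(prefix_uri("s3://my-lake/", "raw.orders"))  # s3://my-lake/raw/orders
print(prefix_uri("s3://my-lake/", "orders"))      # s3://my-lake/orders
```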

class loom.etl.storage.MappingLocator(mapping, default=None)

Bases: object

Resolve table refs via an explicit mapping, with an optional fallback.

Useful when tables span multiple cloud accounts, regions, or providers. Refs absent from the mapping fall back to default (if provided), with the ref path appended via the same dot-to-slash conversion as PrefixLocator.

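Parameters:
  • mapping – Explicit entries mapping a dotted ref (e.g. "raw.orders") to its TableLocation.

  • default (TableLocation | None) – Fallback root for refs absent from mapping.

Raises: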

KeyError – On locate() when the ref is not in mapping and no default is set.

Example:

import os

from loom.etl.storage import MappingLocator, TableLocation

locator = MappingLocator(
    mapping={
        "raw.orders": TableLocation(
            uri="s3://raw-account/orders/",
            storage_options={"AWS_ACCESS_KEY_ID": os.environ["RAW_KEY"]},
            writer={"compression": "SNAPPY"},
        ),
        "curated.payments": TableLocation(
            uri="gs://curated-project/payments/",
            storage_options={"GOOGLE_SERVICE_ACCOUNT_KEY": os.environ["GCP_SA"]},
        ),
    },
    default=TableLocation(uri="s3://default-lake/"),
)

locate(ref)

Resolve ref from the mapping or fall back to the default.

Parameters:

ref (TableRef) – Logical table reference.

Returns:

Matching TableLocation.

Raises:

KeyError – When ref is absent from the mapping and no default is set.

Return type:

TableLocation
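The lookup order described above can be sketched as follows: explicit entries first, then the default with the ref path appended, otherwise KeyError. TableLocationSketch and locate are hypothetical stand-ins. In particular, whether the fallback copies the default's storage_options and other settings is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Mapping, Optional


@dataclass(frozen=True)
class TableLocationSketch:
    uri: str
    storage_options: dict[str, str] = field(default_factory=dict)


def locate(
    ref: str,
    mapping: Mapping[str, TableLocationSketch],
    default: Optional[TableLocationSketch] = None,
) -> TableLocationSketch:
    """Hypothetical MappingLocator-style lookup keyed by dotted ref."""
    # Explicit entries always win.
    if ref in mapping:
        return mapping[ref]
    if default is None:
        raise KeyError(f"{ref!r} is not mapped and no default is set")
    # Fallback: append the dot-to-slash ref path to the default root and
    # reuse the default's options (assumed, not confirmed by the docs).
    base = default.uri.rstrip("/")
    return TableLocationSketch(
        uri=f"{base}/{ref.replace('.', '/')}",
        storage_options=dict(default.storage_options),
    )


mapping = {"raw.orders": TableLocationSketch(uri="s3://raw-account/orders/")}
default = TableLocationSketch(uri="s3://default-lake/")
print(locate("raw.orders", mapping, default).uri)       # s3://raw-account/orders/
print(locate("curated.refunds", mapping, default).uri)  # s3://default-lake/curated/refunds
```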

class loom.etl.storage.CatalogTarget(logical_ref, catalog_ref)

Bases: object

Resolved catalog destination for one logical table.

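Parameters:
  • logical_ref (TableRef) – Logical table reference being routed.

  • catalog_ref (TableRef) – Catalog table it resolves to.

class loom.etl.storage.PathTarget(logical_ref, location)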

Bases: object

Resolved physical destination for one logical table.

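Parameters:
  • logical_ref (TableRef) – Logical table reference being routed.

  • location (TableLocation) – Physical storage location it resolves to.

class loom.etl.storage.TableRouteResolver(*args, **kwargs)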

Bases: Protocol

Resolve one logical table reference to a runtime target.

resolve(logical_ref)

Resolve logical_ref to catalog or path destination.

Parameters:

logical_ref (TableRef)

Return type:

CatalogTarget | PathTarget
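Because resolve() returns a union, callers branch on the kind of target they get back. This sketch models that with hypothetical stand-in classes that use plain strings where Loom uses TableRef and TableLocation. The two resolvers loosely mirror FixedCatalogRouteResolver and PathRouteResolver, which are documented below.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Protocol, Union


@dataclass(frozen=True)
class CatalogTargetSketch:
    logical_ref: str
    catalog_ref: str


@dataclass(frozen=True)
class PathTargetSketch:
    logical_ref: str
    uri: str


TargetSketch = Union[CatalogTargetSketch, PathTargetSketch]


class ResolverSketch(Protocol):
    """Hypothetical shape of TableRouteResolver, with str refs."""

    def resolve(self, logical_ref: str) -> TargetSketch: ...


class FixedCatalogSketch:
    """Routes any table to one explicit catalog table."""

    def __init__(self, catalog_ref: str) -> None:
        self.catalog_ref = catalog_ref

    def resolve(self, logical_ref: str) -> TargetSketch:
        return CatalogTargetSketch(logical_ref, self.catalog_ref)


class PathSketch:
    """Routes every table through one locator function."""

    def __init__(self, locate: Callable[[str], str]) -> None:
        self.locate = locate

    def resolve(self, logical_ref: str) -> TargetSketch:
        return PathTargetSketch(logical_ref, self.locate(logical_ref))


def describe(resolver: ResolverSketch, logical_ref: str) -> str:
    target = resolver.resolve(logical_ref)
    # Callers branch on the target type, as the union return type implies.
    if isinstance(target, CatalogTargetSketch):
        return f"catalog {target.catalog_ref}"
    return f"path {target.uri}"
```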

class loom.etl.storage.CatalogRouteResolver(default_catalog='')

Bases: object

Resolve all tables as catalog references.

Parameters:

default_catalog (str)

class loom.etl.storage.PathRouteResolver(locator)

Bases: object

Resolve all tables through one table locator.

Parameters:

locator (TableLocator)

class loom.etl.storage.FixedCatalogRouteResolver(catalog_ref)

Bases: object

Resolve one logical table to one explicit catalog table.

Parameters:

catalog_ref (TableRef)

class loom.etl.storage.FixedPathRouteResolver(location)

Bases: object

Resolve one logical table to one explicit physical location.

Parameters:

location (TableLocation)

class loom.etl.storage.CompositeRouteResolver(*, default, overrides=None)

Bases: object

Resolve with table-specific overrides layered over one default resolver.

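Parameters:
  • default (TableRouteResolver) – Resolver used for tables without an override.

  • overrides – Optional table-specific resolvers that take precedence over default.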
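One way to read "overrides over one default resolver" is sketched here: an override registered for a table wins, and every other table falls through to default. Resolvers are modelled as plain callables, and overrides as a dict keyed by the dotted ref string. Both are assumptions, not Loom's signatures. build_table_resolver() presumably assembles a similar structure from StorageConfig.defaults and StorageConfig.tables.

```python
from __future__ import annotations

from typing import Callable, Mapping, Optional

# Hypothetical resolver shape: logical ref string -> target description.
Resolve = Callable[[str], str]


class CompositeResolverSketch:
    """Hypothetical model of override-over-default resolution."""

    def __init__(
        self,
        *,
        default: Resolve,
        overrides: Optional[Mapping[str, Resolve]] = None,
    ) -> None:
        self._default = default
        self._overrides = dict(overrides or {})

    def resolve(self, logical_ref: str) -> str:
        # A table-specific override wins; every other table uses default.
        resolver = self._overrides.get(logical_ref, self._default)
        return resolver(logical_ref)


composite = CompositeResolverSketch(
    default=lambda ref: f"catalog:main.{ref}",
    overrides={"raw.orders": lambda ref: "path:s3://raw/orders/"},
)
print(composite.resolve("raw.orders"))     # path:s3://raw/orders/
print(composite.resolve("sys.customers"))  # catalog:main.sys.customers
```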

class loom.etl.storage.RoutedCatalog(resolver, *, catalog, path=None)

Bases: object

Dispatch TableDiscovery calls by table route (catalog vs path).

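Parameters:
  • resolver (TableRouteResolver) – Decides whether each table is a catalog or a path route.

  • catalog – TableDiscovery used for catalog-routed tables.

  • path – Optional TableDiscovery used for path-routed tables.

loom.etl.storage.build_table_resolver(config)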

Build a route resolver from StorageConfig defaults plus per-table overrides.

Parameters:

config (StorageConfig)

Return type:

TableRouteResolver