loom.etl.storage
Storage config, location mapping, and routing.
- class loom.etl.storage.FileLocation(uri_template, storage_options=<factory>)
Bases: object
Physical storage address for one file route.
- class loom.etl.storage.FileLocator(*args, **kwargs)
Bases: Protocol
Protocol for resolving a logical file alias to a physical FileLocation.
Implement this to support custom file routing strategies.
Example:

    from loom.etl.storage import FileLocation

    class MyFileLocator:
        def locate(self, name: str) -> FileLocation:
            return FileLocation(uri_template=f"s3://my-bucket/{name}/")
- locate(name)
Resolve name to its physical storage location.
- Parameters:
name (str) – Logical file alias declared via alias() or alias().
- Returns:
FileLocation with full URI template and credentials.
- Raises:
KeyError – When name is not registered.
- Return type:
FileLocation
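A custom locator only needs a matching locate() method. The sketch below is a minimal, hypothetical example of the documented contract (return a FileLocation, raise KeyError for unknown names). FileLocation and FileLocator are re-declared as local stand-ins so it runs without Loom installed, and the per-environment bucket naming is an assumption for illustration.

```python
from dataclasses import dataclass, field
from typing import Protocol


# Stand-in for loom.etl.storage.FileLocation, mirroring its constructor signature.
@dataclass(frozen=True)
class FileLocation:
    uri_template: str
    storage_options: dict[str, str] = field(default_factory=dict)


# Stand-in for the FileLocator protocol: any object with a matching locate() qualifies.
class FileLocator(Protocol):
    def locate(self, name: str) -> FileLocation: ...


class EnvFileLocator:
    """Hypothetical locator: one bucket per environment, fixed set of aliases."""

    def __init__(self, env: str, aliases: set[str]) -> None:
        self._bucket = f"s3://lake-{env}"
        self._aliases = aliases

    def locate(self, name: str) -> FileLocation:
        # Honour the documented contract: unknown aliases raise KeyError.
        if name not in self._aliases:
            raise KeyError(name)
        return FileLocation(uri_template=f"{self._bucket}/{name}/")


locator: FileLocator = EnvFileLocator("dev", {"events_raw"})
print(locator.locate("events_raw").uri_template)  # s3://lake-dev/events_raw/
```

Because FileLocator is a Protocol, EnvFileLocator does not need to inherit from it; structural matching of locate() is enough for a type checker.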
- class loom.etl.storage.MappingFileLocator(mapping)
Bases: object
Resolve file aliases via an explicit alias → FileLocation mapping.
Built automatically by to_file_locator() from the storage.files configuration block.
- Parameters:
mapping (dict[str, FileLocation]) – alias → FileLocation dict.
- Raises:
Example:

    from loom.etl.storage import FileLocation, MappingFileLocator

    locator = MappingFileLocator(
        mapping={
            "events_raw": FileLocation(
                uri_template="s3://raw/events/",
                storage_options={"AWS_REGION": "eu-west-1"},
            ),
            "exports_daily": FileLocation(
                uri_template="s3://exports/daily/",
            ),
        }
    )
- locate(name)
Resolve name from the mapping.
- Parameters:
name (str) – Logical file alias.
- Returns:
FileLocation for the alias.
- Raises:
KeyError – When name is not registered. The error message lists the available aliases to aid debugging.
- Return type:
FileLocation
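The lookup behaviour can be illustrated with a small stand-in. This is a sketch, not Loom's implementation: SketchMappingFileLocator is hypothetical and the exact KeyError message format is assumed; only the fact that the message lists the registered aliases comes from the description above.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class FileLocation:  # stand-in mirroring loom.etl.storage.FileLocation
    uri_template: str
    storage_options: dict[str, str] = field(default_factory=dict)


class SketchMappingFileLocator:
    """Illustrative re-implementation of the documented lookup behaviour."""

    def __init__(self, mapping: dict[str, FileLocation]) -> None:
        self._mapping = dict(mapping)

    def locate(self, name: str) -> FileLocation:
        try:
            return self._mapping[name]
        except KeyError:
            # Assumed message format; the documented part is that it lists aliases.
            available = ", ".join(sorted(self._mapping)) or "<none>"
            raise KeyError(f"Unknown file alias {name!r}. Available: {available}") from None


locator = SketchMappingFileLocator({"events_raw": FileLocation("s3://raw/events/")})
```

Listing the registered aliases in the error turns a typo in a pipeline's alias into a one-glance fix instead of a config hunt.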
- class loom.etl.storage.StorageEngine(value)
Bases: StrEnum
Execution engine guardrail declared in the storage config.
- class loom.etl.storage.StorageConfig(engine='polars', missing_table_policy=MissingTablePolicy.SCHEMA_MODE, catalogs=<factory>, defaults=StorageDefaults(table_path=None), tables=(), files=(), tmp_root='', tmp_storage_options=<factory>)
Bases: Struct
Canonical storage configuration used by the ETL runner and factory.
- Parameters:
engine (Literal['polars', 'spark']) – Engine guardrail.
missing_table_policy (MissingTablePolicy) – First-run policy for missing destination tables.
catalogs (dict[str, CatalogConnection]) – Catalog connection map.
defaults (StorageDefaults) – Default path settings.
tables (tuple[TableRoute, ...]) – Per-logical-table routes.
files (tuple[FileRoute, ...]) – Per-logical-file routes (the storage.files block).
tmp_root (str) – Root URI for intermediate storage.
tmp_storage_options (dict[str, str]) – Credentials/options for intermediate storage.
- property checkpoint_storage_options: dict[str, str]
Checkpoint storage options (canonical alias of tmp_storage_options).
- to_file_locator()
Build a MappingFileLocator from the declared storage.files routes, or None when no file routes are configured.
- Returns:
Locator mapping each files[].name to its physical FileLocation, or None when storage.files is empty.
- Return type:
MappingFileLocator | None
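A rough sketch of the documented to_file_locator() behaviour, assuming FilePathConfig.uri becomes FileLocation.uri_template (that field mapping is not stated above). build_file_mapping is a hypothetical helper that returns the alias → FileLocation dict a MappingFileLocator would wrap, or None when no file routes exist; all classes are local stand-ins.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass(frozen=True)
class FileLocation:  # stand-in for loom.etl.storage.FileLocation
    uri_template: str
    storage_options: dict[str, str] = field(default_factory=dict)


@dataclass(frozen=True)
class FilePathConfig:  # stand-in for loom.etl.storage.FilePathConfig
    uri: str = ""
    storage_options: dict[str, str] = field(default_factory=dict)


@dataclass(frozen=True)
class FileRoute:  # stand-in for loom.etl.storage.FileRoute
    name: str
    path: FilePathConfig


def build_file_mapping(files: tuple[FileRoute, ...]) -> Optional[dict[str, FileLocation]]:
    """Hypothetical helper: the alias -> FileLocation dict that would back a
    MappingFileLocator, or None when storage.files is empty."""
    if not files:
        return None
    return {
        route.name: FileLocation(
            # Assumption: FilePathConfig.uri becomes FileLocation.uri_template.
            uri_template=route.path.uri,
            storage_options=dict(route.path.storage_options),
        )
        for route in files
    }
```

Returning None rather than an empty locator lets callers distinguish "no file routing configured" from "routing configured but alias missing".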
- has_path_routes()
Return True when defaults or per-table routes use path mode.
- Return type:
bool
- to_path_locator()
Build a TableLocator from path defaults and per-table path routes.
- Raises:
ValueError – If no path defaults/routes are configured.
- Return type:
TableLocator
- class loom.etl.storage.StorageDefaults(table_path=None)
Bases: Struct
Default resolution settings used when no per-name override is declared.
- Parameters:
table_path (TablePathConfig | None)
- class loom.etl.storage.CatalogConnection(provider='unity', workspace='', token='')
Bases: Struct
Catalog connection settings.
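- class loom.etl.storage.TablePathConfig(uri='', storage_options=<factory>, writer=<factory>, delta_config=<factory>, commit=<factory>)
Bases: Struct
Physical Delta path settings for table storage.
- Parameters:
uri, storage_options, writer, delta_config, commit – Mirror the TableLocation fields of the same names; to_location() converts this config into a TableLocation.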
- validate(*, context)
Validate path config and delta-rs option dictionaries.
- Parameters:
context (str)
- Return type:
None
- to_location()
Convert to TableLocation.
- Return type:
TableLocation
- class loom.etl.storage.TableRoute(name, ref='', catalog='', path=None)
Bases: Struct
Route one logical table name to a catalog ref or a physical path.
- Parameters:
name (str) – Logical table name used by the pipeline (e.g. "sys.customers").
ref (str) – Catalog reference. Supported forms:
  * "catalog.schema.table"
  * "schema.table" (uses catalog or default)
catalog (str) – Catalog connection key used with 2-part refs.
path (TablePathConfig | None) – Physical Delta path route.
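The two documented ref forms can be split as in the following hypothetical helper. Treating a missing catalog key as the literal string "default" is an assumption based on the wording "uses catalog or default"; Loom's own parsing may differ.

```python
def split_catalog_ref(
    ref: str, catalog: str = "", default_catalog: str = "default"
) -> tuple[str, str, str]:
    """Hypothetical parser for the two documented TableRoute.ref forms."""
    parts = ref.split(".")
    if len(parts) == 3:
        # "catalog.schema.table": the catalog comes from the ref itself.
        return parts[0], parts[1], parts[2]
    if len(parts) == 2:
        # "schema.table": fall back to the route's catalog key, then the default.
        return catalog or default_catalog, parts[0], parts[1]
    raise ValueError(f"Unsupported catalog ref {ref!r}; expected 2 or 3 dot-separated parts")


print(split_catalog_ref("sales.orders", catalog="prod"))  # ('prod', 'sales', 'orders')
```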
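- class loom.etl.storage.FilePathConfig(uri='', storage_options=<factory>)
Bases: Struct
Physical path settings for FILE sources/targets.
- Parameters:
uri (str) – Physical file URI.
storage_options (dict[str, str]) – Credentials/connection options for that URI.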
- class loom.etl.storage.FileRoute(name, path)
Bases: Struct
Route one logical file name to a physical file URI.
- Parameters:
name (str) – Logical file name used by the pipeline.
path (FilePathConfig) – Physical path configuration.
- class loom.etl.storage.MissingTablePolicy(value)
Bases: StrEnum
Policy used when a TABLE target does not exist at write time.
- class loom.etl.storage.TableLocation(uri, storage_options=<factory>, writer=<factory>, delta_config=<factory>, commit=<factory>)
Bases: object
Physical storage address and write-time configuration for one Delta table.
All dict fields are passed verbatim to the corresponding delta-rs parameter — Loom does not validate or restrict their contents.
- Parameters:
uri (str) – Full URI accepted by delta-rs.
storage_options (dict[str, str]) – Cloud credentials / connection settings. See https://delta-io.github.io/delta-rs/api/delta_writer/
writer (dict[str, Any]) – Parquet writer settings → WriterProperties(**writer). See https://delta-io.github.io/delta-rs/api/delta_writer/#deltalake.WriterProperties
delta_config (dict[str, str | None]) – Delta table properties written to the transaction log. See https://docs.delta.io/latest/table-properties.html
commit (dict[str, Any]) – Commit metadata → CommitProperties(**commit). See https://delta-io.github.io/delta-rs/api/delta_writer/#deltalake.CommitProperties
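Because the dict fields are passed verbatim to delta-rs, a TableLocation maps naturally onto write_deltalake keyword arguments. The sketch assumes a deltalake release (0.18 or later) whose write_deltalake accepts storage_options, configuration, writer_properties and commit_properties. delta_write_kwargs is a hypothetical helper, TableLocation is a local stand-in, and the constructors are injectable so the example runs without deltalake installed.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class TableLocation:  # stand-in mirroring loom.etl.storage.TableLocation
    uri: str
    storage_options: dict[str, str] = field(default_factory=dict)
    writer: dict[str, Any] = field(default_factory=dict)
    delta_config: dict[str, Optional[str]] = field(default_factory=dict)
    commit: dict[str, Any] = field(default_factory=dict)


def delta_write_kwargs(
    loc: TableLocation,
    writer_cls: Callable[..., Any] = dict,
    commit_cls: Callable[..., Any] = dict,
) -> dict[str, Any]:
    """Map TableLocation fields onto write_deltalake keyword arguments.

    In real use pass deltalake.WriterProperties and deltalake.CommitProperties;
    the dict defaults keep this sketch runnable without deltalake installed.
    """
    kwargs: dict[str, Any] = {"storage_options": loc.storage_options or None}
    if loc.delta_config:
        kwargs["configuration"] = loc.delta_config  # table properties
    if loc.writer:
        kwargs["writer_properties"] = writer_cls(**loc.writer)
    if loc.commit:
        kwargs["commit_properties"] = commit_cls(**loc.commit)
    return kwargs
```

With deltalake installed, a call could look like write_deltalake(loc.uri, data, mode="append", **delta_write_kwargs(loc, WriterProperties, CommitProperties)), where data is, for example, a pyarrow.Table.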
- class loom.etl.storage.TableLocator(*args, **kwargs)
Bases: Protocol
Protocol for resolving a logical TableRef to a physical TableLocation.
Implement this to support custom storage topologies: per-env routing, secret-manager-backed credentials, or Unity Catalog external tables.
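Example:

    from loom.etl.storage import TableLocation

    class MyLocator:
        # "TableRef" is quoted so the snippet runs without importing Loom's TableRef.
        def locate(self, ref: "TableRef") -> TableLocation:
            return TableLocation(uri=f"s3://my-bucket/{ref.ref.replace('.', '/')}/")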
- locate(ref)
Resolve ref to its physical storage location.
- Parameters:
ref (TableRef) – Logical table reference (e.g. TableRef("raw.orders")).
- Returns:
TableLocation with the full URI and write-time configuration.
- Return type:
TableLocation
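Per-environment routing, one of the use cases named above, might look like the following minimal sketch. TableRef and TableLocation are local stand-ins (only the ref string and two TableLocation fields are modelled), and EnvRoutedLocator, the bucket names and the regions are hypothetical placeholders.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class TableRef:  # stand-in: only the .ref string used by the example above
    ref: str


@dataclass(frozen=True)
class TableLocation:  # stand-in with the fields this sketch needs
    uri: str
    storage_options: dict[str, str] = field(default_factory=dict)


class EnvRoutedLocator:
    """Hypothetical per-environment locator: same layout, different bucket/region."""

    BUCKETS = {
        "dev": ("s3://lake-dev", "eu-west-1"),
        "prod": ("s3://lake-prod", "eu-central-1"),
    }

    def __init__(self, env: str) -> None:
        if env not in self.BUCKETS:
            raise ValueError(f"unknown environment {env!r}")
        self._root, self._region = self.BUCKETS[env]

    def locate(self, ref: TableRef) -> TableLocation:
        # Same dot-to-slash layout in every environment; only root and region change.
        path = ref.ref.replace(".", "/")
        return TableLocation(
            uri=f"{self._root}/{path}",
            storage_options={"AWS_REGION": self._region},
        )
```

Keeping the table layout identical across environments means pipelines reference the same logical names everywhere and only the locator changes.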
- class loom.etl.storage.PrefixLocator(root, storage_options=None, writer=None, delta_config=None, commit=None)
Bases: object
Resolve all table refs under one root URI.
Dots in the ref are converted to /, so "raw.orders" under "s3://my-lake/" resolves to "s3://my-lake/raw/orders".
This works equally for flat refs ("orders" → "s3://my-lake/orders") and layered refs ("raw.orders" → "s3://my-lake/raw/orders").
All keyword arguments are forwarded verbatim to every TableLocation produced by this locator.
- Parameters:
root (str) – Root URI: a local path or a cloud URI. Accepts a str (e.g. a cloud URI like s3://bucket/path) or a pathlib.Path; converted to str internally.
storage_options (dict[str, str] | None) – Cloud credentials. See https://delta-io.github.io/delta-rs/api/delta_writer/
writer (dict[str, Any] | None) – Parquet writer settings → WriterProperties(**writer). See https://delta-io.github.io/delta-rs/api/delta_writer/#deltalake.WriterProperties
delta_config (dict[str, str | None] | None) – Delta table properties. See https://docs.delta.io/latest/table-properties.html
commit (dict[str, Any] | None) – Commit metadata → CommitProperties(**commit). See https://delta-io.github.io/delta-rs/api/delta_writer/#deltalake.CommitProperties
Example:

    from pathlib import Path

    from loom.etl.storage import PrefixLocator

    # Minimal: credentials from environment variables
    locator = PrefixLocator(root="s3://my-lake/")

    # From a pathlib.Path (converted to str internally)
    locator = PrefixLocator(root=Path("data/delta"))

    # With explicit credentials and compression
    locator = PrefixLocator(
        root="s3://my-lake/",
        storage_options={"AWS_REGION": "eu-west-1"},
        writer={"compression": "SNAPPY"},
    )
- locate(ref)
Resolve ref by appending its slash-separated path to the root.
- Parameters:
ref (TableRef) – Logical table reference.
- Returns:
TableLocation with the full URI and shared configuration.
- Return type:
TableLocation
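The URI rule described above can be reproduced in a few lines. prefix_uri is a hypothetical helper, not part of Loom; stripping a trailing slash from the root before joining is an assumption chosen to match the documented "s3://my-lake/" → "s3://my-lake/raw/orders" example.

```python
from pathlib import Path
from typing import Union


def prefix_uri(root: Union[str, Path], ref: str) -> str:
    """Sketch of the documented PrefixLocator rule: dots in the ref become "/"
    and the result is appended to the root (a Path is converted to str first)."""
    base = str(root).rstrip("/")
    return f"{base}/{ref.replace('.', '/')}"


print(prefix_uri("s3://my-lake/", "raw.orders"))  # s3://my-lake/raw/orders
```

On Windows, str(Path(...)) uses backslashes, so a Path root there would yield a mixed-separator string; cloud URIs are unaffected.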
- class loom.etl.storage.MappingLocator(mapping, default=None)
Bases: object
Resolve table refs via an explicit mapping, with an optional fallback.
Useful when tables span multiple cloud accounts, regions, or providers. Refs absent from the mapping fall back to default (if provided), with the ref path appended via the same dot-to-slash conversion as PrefixLocator.
- Parameters:
mapping (dict[str, TableLocation]) – ref_string → TableLocation map.
default (TableLocation | None) – Fallback TableLocation for unmapped refs.
- Raises:
KeyError – On locate() when the ref is not in mapping and no default is set.
Example:

    import os

    from loom.etl.storage import MappingLocator, TableLocation

    locator = MappingLocator(
        mapping={
            "raw.orders": TableLocation(
                uri="s3://raw-account/orders/",
                storage_options={"AWS_ACCESS_KEY_ID": os.environ["RAW_KEY"]},
                writer={"compression": "SNAPPY"},
            ),
            "curated.payments": TableLocation(
                uri="gs://curated-project/payments/",
                storage_options={"GOOGLE_SERVICE_ACCOUNT_KEY": os.environ["GCP_SA"]},
            ),
        },
        default=TableLocation(uri="s3://default-lake/"),
    )
- locate(ref)
Resolve ref from the mapping or fall back to the default.
- Parameters:
ref (TableRef) – Logical table reference.
- Returns:
Matching TableLocation.
- Raises:
KeyError – When ref is absent from the mapping and no default is set.
- Return type:
TableLocation
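A sketch of the documented resolution order (exact match, then the default plus the dot-to-slash path, else KeyError), using a simplified TableLocation stand-in and plain string refs. mapping_locate is hypothetical, and copying the default's other settings (here storage_options) onto the fallback location is an assumption.

```python
from dataclasses import dataclass, field, replace
from typing import Optional


@dataclass(frozen=True)
class TableLocation:  # stand-in with two of the real fields
    uri: str
    storage_options: dict[str, str] = field(default_factory=dict)


def mapping_locate(
    ref: str,
    mapping: dict[str, TableLocation],
    default: Optional[TableLocation] = None,
) -> TableLocation:
    """Sketch of MappingLocator.locate(): exact match first, then the default
    root with the ref's dot-to-slash path appended."""
    if ref in mapping:
        return mapping[ref]
    if default is None:
        raise KeyError(ref)
    # Keep the default's other settings; only the URI gains the ref path.
    return replace(default, uri=f"{default.uri.rstrip('/')}/{ref.replace('.', '/')}")
```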
- class loom.etl.storage.CatalogTarget(logical_ref, catalog_ref)
Bases: object
Resolved catalog destination for one logical table.
- class loom.etl.storage.PathTarget(logical_ref, location)
Bases: object
Resolved physical destination for one logical table.
- Parameters:
logical_ref (TableRef)
location (TableLocation)
- class loom.etl.storage.TableRouteResolver(*args, **kwargs)
Bases: Protocol
Resolve one logical table reference to a runtime target.
- class loom.etl.storage.CatalogRouteResolver(default_catalog='')
Bases: object
Resolve all tables as catalog references.
- Parameters:
default_catalog (str)
- class loom.etl.storage.PathRouteResolver(locator)
Bases: object
Resolve all tables through one table locator.
- Parameters:
locator (TableLocator)
- class loom.etl.storage.FixedCatalogRouteResolver(catalog_ref)
Bases: object
Resolve one logical table to one explicit catalog table.
- Parameters:
catalog_ref (TableRef)
- class loom.etl.storage.FixedPathRouteResolver(location)
Bases: object
Resolve one logical table to one explicit physical location.
- Parameters:
location (TableLocation)
- class loom.etl.storage.CompositeRouteResolver(*, default, overrides=None)
Bases: object
Resolve with table-specific overrides, falling back to one default resolver.
- Parameters:
default (TableRouteResolver)
overrides (dict[str, TableRouteResolver] | None)
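The override-then-default pattern can be sketched with stand-ins. The method name resolve(), the string-valued refs and locations, and the CatalogResolver, FixedPathResolver and CompositeResolver classes are all hypothetical simplifications of the resolver classes listed above.

```python
from dataclasses import dataclass
from typing import Optional, Protocol, Union


@dataclass(frozen=True)
class CatalogTarget:  # stand-in; refs simplified to plain strings
    logical_ref: str
    catalog_ref: str


@dataclass(frozen=True)
class PathTarget:  # stand-in; location simplified to a URI string
    logical_ref: str
    location: str


Target = Union[CatalogTarget, PathTarget]


class Resolver(Protocol):
    # The method name resolve() is an assumption for this sketch.
    def resolve(self, ref: str) -> Target: ...


class CatalogResolver:
    def resolve(self, ref: str) -> Target:
        # Simplification: the logical ref is used as the catalog ref unchanged.
        return CatalogTarget(ref, ref)


class FixedPathResolver:
    def __init__(self, uri: str) -> None:
        self.uri = uri

    def resolve(self, ref: str) -> Target:
        return PathTarget(ref, self.uri)


class CompositeResolver:
    def __init__(self, *, default: Resolver, overrides: Optional[dict[str, Resolver]] = None) -> None:
        self.default = default
        self.overrides = dict(overrides or {})

    def resolve(self, ref: str) -> Target:
        # A table-specific override wins; everything else goes to the default.
        return self.overrides.get(ref, self.default).resolve(ref)
```

The composite itself knows nothing about catalogs or paths; it only picks which resolver answers, which keeps per-table exceptions out of the default strategy.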
- class loom.etl.storage.RoutedCatalog(resolver, *, catalog, path=None)
Bases: object
Dispatch TableDiscovery calls by table route (catalog vs. path).
- Parameters:
resolver (TableRouteResolver)
catalog (TableDiscovery)
path (TableDiscovery | None)
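A minimal sketch of how dispatch by route could work. TableDiscovery's real methods are not listed here, so a single hypothetical exists() method stands in for it; SketchRoutedCatalog, SetDiscovery and the target stand-ins are illustrative only, and raising when a path-routed table has no path discovery configured is likewise an assumption.

```python
from dataclasses import dataclass
from typing import Callable, Optional, Protocol, Union


@dataclass(frozen=True)
class CatalogTarget:  # stand-in with string refs
    logical_ref: str
    catalog_ref: str


@dataclass(frozen=True)
class PathTarget:  # stand-in with a string location
    logical_ref: str
    location: str


class Discovery(Protocol):
    # Hypothetical single-method stand-in for TableDiscovery.
    def exists(self, name: str) -> bool: ...


class SetDiscovery:
    """Toy discovery backend: a table exists if its name is in the set."""

    def __init__(self, names: set[str]) -> None:
        self.names = names

    def exists(self, name: str) -> bool:
        return name in self.names


class SketchRoutedCatalog:
    def __init__(
        self,
        resolve: Callable[[str], Union[CatalogTarget, PathTarget]],
        *,
        catalog: Discovery,
        path: Optional[Discovery] = None,
    ) -> None:
        self._resolve, self._catalog, self._path = resolve, catalog, path

    def exists(self, ref: str) -> bool:
        target = self._resolve(ref)
        if isinstance(target, PathTarget):
            if self._path is None:
                # Assumption: a path route without path discovery is a config error.
                raise RuntimeError(f"{ref!r} is path-routed but no path discovery is configured")
            return self._path.exists(target.location)
        return self._catalog.exists(target.catalog_ref)
```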
- loom.etl.storage.build_table_resolver(config)
Build a route resolver from StorageConfig defaults plus per-table overrides.
- Parameters:
config (StorageConfig)
- Return type:
TableRouteResolver
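The decision build_table_resolver makes can be outlined as follows, under stated assumptions: path mode is the default when a default table path is configured and catalog mode otherwise, and a per-table path takes precedence over a ref. plan_routes and Route are hypothetical; the comments note which documented resolver each outcome would plausibly correspond to.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Route:  # simplified stand-in for TableRoute: path reduced to a URI string
    name: str
    ref: str = ""
    path: Optional[str] = None


def plan_routes(
    default_table_path: Optional[str], tables: tuple[Route, ...]
) -> tuple[str, dict[str, tuple[str, str]]]:
    """Hypothetical outline: pick a default mode and one fixed override per route."""
    # Assumption: defaults.table_path set -> PathRouteResolver, else CatalogRouteResolver.
    default = "path" if default_table_path is not None else "catalog"
    overrides: dict[str, tuple[str, str]] = {}
    for route in tables:
        if route.path is not None:
            overrides[route.name] = ("path", route.path)  # ~ FixedPathRouteResolver
        elif route.ref:
            overrides[route.name] = ("catalog", route.ref)  # ~ FixedCatalogRouteResolver
    # The default plus overrides would then feed a CompositeRouteResolver.
    return default, overrides
```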