# Source code for loom.etl.storage._file_locator

"""FileLocator — protocol and built-in implementation for file URI resolution.

Decouples logical file aliases (e.g. ``"events_raw"``) from physical storage
URIs and cloud credentials.  Backends receive an optional locator at
construction time; when present, ``FromFile.alias()`` / ``IntoFile.alias()``
specs are resolved through it at runtime.

Built-in implementation
-----------------------
* :class:`MappingFileLocator` — explicit ``alias → FileLocation`` mapping
  built from :attr:`~loom.etl.StorageConfig.files`.

Usage
-----
File aliases are declared in the storage config::

    storage:
      files:
        - name: events_raw
          path:
            uri: s3://raw-bucket/events/
            storage_options:
              AWS_REGION: eu-west-1
        - name: exports_daily
          path:
            uri: s3://exports-bucket/daily/

And consumed in pipelines via :meth:`~loom.etl.FromFile.alias` and
:meth:`~loom.etl.IntoFile.alias`::

    events = FromFile.alias("events_raw", format=Format.CSV)
    target = IntoFile.alias("exports_daily", format=Format.PARQUET)

The runner resolves the alias to the physical URI at job startup —
the pipeline never hard-codes storage paths.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Protocol, runtime_checkable


@dataclass(frozen=True)
class FileLocation:
    """Physical storage address for one file route.

    Instances are immutable at the attribute level and hashable.  The hash
    is derived from ``uri_template`` only, because ``storage_options`` is a
    ``dict`` and cannot itself be hashed; equality still compares both
    fields, so equal instances always share the same hash.

    Args:
        uri_template: Full URI or URI template.  Supports ``{field_name}``
            placeholders consistent with :class:`~loom.etl.FromFile` path
            templates.
        storage_options: Cloud credentials / connection settings passed
            verbatim to the underlying I/O layer.
    """

    uri_template: str
    # hash=False keeps the dict out of the generated __hash__; otherwise
    # hashing any instance would raise ``TypeError: unhashable type: 'dict'``.
    storage_options: dict[str, str] = field(default_factory=dict, hash=False)
@runtime_checkable
class FileLocator(Protocol):
    """Structural interface mapping a logical file alias to a :class:`FileLocation`.

    Any object that exposes a compatible ``locate`` method satisfies this
    protocol, so a custom routing strategy only needs to provide that one
    method.

    Example::

        class MyFileLocator:
            def locate(self, name: str) -> FileLocation:
                return FileLocation(uri_template=f"s3://my-bucket/{name}/")
    """

    def locate(self, name: str) -> FileLocation:
        """Look up the physical storage location registered for *name*.

        Args:
            name: Logical file alias, as declared through
                :meth:`~loom.etl.FromFile.alias` or
                :meth:`~loom.etl.IntoFile.alias`.

        Returns:
            The :class:`FileLocation` carrying the full URI template and
            credentials for the alias.

        Raises:
            KeyError: When *name* is not registered.
        """
        ...
class MappingFileLocator:
    """Resolve file aliases via an explicit ``alias → FileLocation`` mapping.

    Built automatically by :meth:`~loom.etl.StorageConfig.to_file_locator`
    from the ``storage.files`` configuration block.

    The mapping is shallow-copied at construction time, so adding or
    removing keys on the caller's dict afterwards does not change what this
    locator resolves.

    Args:
        mapping: ``alias → FileLocation`` dict.

    Raises:
        KeyError: On :meth:`locate` when the alias is not in *mapping*.

    Example::

        locator = MappingFileLocator(
            mapping={
                "events_raw": FileLocation(
                    uri_template="s3://raw/events/",
                    storage_options={"AWS_REGION": "eu-west-1"},
                ),
                "exports_daily": FileLocation(
                    uri_template="s3://exports/daily/",
                ),
            }
        )
    """

    def __init__(self, mapping: dict[str, FileLocation]) -> None:
        # Snapshot the routes: holding the caller's dict by reference would
        # let later mutation of that dict silently re-route aliases.
        self._mapping: dict[str, FileLocation] = dict(mapping)

    def locate(self, name: str) -> FileLocation:
        """Resolve *name* from the mapping.

        Args:
            name: Logical file alias.

        Returns:
            :class:`FileLocation` for the alias.

        Raises:
            KeyError: When *name* is not registered.  The error message
                lists the available aliases (sorted) to aid debugging.
        """
        location = self._mapping.get(name)
        if location is None:
            available = sorted(self._mapping)
            raise KeyError(
                f"No file route configured for alias {name!r}. "
                f"Available aliases: {available}. "
                "Define it under storage.files in your config YAML."
            )
        return location
# Public API of this module: the names exported by a star-import.
__all__ = ["FileLocation", "FileLocator", "MappingFileLocator"]