# Source code for loom.etl.backends.polars._file_writer

"""Polars file target writer (CSV / JSON / PARQUET)."""

from __future__ import annotations

import logging
from collections.abc import Callable

import polars as pl

from loom.etl.declarative._format import Format
from loom.etl.declarative._write_options import (
    CsvWriteOptions,
    JsonWriteOptions,
    ParquetWriteOptions,
    WriteOptions,
)
from loom.etl.declarative.target._file import FileSpec

_log = logging.getLogger(__name__)


class PolarsFileWriter:
    """Writer for FILE targets backed by Polars.

    Dispatches on ``spec.format`` to either an eager ``DataFrame`` writer or
    a lazy ``LazyFrame`` sink, depending on the ``streaming`` flag.
    """

    def write(self, frame: pl.LazyFrame, spec: FileSpec, *, streaming: bool = False) -> None:
        """Persist *frame* at the path declared in *spec*.

        Args:
            frame: Lazy frame to write.
            spec: File target description; its ``format``, ``path`` and
                ``write_options`` attributes are read.
            streaming: When true, pass the lazy frame to the matching sink
                writer; otherwise collect it and use the eager writer.

        Raises:
            KeyError: If ``spec.format`` has no registered writer.
        """
        if not streaming:
            # Resolve the writer before collecting so that an unregistered
            # format fails without materialising the frame.
            eager_writer = _FILE_WRITERS[spec.format]
            eager_writer(frame.collect(), spec.path, spec.write_options)
            return

        _log.debug("streaming write: sink_%s path=%s", spec.format, spec.path)
        sink_writer = _STREAMING_WRITERS[spec.format]
        sink_writer(frame, spec.path, spec.write_options)
_Options = WriteOptions | None


def _csv_options(options: _Options) -> CsvWriteOptions:
    """Return *options* if it is CSV-specific, else default CSV options."""
    if isinstance(options, CsvWriteOptions):
        return options
    return CsvWriteOptions()


def _parquet_options(options: _Options) -> ParquetWriteOptions:
    """Return *options* if it is Parquet-specific, else default Parquet options."""
    if isinstance(options, ParquetWriteOptions):
        return options
    return ParquetWriteOptions()


def _json_options(options: _Options) -> JsonWriteOptions:
    """Return *options* if it is JSON-specific, else default JSON options."""
    if isinstance(options, JsonWriteOptions):
        return options
    return JsonWriteOptions()


# --- Eager writers: operate on an already-collected DataFrame ---------------


def _write_csv_file(df: pl.DataFrame, path: str, options: _Options) -> None:
    """Write *df* to *path* with ``DataFrame.write_csv``.

    Options that are not ``CsvWriteOptions`` (including ``None``) are replaced
    by a default-constructed ``CsvWriteOptions``.
    """
    csv_opts = _csv_options(options)
    df.write_csv(
        path,
        separator=csv_opts.separator,
        include_header=csv_opts.has_header,
        **dict(csv_opts.kwargs),
    )


def _write_parquet_file(df: pl.DataFrame, path: str, options: _Options) -> None:
    """Write *df* to *path* with ``DataFrame.write_parquet``.

    Options that are not ``ParquetWriteOptions`` (including ``None``) are
    replaced by a default-constructed ``ParquetWriteOptions``.
    """
    parquet_opts = _parquet_options(options)
    df.write_parquet(
        path,
        compression=parquet_opts.compression,
        **dict(parquet_opts.kwargs),
    )


def _write_json_file(df: pl.DataFrame, path: str, options: _Options) -> None:
    """Write *df* to *path* with ``DataFrame.write_ndjson``.

    Options that are not ``JsonWriteOptions`` (including ``None``) are
    replaced by a default-constructed ``JsonWriteOptions``.
    """
    json_opts = _json_options(options)
    df.write_ndjson(path, **dict(json_opts.kwargs))


_DataFrameWriter = Callable[[pl.DataFrame, str, _Options], None]

# Format -> eager writer used when the frame is collected before writing.
_FILE_WRITERS: dict[Format, _DataFrameWriter] = {
    Format.CSV: _write_csv_file,
    Format.PARQUET: _write_parquet_file,
    Format.JSON: _write_json_file,
}


# --- Streaming writers: operate directly on a LazyFrame ---------------------


def _sink_csv_file(frame: pl.LazyFrame, path: str, options: _Options) -> None:
    """Sink *frame* to *path* with ``LazyFrame.sink_csv``.

    Options that are not ``CsvWriteOptions`` (including ``None``) are replaced
    by a default-constructed ``CsvWriteOptions``.
    """
    csv_opts = _csv_options(options)
    frame.sink_csv(
        path,
        separator=csv_opts.separator,
        include_header=csv_opts.has_header,
        **dict(csv_opts.kwargs),
    )


def _sink_parquet_file(frame: pl.LazyFrame, path: str, options: _Options) -> None:
    """Sink *frame* to *path* with ``LazyFrame.sink_parquet``.

    Options that are not ``ParquetWriteOptions`` (including ``None``) are
    replaced by a default-constructed ``ParquetWriteOptions``.
    """
    parquet_opts = _parquet_options(options)
    frame.sink_parquet(
        path,
        compression=parquet_opts.compression,
        **dict(parquet_opts.kwargs),
    )


def _sink_ndjson_file(frame: pl.LazyFrame, path: str, options: _Options) -> None:
    """Sink *frame* to *path* with ``LazyFrame.sink_ndjson``.

    Options that are not ``JsonWriteOptions`` (including ``None``) are
    replaced by a default-constructed ``JsonWriteOptions``.
    """
    json_opts = _json_options(options)
    frame.sink_ndjson(path, **dict(json_opts.kwargs))


_LazyFrameWriter = Callable[[pl.LazyFrame, str, _Options], None]

# Format -> sink writer used for streaming writes.
_STREAMING_WRITERS: dict[Format, _LazyFrameWriter] = {
    Format.CSV: _sink_csv_file,
    Format.PARQUET: _sink_parquet_file,
    Format.JSON: _sink_ndjson_file,
}