loom.streaming.bytewax

Bytewax runtime adapter for Loom streaming flows.

This package requires bytewax to be installed.

Usage:

from loom.streaming.bytewax import build_dataflow

# compiled_plan is the CompiledPlan produced for your flow
flow = build_dataflow(compiled_plan)

loom.streaming.bytewax.build_dataflow(plan, *, observability_runtime=None, source=None, sink=None, terminal_sinks=None, error_sinks=None)

Build a Bytewax Dataflow from a compiled plan.

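Parameters:
  • plan (CompiledPlan) – Compiled flow plan.

  • observability_runtime (ObservabilityRuntime | None) – Optional observability runtime for lifecycle events.

  • source (Any | None) – Optional Bytewax source override (used in tests).

  • sink (Any | None) – Optional Bytewax sink override (used in tests).

  • terminal_sinks (Mapping[tuple[int, ...], Any] | None) – Optional per-branch sink overrides.

  • error_sinks (Mapping[ErrorKind, Any] | None) – Optional per-kind error sink overrides.
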
Return type:

Any

loom.streaming.bytewax.build_dataflow_with_shutdown(plan, *, observability_runtime=None, source=None, sink=None, terminal_sinks=None, error_sinks=None, bridge=None, commit_tracker=None)

Build a Bytewax Dataflow and expose its shutdown callback.

Parameters:
  • plan (CompiledPlan) – Compiled flow plan.

  • observability_runtime (ObservabilityRuntime | None) – Optional observability runtime for lifecycle events.

  • source (Any | None) – Optional Bytewax source override (used in tests).

  • sink (Any | None) – Optional Bytewax sink override (used in tests).

  • terminal_sinks (Mapping[tuple[int, ...], Any] | None) – Optional per-branch sink overrides.

  • error_sinks (Mapping[ErrorKind, Any] | None) – Optional per-kind error sink overrides.

  • bridge (AsyncBridge | None) – Pre-configured AsyncBridge. When None, a default asyncio bridge is created if the plan requires async execution. Pass an explicit bridge to control backend and uvloop settings.

  • commit_tracker (Any | None)

Return type:

_BuiltDataflow
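Because the shutdown callback releases adapter-owned resources, callers usually want it to run even when execution fails. The following is a minimal, library-agnostic sketch of that pattern. It assumes you can obtain the dataflow and its shutdown callable as a pair; the field names of _BuiltDataflow are not documented here, so `build` and `built_dataflow` are hypothetical stand-ins rather than Loom API.

```python
from contextlib import contextmanager
from typing import Any, Callable, Iterator, Tuple

# Hypothetical shape: build() returns (dataflow, shutdown_callback).
BuildFn = Callable[[], Tuple[Any, Callable[[], None]]]


@contextmanager
def built_dataflow(build: BuildFn) -> Iterator[Any]:
    """Yield a dataflow and always invoke its shutdown callback afterwards."""
    dataflow, shutdown = build()
    try:
        yield dataflow
    finally:
        # Runs both on normal exit and when the with-body raises.
        shutdown()
```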

class loom.streaming.bytewax.BytewaxRecoverySettings(db_dir, backup_interval_ms=None)

Bases: LoomFrozenStruct

Recovery settings for the Bytewax runtime.

Parameters:
  • db_dir (str) – Local recovery directory shared by this execution.

  • backup_interval_ms (int | None) – Optional snapshot backup interval in milliseconds.

class loom.streaming.bytewax.BytewaxRuntimeConfig(workers_per_process=1, process_id=None, addresses=None, epoch_interval_ms=None, recovery=None, async_backend='asyncio', use_uvloop=False, force_shutdown_timeout_ms=None)

Bases: LoomFrozenStruct

Runtime settings for executing a Bytewax dataflow.

Delivery guarantee: By default Kafka offsets are committed by the consumer’s auto-commit policy and are not tied to downstream success. When enable.auto.commit is disabled in the consumer settings, Loom tracks item completion through the adapter and commits offsets only after the downstream branch finishes successfully.
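Manual-commit mode needs some bookkeeping: an offset is only safe to commit once every earlier offset on the same partition has also finished. The sketch below shows one common way to track that. It is a hypothetical illustration (the class and method names are invented), not Loom's commit tracker, and it follows the Kafka convention that the committed value is the next offset to read.

```python
from __future__ import annotations

from collections import defaultdict
from typing import Optional


class ContiguousCommitTracker:
    """Hypothetical sketch of 'commit only after downstream success'."""

    def __init__(self) -> None:
        self._in_flight: defaultdict[int, set[int]] = defaultdict(set)
        self._highest_done: dict[int, int] = {}

    def started(self, partition: int, offset: int) -> None:
        # The item has been read and handed to the downstream branch.
        self._in_flight[partition].add(offset)

    def finished(self, partition: int, offset: int) -> None:
        # The downstream branch completed this item successfully.
        self._in_flight[partition].discard(offset)
        done = self._highest_done.get(partition, offset)
        self._highest_done[partition] = max(done, offset)

    def committable(self, partition: int) -> Optional[int]:
        """Offset to commit (the next one to read), or None if nothing is safe yet."""
        if partition not in self._highest_done:
            return None
        pending = self._in_flight[partition]
        # An unfinished earlier item blocks everything after it.
        return min(pending) if pending else self._highest_done[partition] + 1
```

In this sketch a failed item never reaches finished(), so it keeps blocking commits for its partition and is read again after a restart.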

Parameters:
  • workers_per_process (int) – Number of worker threads in this process.

  • process_id (int | None) – Optional cluster process id.

  • addresses (tuple[str, ...] | None) – Optional cluster address list, including this process.

  • epoch_interval_ms (int | None) – Optional epoch duration in milliseconds.

  • recovery (BytewaxRecoverySettings | None) – Optional recovery settings.

  • async_backend (Literal['asyncio', 'trio']) – AnyIO backend used by the AsyncBridge when the flow contains WithAsync nodes. Accepted values: "asyncio" (default) or "trio".

  • use_uvloop (bool) – Replace the asyncio event loop with uvloop for lower latency async I/O. Only effective when async_backend is "asyncio" and the platform is not Windows.

  • force_shutdown_timeout_ms (int | None) – Maximum milliseconds to wait for in-flight async tasks to finish during shutdown. When exceeded, the portal thread is abandoned and a warning is logged. None waits indefinitely (default).
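BytewaxRuntimeConfig expresses durations in milliseconds, while Bytewax's own run entry point takes datetime.timedelta values. A minimal sketch of that translation follows. It assumes the keyword names of bytewax.run.cli_main (workers_per_process, process_id, addresses, epoch_interval) and bytewax.recovery.RecoveryConfig (db_dir, backup_interval) in recent Bytewax releases, so check them against your installed version; runtime_kwargs is a hypothetical helper, not Loom's adapter code.

```python
from __future__ import annotations

from datetime import timedelta
from pathlib import Path
from typing import Any, Optional


def _ms(value: Optional[int]) -> Optional[timedelta]:
    # None stays None so Bytewax can apply its own default.
    return None if value is None else timedelta(milliseconds=value)


def runtime_kwargs(
    workers_per_process: int = 1,
    process_id: Optional[int] = None,
    addresses: Optional[tuple[str, ...]] = None,
    epoch_interval_ms: Optional[int] = None,
    recovery_db_dir: Optional[str] = None,
    backup_interval_ms: Optional[int] = None,
) -> dict[str, Any]:
    """Translate millisecond settings into assumed timedelta-based kwargs.

    Recovery settings are returned as plain kwargs for a RecoveryConfig
    rather than constructed here, so this sketch does not import bytewax.
    """
    kwargs: dict[str, Any] = {
        "workers_per_process": workers_per_process,
        "process_id": process_id,
        "addresses": list(addresses) if addresses is not None else None,
        "epoch_interval": _ms(epoch_interval_ms),
    }
    if recovery_db_dir is not None:
        kwargs["recovery"] = {
            "db_dir": Path(recovery_db_dir),
            "backup_interval": _ms(backup_interval_ms),
        }
    return kwargs
```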

exception loom.streaming.bytewax.DuplicateErrorSinkError

Bases: LoomStreamingError

Raised when two registered sinks cover the same ErrorKind.

class loom.streaming.bytewax.RegisteredSink(*args, **kwargs)

Bases: Protocol

Protocol a registrable sink class must satisfy.

sink_type

Unique type identifier. Must match the type field in the YAML config.

Type:

ClassVar[str]

config_type

msgspec Struct type used to deserialize the YAML config section.

Type:

ClassVar[type]

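Example:

from typing import ClassVar

from loom.streaming.bytewax import RuntimeSinkBinding
# ErrorKind and ClickHouseErrorTableSinkConfig (a msgspec Struct) come from elsewhere.

class ClickHouseErrorTableSink:
    sink_type: ClassVar[str] = "clickhouse_error_table"
    config_type: ClassVar[type] = ClickHouseErrorTableSinkConfig

    @classmethod
    def build_binding(cls, cfg, ctx):
        # cfg: the YAML section deserialized into config_type
        ...
        return RuntimeSinkBinding(purpose="errors", sink=..., kinds=(ErrorKind.TASK,))

class loom.streaming.bytewax.RuntimeSinkBinding(purpose, sink, kinds=())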

Bases: LoomFrozenStruct

Resolved sink binding returned by a RegisteredSink.

Parameters:
  • purpose (Literal['errors', 'terminal', 'audit']) – Role of this sink within the runner.

  • sink (object) – Instantiated sink object ready for use.

  • kinds (tuple[ErrorKind, ...]) – For purpose="errors", the ErrorKind values this sink handles.
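Since each "errors" binding lists the ErrorKind values it handles, the bindings presumably have to be folded into a single kind-to-sink routing table, and two bindings claiming the same kind is the situation DuplicateErrorSinkError reports. A stdlib-only sketch of that fold follows; ErrorKind, Binding, and the exception class are local stand-ins (the ROUTING member is invented), not Loom's types.

```python
from __future__ import annotations

import enum
from dataclasses import dataclass
from typing import Iterable


class ErrorKind(enum.Enum):
    # Hypothetical members; only TASK appears in this reference.
    TASK = "task"
    ROUTING = "routing"


class DuplicateErrorSinkError(Exception):
    """Local stand-in for loom.streaming.bytewax.DuplicateErrorSinkError."""


@dataclass(frozen=True)
class Binding:
    # Mirrors the fields of RuntimeSinkBinding.
    purpose: str
    sink: object
    kinds: tuple[ErrorKind, ...] = ()


def error_sink_map(bindings: Iterable[Binding]) -> dict[ErrorKind, object]:
    """Map each ErrorKind to exactly one sink, rejecting overlaps."""
    routes: dict[ErrorKind, object] = {}
    for binding in bindings:
        if binding.purpose != "errors":
            continue  # terminal and audit sinks are not error routes
        for kind in binding.kinds:
            if kind in routes:
                raise DuplicateErrorSinkError(f"{kind.name} is covered by two sinks")
            routes[kind] = binding.sink
    return routes
```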

class loom.streaming.bytewax.StreamingRunner(*, _registry=None)

Bases: object

Wire a StreamFlow declaration into the real Bytewax runtime.

Parameters:

_registry (SinkRegistry | None) – Optional pre-built SinkRegistry for testing purposes.

classmethod from_context(flow, *, config, observability_runtime=None)

Build a runner from a resolved config context.

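Parameters:
  • flow (StreamFlow[Any, Any]) – Flow declaration to wire into Bytewax.

  • config (ConfigContext) – Resolved config context.

  • observability_runtime (ObservabilityRuntime | None) – Optional observability runtime for lifecycle events.
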
Return type:

StreamingRunner

classmethod from_yaml(flow, path, *, observability_runtime=None)

Load config from YAML and build a runner.

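Parameters:
  • flow (StreamFlow[Any, Any]) – Flow declaration to wire into Bytewax.

  • path (str | Path) – Path to the YAML config file.

  • observability_runtime (ObservabilityRuntime | None) – Optional observability runtime for lifecycle events.
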
Return type:

StreamingRunner

classmethod from_dict(flow, config, *, observability_runtime=None)

Build a runner from a plain Python config mapping.

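Parameters:
  • flow (StreamFlow[Any, Any]) – Flow declaration to wire into Bytewax.

  • config (Mapping) – Plain Python config mapping.

  • observability_runtime (ObservabilityRuntime | None) – Optional observability runtime for lifecycle events.
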
Return type:

StreamingRunner

register_sink(sink_class)

Register a sink class to be resolved from YAML config at run time.

Parameters:

sink_class (type) – Class that satisfies the RegisteredSink protocol.

Raises:

TypeError – If sink_class does not implement RegisteredSink.

Return type:

None
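register_sink() only stores the class; resolution happens at run time, when a YAML section's type field is matched against each class's sink_type and the rest of the section is decoded into config_type. The sketch below imitates that flow with the standard library: a dataclass stands in for the msgspec Struct, a plain attribute check stands in for the protocol check, and every name in it (SinkRegistrySketch, StdoutSink, resolve) is hypothetical. It also assumes build_binding is part of the contract, as the RegisteredSink example suggests.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, ClassVar

_REQUIRED = ("sink_type", "config_type", "build_binding")


class SinkRegistrySketch:
    """Hypothetical stand-in: resolves YAML-style sections to registered sinks."""

    def __init__(self) -> None:
        self._by_type: dict[str, type] = {}

    def register_sink(self, sink_class: type) -> None:
        missing = [name for name in _REQUIRED if not hasattr(sink_class, name)]
        if missing:
            raise TypeError(f"{sink_class.__name__} lacks {', '.join(missing)}")
        self._by_type[sink_class.sink_type] = sink_class

    def resolve(self, section: dict[str, Any], ctx: Any = None) -> Any:
        # The `type` key selects the class; the remaining keys build its config.
        sink_class = self._by_type[section["type"]]
        options = {k: v for k, v in section.items() if k != "type"}
        cfg = sink_class.config_type(**options)  # msgspec would validate here
        return sink_class.build_binding(cfg, ctx)


@dataclass
class StdoutSinkConfig:
    prefix: str = ""


class StdoutSink:
    sink_type: ClassVar[str] = "stdout"
    config_type: ClassVar[type] = StdoutSinkConfig

    @classmethod
    def build_binding(cls, cfg: StdoutSinkConfig, ctx: Any) -> tuple[str, str]:
        # A real sink would return a RuntimeSinkBinding; a tuple keeps this stdlib-only.
        return ("terminal", cfg.prefix)
```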

build_dataflow()

Assemble the Bytewax Dataflow and keep shutdown state.

Return type:

Dataflow

prepare_run(*, error_sinks=None)

Prepare one executable dataflow and its shutdown callback.

Releases any resources from a previous prepare_run() call before building the new dataflow, so calling this method twice is safe.

Parameters:

error_sinks (Mapping[ErrorKind, ErrorSink] | None) – Optional mapping of ErrorKind to ErrorSink instances. When provided, these sinks override the default error routing built from the compiled plan.

Returns:

A bundle containing the assembled Dataflow and a shutdown callable that releases adapter-owned resources.

Return type:

_PreparedStreamingRun
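The re-preparation guarantee amounts to "release first, then build". A hypothetical sketch of that contract follows (PreparedRunSketch and its build parameter are invented for illustration); the sketch additionally chooses to make its shutdown safe to call more than once, which this reference does not state for Loom itself.

```python
from __future__ import annotations

from typing import Any, Callable, Optional, Tuple

BuildFn = Callable[[], Tuple[Any, Callable[[], None]]]


class PreparedRunSketch:
    """Hypothetical: hold at most one live shutdown callback at a time."""

    def __init__(self, build: BuildFn) -> None:
        self._build = build
        self._release: Optional[Callable[[], None]] = None

    def prepare_run(self) -> Any:
        # Release the previous run's resources before acquiring new ones.
        self.shutdown()
        dataflow, self._release = self._build()
        return dataflow

    def shutdown(self) -> None:
        # Clear the slot first so a second call is a no-op.
        release, self._release = self._release, None
        if release is not None:
            release()
```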

run(*, flow=None, config=None, config_path=None, error_sinks=None, runtime=None)

Execute the compiled dataflow with the real Bytewax runtime.

Emits Scope.POLL_CYCLE around the full invocation (including preparation) and Scope.FLOW around the blocking cli_main call. Starts the Prometheus scrape server before execution when configured.

Parameters:
  • flow (StreamFlow[Any, Any] | None) – Optional flow override. When provided together with config or config_path, the runner is reconfigured before execution.

  • config (ConfigContext | None) – Optional config context. Used together with flow to reconfigure the runner.

  • config_path (str | Path | None) – Optional path to a YAML config file. Loaded and used like config when provided.

  • error_sinks (Mapping[ErrorKind, Any] | None) – Optional explicit error sinks. When provided, these take precedence over registry-resolved sinks.

  • runtime (BytewaxRuntimeConfig | None) – Optional runtime override. When omitted, uses the runner’s config-loaded runtime settings.

Return type:

None
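The two observability scopes emitted by run() nest: POLL_CYCLE spans preparation plus execution, while FLOW spans only the blocking cli_main call. A stdlib sketch of that nesting follows, with a hypothetical ScopeRecorder in place of Loom's ObservabilityRuntime, plain strings in place of the Scope enum, and caller-supplied prepare/execute callables in place of the real steps.

```python
from __future__ import annotations

from contextlib import contextmanager
from typing import Any, Callable, Iterator


class ScopeRecorder:
    """Hypothetical observer that records scope start/end events in order."""

    def __init__(self) -> None:
        self.events: list[tuple[str, str]] = []

    @contextmanager
    def scope(self, name: str) -> Iterator[None]:
        self.events.append(("start", name))
        try:
            yield
        finally:
            # The end event is recorded even if the body raises.
            self.events.append(("end", name))


def run_once(obs: ScopeRecorder, prepare: Callable[[], Any],
             execute: Callable[[Any], None]) -> None:
    with obs.scope("POLL_CYCLE"):  # wraps preparation and execution
        dataflow = prepare()
        with obs.scope("FLOW"):  # wraps only the blocking execute call
            execute(dataflow)
```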

shutdown()

Release adapter resources after a run or failed build.

Return type:

None
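Releasing resources can be bounded in the way BytewaxRuntimeConfig.force_shutdown_timeout_ms describes: wait for in-flight async work up to a limit, then abandon the portal thread and log a warning. A minimal threading sketch of that policy follows; stop_portal_thread is a hypothetical name and this is not Loom's implementation. Abandoning only works cleanly for a daemon thread; a non-daemon thread would still block interpreter exit.

```python
import logging
import threading
import time
from typing import Optional

log = logging.getLogger("shutdown-sketch")


def stop_portal_thread(thread: threading.Thread, timeout_ms: Optional[int]) -> bool:
    """Wait for a worker thread; give up after timeout_ms (None waits forever).

    Returns True if the thread finished, False if it was abandoned.
    """
    thread.join(None if timeout_ms is None else timeout_ms / 1000)
    if thread.is_alive():
        log.warning("in-flight async work still running after %s ms; abandoning", timeout_ms)
        return False
    return True
```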