loom.streaming.bytewax.runner

StreamingRunner — production Bytewax runtime entry point for Loom flows.

Functions

_create_bridge(plan, runtime)

Create an AsyncBridge configured from runtime settings, or None if not needed.

_prepare_run(plan, *[, ...])

_recovery_config(recovery)

_runtime_kwargs(runtime)

_to_timedelta(value_ms)
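The summary gives no description for _to_timedelta(value_ms). Based only on its name and the *_ms settings documented below, one plausible reading is that it turns an optional millisecond count into the datetime.timedelta values that recent Bytewax releases accept (for example, the epoch interval passed to cli_main). The following is a minimal sketch under that assumption. The name to_timedelta_sketch is hypothetical and is not Loom's code.

```python
from datetime import timedelta
from typing import Optional


def to_timedelta_sketch(value_ms: Optional[int]) -> Optional[timedelta]:
    """Convert an optional millisecond count to a timedelta (hypothetical helper)."""
    # None means "not configured", so the caller can fall back to Bytewax's default.
    if value_ms is None:
        return None
    # Note that 0 is a real value and must not be treated like None.
    return timedelta(milliseconds=value_ms)


print(to_timedelta_sketch(1500))  # 0:00:01.500000
print(to_timedelta_sketch(None))  # None
```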

Classes

BytewaxRecoverySettings(db_dir[, ...])

Recovery settings for the Bytewax runtime.

BytewaxRuntimeConfig([workers_per_process, ...])

Runtime settings for executing a Bytewax dataflow.

ErrorSink(*args, **kwargs)

Bytewax sink protocol for ErrorEnvelope items.

StreamingRunner(*[, _registry])

Wire a StreamFlow declaration into the real Bytewax runtime.

_PreparedStreamingRun(dataflow, shutdown)

Prepared Bytewax execution bundle.

class loom.streaming.bytewax.runner.BytewaxRecoverySettings(db_dir, backup_interval_ms=None)[source]

Bases: LoomFrozenStruct

Recovery settings for the Bytewax runtime.

Parameters:
  • db_dir (str) – Local recovery directory shared by this execution.

  • backup_interval_ms (int | None) – Optional snapshot backup interval in milliseconds.
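The two recovery fields map naturally onto the recovery configuration in recent Bytewax releases, which takes a directory path and an optional backup interval expressed as a timedelta. The sketch below assumes that _recovery_config performs a translation of this kind. That is an assumption, because this reference does not show its body. RecoverySettingsSketch is a hypothetical frozen stand-in for BytewaxRecoverySettings.

```python
from dataclasses import dataclass
from datetime import timedelta
from pathlib import Path
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class RecoverySettingsSketch:
    """Hypothetical stand-in mirroring BytewaxRecoverySettings' two fields."""

    db_dir: str
    backup_interval_ms: Optional[int] = None


def recovery_kwargs(settings: RecoverySettingsSketch) -> Dict[str, Any]:
    """Build keyword arguments shaped like Bytewax's recovery config (assumed mapping)."""
    kwargs: Dict[str, Any] = {"db_dir": Path(settings.db_dir)}
    # Only pass an interval when one was configured; otherwise keep Bytewax's default.
    if settings.backup_interval_ms is not None:
        kwargs["backup_interval"] = timedelta(milliseconds=settings.backup_interval_ms)
    return kwargs


settings = RecoverySettingsSketch(db_dir="/var/lib/loom/recovery", backup_interval_ms=30_000)
print(recovery_kwargs(settings))
```

The frozen dataclass mirrors the immutability implied by the LoomFrozenStruct base. With Bytewax installed, a mapping like this could be unpacked into its recovery config constructor.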

class loom.streaming.bytewax.runner.BytewaxRuntimeConfig(workers_per_process=1, process_id=None, addresses=None, epoch_interval_ms=None, recovery=None, async_backend='asyncio', use_uvloop=False, force_shutdown_timeout_ms=None)[source]

Bases: LoomFrozenStruct

Runtime settings for executing a Bytewax dataflow.

Delivery guarantee: By default, Kafka offsets are committed by the consumer’s auto-commit policy, independently of whether downstream processing succeeded. When enable.auto.commit is disabled in the consumer settings, Loom tracks item completion through the adapter and commits offsets only after the downstream branch has finished successfully.
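Committing only after success requires some bookkeeping, because items that go through asynchronous branches can finish out of order. A consumer can therefore safely commit only up to the oldest item that has not yet succeeded. The sketch below illustrates one common technique for this: track the lowest in-flight offset per partition. It shows the general idea and is not Loom's adapter. CommitTracker and its methods are hypothetical names.

```python
from typing import Dict, Optional, Set


class CommitTracker:
    """Hypothetical commit-after-success bookkeeping for one Kafka consumer."""

    def __init__(self) -> None:
        self._in_flight: Dict[int, Set[int]] = {}  # partition -> offsets not yet succeeded
        self._highest: Dict[int, int] = {}  # partition -> highest offset handed downstream

    def started(self, partition: int, offset: int) -> None:
        # Assumes offsets arrive in increasing order per partition, as Kafka delivers them.
        self._in_flight.setdefault(partition, set()).add(offset)
        self._highest[partition] = offset

    def succeeded(self, partition: int, offset: int) -> None:
        # Failed items are never reported here, so they keep holding back the commit point.
        self._in_flight[partition].discard(offset)

    def commit_position(self, partition: int) -> Optional[int]:
        """Offset to commit (Kafka commits the next offset to read), or None."""
        if partition not in self._highest:
            return None  # nothing consumed yet on this partition
        in_flight = self._in_flight[partition]
        if in_flight:
            # Every started offset below the oldest unfinished item has succeeded.
            return min(in_flight)
        return self._highest[partition] + 1


tracker = CommitTracker()
for offset in (10, 11, 12):
    tracker.started(0, offset)
tracker.succeeded(0, 11)  # finishes out of order
print(tracker.commit_position(0))  # 10: offset 10 is still in flight
tracker.succeeded(0, 10)
print(tracker.commit_position(0))  # 12: offsets 10 and 11 are done, 12 is not
tracker.succeeded(0, 12)
print(tracker.commit_position(0))  # 13
```

With this scheme, a restart redelivers any item at or after the committed position. The trade-off is possible duplicates rather than lost items.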

Parameters:
  • workers_per_process (int) – Number of worker threads in this process.

  • process_id (int | None) – Optional cluster process id.

  • addresses (tuple[str, ...] | None) – Optional cluster address list, including this process.

  • epoch_interval_ms (int | None) – Optional epoch duration in milliseconds.

  • recovery (BytewaxRecoverySettings | None) – Optional recovery settings.

  • async_backend (Literal['asyncio', 'trio']) – AnyIO backend used by the AsyncBridge when the flow contains WithAsync nodes. Accepted values: "asyncio" (default) or "trio".

  • use_uvloop (bool) – Replace the asyncio event loop with uvloop for lower latency async I/O. Only effective when async_backend is "asyncio" and the platform is not Windows.

  • force_shutdown_timeout_ms (int | None) – Maximum milliseconds to wait for in-flight async tasks to finish during shutdown. When exceeded, the portal thread is abandoned and a warning is logged. None waits indefinitely (default).
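force_shutdown_timeout_ms describes a bounded wait followed by abandonment. In plain Python, the usual building block for this is Thread.join(timeout) followed by is_alive(). The sketch below assumes that the AsyncBridge portal runs in an ordinary threading.Thread, which is an assumption about its internals. join_with_timeout is a hypothetical helper.

```python
import logging
import threading
from typing import Optional

logger = logging.getLogger("shutdown_sketch")


def join_with_timeout(thread: threading.Thread, timeout_ms: Optional[int]) -> bool:
    """Wait for a worker thread; return False if it was still running and got abandoned."""
    # None mirrors the documented default: wait indefinitely.
    timeout_s = None if timeout_ms is None else timeout_ms / 1000
    thread.join(timeout_s)
    if thread.is_alive():
        logger.warning("%s still running after %s ms; abandoning it", thread.name, timeout_ms)
        return False
    return True


# A thread blocked on an event that is never set stands in for stuck in-flight tasks.
stop = threading.Event()
slow = threading.Thread(target=stop.wait, name="portal-sketch", daemon=True)
slow.start()
slow_result = join_with_timeout(slow, 50)  # gives up after roughly 50 ms

fast = threading.Thread(target=lambda: None)
fast.start()
fast_result = join_with_timeout(fast, None)  # waits as long as needed

stop.set()  # let the demo thread exit cleanly
```

Marking the thread as a daemon is what allows the interpreter to exit while an abandoned thread is still running. A non-daemon thread would still block interpreter shutdown.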

class loom.streaming.bytewax.runner.ErrorSink(*args, **kwargs)[source]

Bases: Protocol

Bytewax sink protocol for ErrorEnvelope items.

class loom.streaming.bytewax.runner.StreamingRunner(*, _registry=None)[source]

Bases: object

Wire a StreamFlow declaration into the real Bytewax runtime.

Parameters:

_registry (SinkRegistry | None) – Optional pre-built SinkRegistry for testing purposes.

classmethod from_context(flow, *, config, observability_runtime=None)[source]

Build a runner from a resolved config context.

Parameters:
Return type:

StreamingRunner

classmethod from_yaml(flow, path, *, observability_runtime=None)[source]

Load config from YAML and build a runner.

Parameters:
Return type:

StreamingRunner

classmethod from_dict(flow, config, *, observability_runtime=None)[source]

Build a runner from a plain Python config mapping.

Parameters:
Return type:

StreamingRunner

register_sink(sink_class)[source]

Register a sink class to be resolved from YAML config at run time.

Parameters:

sink_class (type) – Class that satisfies the RegisteredSink protocol.

Raises:

TypeError – If sink_class does not implement RegisteredSink.

Return type:

None
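This reference does not list the members of RegisteredSink, but raising TypeError for a class that "does not implement" a protocol suggests a structural check. The sketch below uses typing.runtime_checkable with issubclass. The protocol member from_config, the registry class, and the choice to key sinks by class name are all hypothetical.

```python
from typing import Any, Dict, Mapping, Protocol, runtime_checkable


@runtime_checkable
class RegisteredSinkSketch(Protocol):
    """Hypothetical protocol; the real RegisteredSink members are not documented here."""

    @classmethod
    def from_config(cls, config: Mapping[str, Any]) -> Any: ...


class SinkRegistrySketch:
    """Hypothetical registry keyed by class name, so YAML could refer to sinks by name."""

    def __init__(self) -> None:
        self.sinks: Dict[str, type] = {}

    def register_sink(self, sink_class: type) -> None:
        # Structural check: the class needs the protocol's members, not a common base class.
        if not isinstance(sink_class, type) or not issubclass(sink_class, RegisteredSinkSketch):
            raise TypeError(f"{sink_class!r} does not implement RegisteredSinkSketch")
        self.sinks[sink_class.__name__] = sink_class


class StdoutSink:
    @classmethod
    def from_config(cls, config: Mapping[str, Any]) -> "StdoutSink":
        return cls()


class NotASink:
    pass


registry = SinkRegistrySketch()
registry.register_sink(StdoutSink)
rejected = False
try:
    registry.register_sink(NotASink)
except TypeError as exc:
    rejected = True
    print(exc)
```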

build_dataflow()[source]

Assemble the Bytewax Dataflow and keep shutdown state.

Return type:

Dataflow

prepare_run(*, error_sinks=None)[source]

Prepare one executable dataflow and its shutdown callback.

Releases any resources from a previous prepare_run() call before building the new dataflow, so calling this method twice is safe.

Parameters:

error_sinks (Mapping[ErrorKind, ErrorSink] | None) – Optional mapping of ErrorKind to ErrorSink instances. When provided, these sinks override the default error routing built from the compiled plan.

Returns:

A bundle containing the assembled Dataflow and a shutdown callable that releases adapter-owned resources.

Return type:

_PreparedStreamingRun
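The guarantee that prepare_run() is safe to call twice, together with the separate shutdown() method, fits a common pattern. The runner keeps a reference to the last prepared bundle and tears it down before building a new one. It also clears that reference before running cleanup, so repeated shutdown calls are harmless. The sketch below uses plain strings as stand-in dataflows. RunnerSketch and PreparedRunSketch are hypothetical and are not Loom's classes.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass(frozen=True)
class PreparedRunSketch:
    """Hypothetical analogue of _PreparedStreamingRun: a dataflow plus its cleanup."""

    dataflow: object
    shutdown: Callable[[], None]


class RunnerSketch:
    """Hypothetical runner that keeps at most one prepared run alive."""

    def __init__(self) -> None:
        self._current: Optional[PreparedRunSketch] = None
        self._builds = 0
        self.log: List[str] = []

    def prepare_run(self) -> PreparedRunSketch:
        # Release whatever the previous call acquired before building again.
        self.shutdown()
        self._builds += 1
        build_id = self._builds
        prepared = PreparedRunSketch(
            dataflow=f"dataflow-{build_id}",  # stand-in for a real Dataflow
            shutdown=lambda: self.log.append(f"released {build_id}"),
        )
        self._current = prepared
        return prepared

    def shutdown(self) -> None:
        # Drop the reference before cleaning up, so a second call is a no-op.
        prepared, self._current = self._current, None
        if prepared is not None:
            prepared.shutdown()


runner = RunnerSketch()
runner.prepare_run()
runner.prepare_run()  # releases build 1 first
runner.shutdown()  # releases build 2
runner.shutdown()  # nothing left to release
print(runner.log)  # ['released 1', 'released 2']
```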

run(*, flow=None, config=None, config_path=None, error_sinks=None, runtime=None)[source]

Execute the compiled dataflow with the real Bytewax runtime.

Emits Scope.POLL_CYCLE around the full invocation (including preparation) and Scope.FLOW around the blocking cli_main call. Starts the Prometheus scrape server before execution when configured.

Parameters:
  • flow (StreamFlow[Any, Any] | None) – Optional flow override. When provided together with config or config_path, the runner is reconfigured before execution.

  • config (ConfigContext | None) – Optional config context. Used together with flow to reconfigure the runner.

  • config_path (str | Path | None) – Optional path to a YAML config file. Loaded and used like config when provided.

  • error_sinks (Mapping[ErrorKind, Any] | None) – Optional explicit error sinks. When provided, these take precedence over registry-resolved sinks.

  • runtime (BytewaxRuntimeConfig | None) – Optional runtime override. When omitted, uses the runner’s config-loaded runtime settings.

Return type:

None
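The scope nesting described for run() can be pictured as two context managers. POLL_CYCLE wraps preparation and execution, and FLOW wraps only the blocking execution step. In the sketch below, ScopeSketch, scope(), and run_sketch are hypothetical, and the events list stands in for whatever Loom's observability layer actually records. The Prometheus scrape server is left out.

```python
from contextlib import contextmanager
from enum import Enum
from typing import Callable, Iterator, List


class ScopeSketch(Enum):
    """Hypothetical stand-in for the Scope values named in the docstring."""

    POLL_CYCLE = "poll_cycle"
    FLOW = "flow"


events: List[str] = []


@contextmanager
def scope(name: ScopeSketch) -> Iterator[None]:
    events.append(f"enter {name.value}")
    try:
        yield
    finally:
        # Runs even if the body raises, so every scope is closed.
        events.append(f"exit {name.value}")


def run_sketch(prepare: Callable[[], str], execute: Callable[[str], None]) -> None:
    with scope(ScopeSketch.POLL_CYCLE):
        dataflow = prepare()  # preparation counts toward POLL_CYCLE
        with scope(ScopeSketch.FLOW):
            execute(dataflow)  # stands in for the blocking cli_main call


def prepare() -> str:
    events.append("prepare")
    return "dataflow"


run_sketch(prepare, lambda df: events.append(f"execute {df}"))
print(events)
# ['enter poll_cycle', 'prepare', 'enter flow', 'execute dataflow', 'exit flow', 'exit poll_cycle']
```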

shutdown()[source]

Release adapter resources after a run or failed build.

Return type:

None