loom.streaming.bytewax
Bytewax runtime adapter for Loom streaming flows.
This package requires bytewax to be installed.
Usage:
from loom.streaming.bytewax import build_dataflow
flow = build_dataflow(compiled_plan)
- loom.streaming.bytewax.build_dataflow(plan, *, observability_runtime=None, source=None, sink=None, terminal_sinks=None, error_sinks=None)
Build a Bytewax Dataflow from a compiled plan.
- loom.streaming.bytewax.build_dataflow_with_shutdown(plan, *, observability_runtime=None, source=None, sink=None, terminal_sinks=None, error_sinks=None, bridge=None, commit_tracker=None)
Build a Bytewax Dataflow and expose its shutdown callback.
- Parameters:
plan (CompiledPlan) – Compiled flow plan.
observability_runtime (ObservabilityRuntime | None) – Optional observability runtime for lifecycle events.
source (Any | None) – Optional Bytewax source override (used in tests).
sink (Any | None) – Optional Bytewax sink override (used in tests).
terminal_sinks (Mapping[tuple[int, ...], Any] | None) – Optional per-branch sink overrides.
error_sinks (Mapping[ErrorKind, Any] | None) – Optional per-kind error sink overrides.
bridge (AsyncBridge | None) – Pre-configured AsyncBridge. When None, a default asyncio bridge is created if the plan requires async execution. Pass an explicit bridge to control backend and uvloop settings.
commit_tracker (Any | None)
- Return type:
_BuiltDataflow
- class loom.streaming.bytewax.BytewaxRecoverySettings(db_dir, backup_interval_ms=None)
Bases: LoomFrozenStruct
Recovery settings for the Bytewax runtime.
- class loom.streaming.bytewax.BytewaxRuntimeConfig(workers_per_process=1, process_id=None, addresses=None, epoch_interval_ms=None, recovery=None, async_backend='asyncio', use_uvloop=False, force_shutdown_timeout_ms=None)
Bases: LoomFrozenStruct
Runtime settings for executing a Bytewax dataflow.
Delivery guarantee: By default, Kafka offsets are committed by the consumer’s auto-commit policy and are not tied to downstream success. When enable.auto.commit is disabled in the consumer settings, Loom tracks item completion through the adapter and commits offsets only after the downstream branch finishes successfully.
- Parameters:
workers_per_process (int) – Number of worker threads in this process.
process_id (int | None) – Optional cluster process id.
addresses (tuple[str, ...] | None) – Optional cluster address list, including this process.
epoch_interval_ms (int | None) – Optional epoch duration in milliseconds.
recovery (BytewaxRecoverySettings | None) – Optional recovery settings.
async_backend (Literal['asyncio', 'trio']) – Anyio backend used for AsyncBridge when the flow contains WithAsync nodes. Accepted values: "asyncio" (default) or "trio".
use_uvloop (bool) – Replace the asyncio event loop with uvloop for lower-latency async I/O. Only effective when async_backend is "asyncio" and the platform is not Windows.
force_shutdown_timeout_ms (int | None) – Maximum milliseconds to wait for in-flight async tasks to finish during shutdown. When exceeded, the portal thread is abandoned and a warning is logged. None waits indefinitely (default).
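The delivery guarantee described for BytewaxRuntimeConfig (offsets committed only after the downstream branch succeeds) can be illustrated with a small stand-alone tracker. This is a sketch of the general technique, not Loom's adapter code: OffsetCommitTracker and its methods are hypothetical names, and the in-memory sets stand in for a real Kafka consumer.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class OffsetCommitTracker:
    """Hypothetical tracker: commit only the contiguous prefix of completed offsets."""

    # Offsets handed to the dataflow but not yet committed, per partition.
    _pending: dict[int, set[int]] = field(default_factory=dict)
    # Offsets whose downstream branch finished successfully, per partition.
    _done: dict[int, set[int]] = field(default_factory=dict)

    def started(self, partition: int, offset: int) -> None:
        self._pending.setdefault(partition, set()).add(offset)

    def succeeded(self, partition: int, offset: int) -> None:
        self._done.setdefault(partition, set()).add(offset)

    def committable(self, partition: int) -> int | None:
        """Return the next offset to commit (Kafka convention: last done + 1)."""
        pending = self._pending.get(partition, set())
        done = self._done.get(partition, set())
        last = None
        for offset in sorted(pending):
            if offset not in done:
                break  # an earlier item is still in flight; stop here
            last = offset
        if last is None:
            return None
        # Forget everything up to the committed position.
        self._pending[partition] = {o for o in pending if o > last}
        self._done[partition] = {o for o in done if o > last}
        return last + 1


tracker = OffsetCommitTracker()
for off in (10, 11, 12):
    tracker.started(0, off)
tracker.succeeded(0, 10)
tracker.succeeded(0, 12)  # offset 11 is still in flight
first_commit = tracker.committable(0)
tracker.succeeded(0, 11)
second_commit = tracker.committable(0)
```

In this sketch, a successful item at offset 12 cannot advance the commit position while offset 11 is unfinished, so a restart would replay 11 and 12 rather than skip 11.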
- exception loom.streaming.bytewax.DuplicateErrorSinkError
Bases: LoomStreamingError
Raised when two registered sinks cover the same ErrorKind.
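To make the duplicate-coverage condition concrete, here is a minimal sketch of how overlapping ErrorKind coverage might be detected. The classes below are local stand-ins so the example runs on its own: only ErrorKind.TASK appears in this reference, ErrorKind.OTHER and the index_error_sinks helper are hypothetical, and Loom's real check may differ.

```python
from enum import Enum


class ErrorKind(Enum):
    """Stand-in enum; TASK is from the reference, OTHER is hypothetical."""

    TASK = "task"
    OTHER = "other"


class DuplicateErrorSinkError(Exception):
    """Stand-in for the Loom exception of the same name."""


def index_error_sinks(bindings):
    """Map each ErrorKind to one sink, rejecting overlapping coverage.

    `bindings` is an iterable of (sink, kinds) pairs.
    """
    by_kind = {}
    for sink, kinds in bindings:
        for kind in kinds:
            if kind in by_kind:
                raise DuplicateErrorSinkError(
                    f"{kind.name} is covered by both {by_kind[kind]!r} and {sink!r}"
                )
            by_kind[kind] = sink
    return by_kind


# Disjoint coverage resolves to one sink per kind.
routes = index_error_sinks(
    [("table_sink", (ErrorKind.TASK,)), ("log_sink", (ErrorKind.OTHER,))]
)

# Two sinks claiming TASK trigger the error.
try:
    index_error_sinks([("a", (ErrorKind.TASK,)), ("b", (ErrorKind.TASK,))])
except DuplicateErrorSinkError as exc:
    duplicate_message = str(exc)
else:
    duplicate_message = None
```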
- class loom.streaming.bytewax.RegisteredSink(*args, **kwargs)
Bases: Protocol
Protocol a registrable sink class must satisfy.
- sink_type
Unique type identifier. Must match the type field in the YAML config.
- Type:
ClassVar[str]
Example:
class ClickHouseErrorTableSink:
    sink_type = "clickhouse_error_table"
    config_type = ClickHouseErrorTableSinkConfig

    @classmethod
    def build_binding(cls, cfg, ctx):
        ...
        return RuntimeSinkBinding(purpose="errors", sink=..., kinds=(ErrorKind.TASK,))
- class loom.streaming.bytewax.RuntimeSinkBinding(purpose, sink, kinds=())
Bases: LoomFrozenStruct
Resolved sink binding returned by a RegisteredSink.
- class loom.streaming.bytewax.StreamingRunner(*, _registry=None)
Bases: object
Wire a StreamFlow declaration into the real Bytewax runtime.
- Parameters:
_registry (SinkRegistry | None) – Optional pre-built SinkRegistry for testing purposes.
- classmethod from_context(flow, *, config, observability_runtime=None)
Build a runner from a resolved config context.
- Parameters:
flow (StreamFlow[Any, Any])
config (ConfigContext)
observability_runtime (ObservabilityRuntime | None)
- Return type:
- classmethod from_yaml(flow, path, *, observability_runtime=None)
Load config from YAML and build a runner.
- Parameters:
flow (StreamFlow[Any, Any])
path (str)
observability_runtime (ObservabilityRuntime | None)
- Return type:
- classmethod from_dict(flow, config, *, observability_runtime=None)
Build a runner from a plain Python config mapping.
- Parameters:
- Return type:
- register_sink(sink_class)
Register a sink class to be resolved from YAML config at run time.
- build_dataflow()
Assemble the Bytewax Dataflow and keep shutdown state.
- Return type:
Dataflow
- prepare_run(*, error_sinks=None)
Prepare one executable dataflow and its shutdown callback.
Releases any resources from a previous prepare_run() call before building the new dataflow, so calling this method twice is safe.
- Parameters:
error_sinks (Mapping[ErrorKind, ErrorSink] | None) – Optional mapping of ErrorKind to ErrorSink instances. When provided, these sinks override the default error routing built from the compiled plan.
- Returns:
A bundle containing the assembled Dataflow and a shutdown callable that releases adapter-owned resources.
- Return type:
_PreparedStreamingRun
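The release-before-rebuild behaviour of prepare_run() can be sketched without Loom. PreparedRunHolder and the build callable below are hypothetical stand-ins for illustration only; they show why a second call is safe, not how StreamingRunner is actually implemented.

```python
from __future__ import annotations

from typing import Callable, Optional


class PreparedRunHolder:
    """Hypothetical holder that releases the previous run before building a new one."""

    def __init__(self, build: Callable[[], tuple[object, Callable[[], None]]]) -> None:
        self._build = build
        self._shutdown: Optional[Callable[[], None]] = None

    def prepare(self) -> object:
        # Release whatever the previous call acquired before building again.
        if self._shutdown is not None:
            self._shutdown()
            self._shutdown = None
        dataflow, shutdown = self._build()
        self._shutdown = shutdown
        return dataflow


events: list[str] = []
counter = iter(range(1, 100))


def build():
    # Stand-in for assembling a dataflow plus its adapter-owned resources.
    n = next(counter)
    events.append(f"open {n}")
    return f"dataflow-{n}", lambda: events.append(f"close {n}")


holder = PreparedRunHolder(build)
first = holder.prepare()
second = holder.prepare()  # closes run 1 before opening run 2
```

After the two calls, events records that the first run's resources were closed before the second run's were opened, so nothing from the earlier preparation is leaked.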
- run(*, flow=None, config=None, config_path=None, error_sinks=None, runtime=None)
Execute the compiled dataflow with the real Bytewax runtime.
Emits Scope.POLL_CYCLE around the full invocation (including preparation) and Scope.FLOW around the blocking cli_main call. Starts the Prometheus scrape server before execution when configured.
- Parameters:
flow (StreamFlow[Any, Any] | None) – Optional flow override. When provided together with config or config_path, the runner is reconfigured before execution.
config (ConfigContext | None) – Optional config context. Used together with flow to reconfigure the runner.
config_path (str | Path | None) – Optional path to a YAML config file. Loaded and used like config when provided.
error_sinks (Mapping[ErrorKind, Any] | None) – Optional explicit error sinks. When provided, these take precedence over registry-resolved sinks.
runtime (BytewaxRuntimeConfig | None) – Optional runtime override. When omitted, uses the runner’s config-loaded runtime settings.
- Return type:
None