loom.streaming.compiler¶
Streaming compiler: validates StreamFlow and produces CompiledPlan.
- loom.streaming.compiler.compile_flow(flow, *, config)[source]¶
Compile a flow into an immutable plan.
- Parameters:
flow (StreamFlow[Any, Any]) – User-declared streaming flow.
config (ConfigContext | Mapping[str, Any]) – Canonical runtime config context used for binding resolution and Kafka settings extraction.
- Returns:
Immutable
CompiledPlanready for adapter wiring.- Raises:
CompilationError – If binding resolution or any validation phase fails.
- Return type:
- exception loom.streaming.compiler.CompilationError(errors)[source]¶
Bases:
ExceptionRaised when a StreamFlow fails validation or cannot be compiled.
- class loom.streaming.compiler.CompiledMultiSource(settings, topics, dispatch, shape, decode_strategy)[source]¶
Bases:
objectResolved heterogeneous Kafka source with a pre-built dispatch table.
- Parameters:
settings (ConsumerSettings) – Resolved Kafka consumer settings.
topics (tuple[str, ...]) – Kafka topic names to subscribe to.
dispatch (DispatchTable) – Pre-built table mapping
message_typeand errorpayload_typestrings to their concrete Python types, plus explicit wire-error payload types.shape (StreamShape) – Declared source shape.
decode_strategy (Literal['record', 'batch']) – Whether to decode records individually or in batches.
- class loom.streaming.compiler.CompiledNode(node, input_shape, output_shape, path=())[source]¶
Bases:
objectDSL node annotated with input/output shapes.
- Parameters:
node (object)
input_shape (StreamShape)
output_shape (StreamShape)
- class loom.streaming.compiler.CompiledPlan(name, source, nodes, output, error_routes, needs_async_bridge, terminal_sinks=<factory>, terminal_storage_sinks=<factory>)[source]¶
Bases:
objectImmutable result of compiling a StreamFlow.
- Parameters:
name (str)
source (CompiledSingleSource | CompiledMultiSource | CompiledMongoCDCSource)
nodes (tuple[CompiledNode, ...])
output (CompiledSink | None)
error_routes (dict[ErrorKind, CompiledSink])
needs_async_bridge (bool)
terminal_sinks (dict[tuple[int, ...], CompiledSink])
terminal_storage_sinks (dict[tuple[int, ...], CompiledStorageSink])
- class loom.streaming.compiler.CompiledSingleSource(settings, topics, payload_type, shape, decode_strategy)[source]¶
Bases:
objectResolved Kafka source with a single payload type and decode strategy.
- Parameters:
settings (ConsumerSettings)
payload_type (type[LoomStruct | LoomFrozenStruct])
shape (StreamShape)
decode_strategy (Literal['record', 'batch'])
- class loom.streaming.compiler.CompiledSink(settings, topic, partition_policy, dlq_topic=None)[source]¶
Bases:
objectResolved Kafka sink with partitioning.
- Parameters:
settings (ProducerSettings)
topic (str)
partition_policy (PartitionPolicy[LoomStruct | LoomFrozenStruct] | None)
dlq_topic (str | None)
- class loom.streaming.compiler.CompiledStorageSink(node, config, database_config=None)[source]¶
Bases:
objectResolved storage sink with its DSL node and pre-fetched config section.
The adapter calls
node.build_partition(config, worker_index, worker_count, bridge)at startup once per Bytewax worker to obtain theSinkPartitionthat handles epoch writes and shutdown. Storage backends that require async execution receive the adapter-managedAsyncBridge.- Parameters:
node (Any) – The
IntoSinknode as declared in the DSL.config (SqlAlchemySinkConfig | DeltaSinkConfig | ClickHouseSinkConfig) – Resolved storage sink config for the selected backend.
database_config (SqlAlchemyDatabaseConfig | None) – Resolved shared SQLAlchemy database config, if needed.