loom.streaming
Loom Streaming — topic-oriented declarations and transport adapters.
- class loom.streaming.Backend(value)
Bases: StrEnum
Storage backend used by IntoTable to write epoch batches.
- Parameters:
SQLALCHEMY – Bulk-insert rows via SQLAlchemy Core. Transport-agnostic: works with any database that SQLAlchemy supports.
DELTA – Stage parquet parts and append them to a Delta Lake table. Requires deltalake and polars to be installed.
CLICKHOUSE – Bulk-insert rows via clickhouse-connect (native HTTP/2 protocol, no SQLAlchemy). Requires clickhouse-connect to be installed. Supports Array, Nullable and all ClickHouse-native types without JSON serialisation workarounds.
- class loom.streaming.BatchExpandStep
Streaming step that expands one batch into many output messages.
Declare the subclass itself in a flow, not an instance. The compiler materializes the class during binding resolution.
- Pattern:
Batch fan-out step.
Subclasses define execute(self, messages, **kwargs) -> Iterable[OutT] with the explicit batch and dependency signature they need.
- class loom.streaming.BatchStep
Streaming step that consumes and produces one batch at a time.
Declare the subclass itself in a flow, not an instance. The compiler materializes the class during binding resolution.
- Pattern:
Batch-shaped step.
Subclasses define execute(self, messages, **kwargs) -> OutT with the explicit batch and dependency signature they need.
- class loom.streaming.Broadcast(*routes)
Bases: Generic[InT]
Terminal fan-out node that delivers every message to all branches simultaneously.
- Pattern:
Inclusive fan-out.
Unlike Fork, which routes each message to exactly one branch, Broadcast copies the message to every declared branch. All branches are independent: they may apply different transformations and write to different output topics. Broadcast is terminal: no process nodes may follow it.
- Parameters:
*routes (BroadcastRoute[InT, Any]) – One or more BroadcastRoute declarations.
- Raises:
ValueError – If no routes are provided.
Example:
    Broadcast(
        BroadcastRoute(
            process=Process(RecordStep(to_analytics_event)),
            output=IntoTopic("events.analytics", payload=AnalyticsEvent),
        ),
        BroadcastRoute(
            process=Process(RecordStep(to_fulfillment_order)),
            output=IntoTopic("orders.fulfillment", payload=FulfillmentOrder),
        ),
    )
- property routes: tuple[BroadcastRoute[InT, Any], ...]
Ordered fan-out branches.
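The difference between inclusive fan-out (Broadcast) and exclusive routing (Fork) can be illustrated with a minimal sketch. Nothing here uses loom itself: branches are plain functions, messages are dicts, and the first-match rule used for the fork is an assumption made for illustration.

```python
from typing import Callable

# Hypothetical stand-ins: a branch is a plain function, not a loom Process.
Branch = Callable[[dict], dict]
Predicate = Callable[[dict], bool]


def broadcast(message: dict, branches: list[Branch]) -> list[dict]:
    """Inclusive fan-out: every branch receives its own copy of the message."""
    return [branch(dict(message)) for branch in branches]


def fork(message: dict, routes: list[tuple[Predicate, Branch]]) -> list[dict]:
    """Exclusive routing: only one branch (here, the first match) receives the message."""
    for matches, branch in routes:
        if matches(message):
            return [branch(dict(message))]
    return []


def to_analytics(message: dict) -> dict:
    return {"event": "order_seen", "order_id": message["id"]}


def to_fulfillment(message: dict) -> dict:
    return {"ship": message["id"]}


order = {"id": 7, "total": 120}
broadcast_out = broadcast(order, [to_analytics, to_fulfillment])
fork_out = fork(order, [(lambda m: m["total"] > 100, to_analytics),
                        (lambda m: True, to_fulfillment)])
print(len(broadcast_out), len(fork_out))
```

The broadcast call yields one output per branch, while the fork call yields a single output even though both predicates would match.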
- class loom.streaming.BroadcastRoute(process, output=None)
Bases: LoomFrozenStruct, Generic[InT, OutT]
One branch of a Broadcast node.
- Pattern:
Fan-out branch.
- Parameters:
process (Process) – Transformation nodes applied to every incoming message on this branch.
output (IntoTopic[OutT] | None) – Optional terminal Kafka topic that receives the transformed messages. When omitted, the branch acts as a discard branch after its inner process completes.
Example:
    BroadcastRoute(
        process=Process(RecordStep(to_analytics_event)),
        output=IntoTopic("events.analytics", payload=AnalyticsEvent),
    )
- class loom.streaming.CollectBatch(max_records, timeout_ms, window=WindowStrategy.COLLECT, event_time=False)
Bases: LoomFrozenStruct
Explicit shape adapter from record to batch.
- Pattern:
Shape adapter.
Groups individual records into batches before handing them to downstream batch-aware nodes. A batch is an optimization grouping, not a first-class semantic unit: downstream nodes receive and return individual records after the batch step executes.
- Parameters:
max_records (int) – Maximum records collected into one batch.
timeout_ms (int) – Maximum wait time before materializing a partial batch.
window (WindowStrategy) – Windowing strategy to use for grouping. Defaults to WindowStrategy.COLLECT (processing-time count-and-timeout).
event_time (bool) – When True, use MessageMeta.produced_at_ms as the event clock instead of wall time. Requires a windowing strategy other than COLLECT.
- Raises:
ValueError – If max_records or timeout_ms is below one, or if event_time=True is combined with WindowStrategy.COLLECT.
Example:
Collect up to 100 records or wait 500 ms:
    CollectBatch(max_records=100, timeout_ms=500)
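The count-and-timeout rule can be sketched in plain Python. This is not the loom adapter: the function name and the (arrival_ms, record) input format are hypothetical, and the timeout is only checked when the next record arrives, whereas a real runtime would presumably flush on a timer.

```python
from typing import Iterable, Iterator


def collect_batches(
    arrivals: Iterable[tuple[int, str]],
    max_records: int,
    timeout_ms: int,
) -> Iterator[list[str]]:
    """Group (arrival_ms, record) pairs by count or elapsed time, whichever comes first."""
    if max_records < 1 or timeout_ms < 1:
        raise ValueError("max_records and timeout_ms must be at least 1")
    batch: list[str] = []
    opened_at = 0
    for arrived_at, record in arrivals:
        # Close a partial batch whose timeout elapsed before this record arrived.
        if batch and arrived_at - opened_at >= timeout_ms:
            yield batch
            batch = []
        if not batch:
            opened_at = arrived_at  # the clock starts with the first record of a batch
        batch.append(record)
        if len(batch) == max_records:
            yield batch
            batch = []
    if batch:
        yield batch  # flush whatever is left when the input ends


arrivals = [(0, "a"), (10, "b"), (20, "c"), (900, "d"), (910, "e")]
batches = list(collect_batches(arrivals, max_records=2, timeout_ms=500))
print(batches)
```

With these inputs the first batch closes on the record count, the second closes on the timeout, and the third closes on the count again.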
- class loom.streaming.ContextFactory(factory)
Bases: Configurable
Factory that creates a fresh context manager on demand.
- Pattern:
Wrapper dependency.
Use this when you need a new context-manager instance per batch (scope=BATCH) or when you want to configure the factory from YAML.
- Parameters:
factory (Callable[[], ContextDependency]) – Callable that returns a context-manager instance.
Example:
    db = ContextFactory(lambda: SessionLocal())
    With(process=Process(ValidateOrder, IntoTopic("validated")), scope=BATCH, db=db)
- create()
Return a fresh context-manager instance.
- Return type:
AbstractContextManager[object] | AbstractAsyncContextManager[object]
- property log: LoggerPort
Structured logger bound to this factory class and module.
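The idea of handing out a fresh context manager per batch can be sketched with the standard library alone. FactorySketch and fake_session are hypothetical names; the sketch mirrors only the create() contract described above, not the Configurable or YAML behaviour.

```python
from contextlib import AbstractContextManager, contextmanager
from typing import Callable, Iterator

opened: list[int] = []


@contextmanager
def fake_session(session_id: int) -> Iterator[int]:
    """Hypothetical resource standing in for something like a database session."""
    opened.append(session_id)
    yield session_id


class FactorySketch:
    """Wraps a zero-argument callable; create() returns a new context manager per call."""

    def __init__(self, factory: Callable[[], AbstractContextManager]) -> None:
        self._factory = factory

    def create(self) -> AbstractContextManager:
        return self._factory()


counter = iter(range(1, 100))
sessions = FactorySketch(lambda: fake_session(next(counter)))

for batch in (["a", "b"], ["c"]):
    # One new context manager per batch, exited when the batch finishes.
    with sessions.create() as session_id:
        print(f"batch {batch} handled by session {session_id}")
```

Passing an already-open session instead of a factory would share a single instance across every batch, which is the case the factory is meant to avoid.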
- class loom.streaming.DeltaSinkConfig(uri, table, mode, storage_options, partition_by, target_file_size, staging_compression, spool_max_bytes, part_max_records, writer_properties, buffer_policy, mini_batch_size=500)
Bases: object
Resolved IntoTable settings for the Delta backend.
- Parameters:
uri (str) – Delta table URI.
table (str) – Final target table name.
mode (Literal['error', 'append', 'ignore']) – delta-rs write mode.
storage_options (dict[str, str]) – Object-store credentials/options.
partition_by (tuple[str, ...]) – Column names used to partition the Delta table.
target_file_size (int | None) – Optional delta-rs target file size in bytes.
staging_compression (Literal['uncompressed', 'lz4', 'zstd']) – Compression codec for the IPC staging stream.
spool_max_bytes (int) – Maximum in-memory bytes before spooling the staging file to disk.
part_max_records (int) – Backward-compatible row-group hint for the final Delta write.
writer_properties (Any | None) – Final Delta writer properties passed to delta-rs.
buffer_policy (_TableBufferPolicy) – Common flush policy shared with other backends.
mini_batch_size (int)
- class loom.streaming.ExpandRoutes(*, expander, routes, default=None)
Bases: Generic[InT]
Decompose one event into typed sub-collections and route each to its own Process.
Unlike Broadcast (which copies the same message to every route), ExpandRoutes calls expander.expand() exactly once per event and routes each output type's rows to the matching Process, so no event is expanded more than once.
ExpandRoutes is terminal: no process nodes may follow it in the parent Process.
- Parameters:
expander (type[PayloadExpander[InT]]) – PayloadExpander subclass that produces typed sub-collections.
routes (dict[type, Process[Any, Any]]) – Mapping from output type to the Process that handles those rows.
default (Process[Any, Any] | None) – Optional fallback Process for types not declared in routes.
- Raises:
ValueError – If routes is empty and default is None.
- property expander: type[PayloadExpander[InT]]
PayloadExpander subclass used to decompose each event.
- class loom.streaming.Explode(exploder)
Bases: Generic[InT]
Transformation step that fans one typed event into N typed sub-events.
Explode sits upstream of a Router and has no knowledge of downstream sinks. The Router dispatches each emitted type to the matching branch, which is the same mechanism used for any multi-type stream. This keeps expanders and sinks fully decoupled.
Can appear in Router branches when nested expansion is needed (e.g. OrderEvent → OrderLineEvent → OrderLineRow).
- Parameters:
exploder (type[PayloadExpander[InT]]) – Class implementing PayloadExpander[InT].
Example:
    Process(
        Explode(StoreEventExpander),
        Router({
            StoreRow: Process(IntoTopic("stores.rows", payload=StoreRow)),
            LanguageRow: Process(IntoTopic("stores.languages", payload=LanguageRow)),
        }),
    )
- class loom.streaming.Drain
Bases: LoomFrozenStruct
Explicit terminal adapter from any shape to none.
- Pattern:
Shape adapter.
- class loom.streaming.PayloadExpander(*args, **kwargs)
Bases: Protocol[PayloadT_contra]
Expand one typed stream event into typed sub-event collections.
Each key in the returned dict is the concrete type that the downstream Router uses to dispatch to the correct branch: the dict is type-keyed, not string-keyed. The expand() method must be pure: no I/O, no side effects, no knowledge of downstream sinks or storage backends.
Declare outputs to allow the compiler to validate that every produced type has a matching Router branch.
Example:
    class StoreEventExpander(PayloadExpander[StoreEvent]):
        outputs: ClassVar[tuple[type, ...]] = (StoreRow, LanguageRow)

        @classmethod
        def expand(cls, event: StoreEvent) -> dict[type, list[Any]]:
            return {
                StoreRow: [StoreRow(id=event.store_id, name=event.name)],
                LanguageRow: [
                    LanguageRow(code=code) for code in event.language_codes
                ],
            }
- Parameters:
PayloadT_contra – Contravariant type of the incoming stream event.
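Type-keyed dispatch of an expander's output can be sketched as follows. The dataclasses and the dispatch helper are hypothetical stand-ins; the missing-branch check only imitates the kind of validation the compiler is described as performing against outputs.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class StoreEvent:
    store_id: int
    name: str
    language_codes: tuple[str, ...]


@dataclass(frozen=True)
class StoreRow:
    id: int
    name: str


@dataclass(frozen=True)
class LanguageRow:
    code: str


def expand(event: StoreEvent) -> dict[type, list[Any]]:
    """Pure expansion: one event in, type-keyed row lists out."""
    return {
        StoreRow: [StoreRow(id=event.store_id, name=event.name)],
        LanguageRow: [LanguageRow(code=code) for code in event.language_codes],
    }


def dispatch(expanded: dict[type, list[Any]],
             branches: dict[type, Callable[[Any], None]]) -> None:
    """Hypothetical router: send each row to the branch registered for its type."""
    missing = set(expanded) - set(branches)
    if missing:
        raise KeyError(f"no branch for {sorted(t.__name__ for t in missing)}")
    for row_type, rows in expanded.items():
        for row in rows:
            branches[row_type](row)


stores: list[StoreRow] = []
languages: list[LanguageRow] = []
event = StoreEvent(store_id=1, name="Lisbon", language_codes=("pt", "en"))
dispatch(expand(event), {StoreRow: stores.append, LanguageRow: languages.append})
print(stores, languages)
```

Because the keys are types rather than strings, a renamed row class cannot silently drift away from its branch.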
- class loom.streaming.ErrorEnvelope(kind, reason, payload_type=None, original_message=None)
Bases: LoomFrozenStruct, Generic[PayloadT]
Structured error payload routed through explicit error branches.
- Parameters:
kind (ErrorKind) – Logical error category.
reason (str) – Human-readable reason.
payload_type (str | None) – The message_type of the original payload, mirrored from original_message.meta.message_type for efficient top-level dispatch without nested decode.
original_message (ErrorMessage[PayloadT] | None) – Wire-safe snapshot of the original message when available.
- class loom.streaming.ErrorKind(value)
Bases: StrEnum
Logical categories for streaming flow errors.
- class loom.streaming.ErrorMessage(*, payload, meta)
Bases: LoomFrozenStruct, Generic[PayloadT]
Wire-safe snapshot of the original message carried by an error.
- Parameters:
payload (PayloadT) – Original domain payload.
meta (ErrorMessageMeta) – Snapshot of the original message metadata.
- class loom.streaming.ErrorMessageMeta(*, message_id, correlation_id=None, trace_id=None, parent_trace_id=None, causation_id=None, produced_at_ms=None, message_type=None, message_version=None, topic=None, partition=None, offset=None, key=None, headers=<factory>)
Bases: LoomFrozenStruct
Wire-safe snapshot of the original message metadata.
- Parameters:
message_id (str) – Stable event identifier.
correlation_id (str | None) – Optional correlation identifier.
trace_id (str | None) – Optional trace identifier.
parent_trace_id (str | None) – Optional upstream trace identifier.
causation_id (str | None) – Optional upstream event identifier.
produced_at_ms (int | None) – Optional original producer timestamp in epoch milliseconds.
message_type (str | None) – Optional logical message contract name.
message_version (int | None) – Optional logical message contract version.
topic (str | None) – Source topic name when available.
partition (int | None) – Source partition when available.
offset (int | None) – Source offset when available.
key (bytes | None) – Optional transport key when available. Stored as bytes to keep the error envelope msgspec-compatible when nested inside Kafka payloads.
- class loom.streaming.ErrorRoute(kinds, output)
Bases: LoomFrozenStruct
Route a group of error kinds to one output topic.
- class loom.streaming.ExpandStep
Streaming step that expands one record into many output messages.
Declare the subclass itself in a flow, not an instance. The compiler materializes the class during binding resolution.
- Pattern:
Record fan-out step.
Subclasses define execute(self, message, **kwargs) -> Iterable[OutT] with the explicit payload and dependency signature they need.
- class loom.streaming.ForEach
Bases: LoomFrozenStruct
Explicit shape adapter from batch to record.
- Pattern:
Shape adapter.
Note
This is a shape-level adapter only. It flattens a batch stream back to individual records so downstream RecordStep nodes can consume them. It carries no business semantics.
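The flattening that a batch-to-record adapter performs amounts to chaining the batches back into one stream. A minimal sketch with a hypothetical helper name, using only the standard library:

```python
from itertools import chain
from typing import Iterable, Iterator


def for_each(batches: Iterable[list[str]]) -> Iterator[str]:
    """Flatten a stream of batches into a stream of records, preserving order."""
    return chain.from_iterable(batches)


records = list(for_each([["a", "b"], ["c"], []]))
print(records)
```

Empty batches contribute nothing, and record order within and across batches is kept.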
- class loom.streaming.Fork(*, kind, selector=None, routes=None, predicate_routes=(), default=None)
Bases: Generic[InT]
Terminal graph-level routing node that physically splits the stream.
- Pattern:
Terminal branching.
Unlike loom.streaming.nodes.Router, branches are full independent sub-graphs. Each branch may contain any process node, including With and WithAsync. Fork is terminal in the parent process: no nodes may follow it.
- Parameters:
- property kind: ForkKind
Routing family for this fork.
- class loom.streaming.ForkRoute(when, process)
Bases: LoomFrozenStruct, Generic[InT]
One predicate branch of a Fork node.
- Pattern:
Branch declaration.
- Parameters:
when (EqExpr | NeExpr | GtExpr | GeExpr | LtExpr | LeExpr | InExpr | BetweenExpr | AndExpr | OrExpr | NotExpr | Predicate[InT]) – Predicate expression or custom predicate.
process (Process) – Terminal process executed when the predicate matches.
- class loom.streaming.FromMongoCDC(name, collections=(), watch_options=<factory>, shape=StreamShape.RECORD)
Bases: LoomFrozenStruct, Generic[MongoCDCPayloadT]
Declare a MongoDB CDC stream as a streaming input boundary.
The type parameter MongoCDCPayloadT identifies the payload type produced by this source. It defaults to MongoCDCEvent so callers that do not need to override the payload type can omit it.
- Parameters:
name (str) – Logical input reference used to resolve Mongo source config.
collections (tuple[str, ...]) – Collection names to watch. An empty tuple means a database-level watch.
watch_options (Mapping[str, object]) – Driver watch kwargs resolved later by compiler/runtime.
shape (StreamShape) – Declared source shape.
- property logical_ref: LogicalRef
Return the logical input reference.
- class loom.streaming.FromMultiTypeTopic(name, payloads, shape=StreamShape.RECORD)
Bases: LoomFrozenStruct, Generic[MultiPayloadT]
Declare a heterogeneous topic-based input boundary.
Reads from a single Kafka topic that carries multiple payload types. The runtime dispatches each record to the correct decoder using MessageEnvelope.meta.descriptor.message_type for plain payloads, ErrorEnvelope.payload_type for routed error envelopes, and the dedicated wire error message type for DecodeError payloads.
Each type in payloads uses __loom_message_type__ when present and otherwise falls back to the fully qualified Python name f"{type.__module__}.{type.__qualname__}" so the compiler can build the dispatch table at compile time without requiring extra boilerplate. DecodeError is a special-case wire payload with its own explicit message type.
- Parameters:
name (str) – Logical input reference.
payloads (tuple[type[LoomStruct | LoomFrozenStruct], ...]) – Tuple of two or more expected payload types.
shape (StreamShape) – Declared source shape.
- Raises:
ValueError – When fewer than two payload types are declared.
Example:
    source = FromMultiTypeTopic(
        "events.all",
        payloads=(OrderCreated, OrderCancelled),
    )
- property logical_ref: LogicalRef
Logical input reference.
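The message-type fallback described for FromMultiTypeTopic can be sketched as a small lookup. The helper name message_type_of and the two payload classes are hypothetical; only the __loom_message_type__ attribute and the qualified-name format come from the description above.

```python
def message_type_of(payload_type: type) -> str:
    """Prefer an explicit __loom_message_type__, else the fully qualified Python name."""
    explicit = getattr(payload_type, "__loom_message_type__", None)
    if explicit:
        return explicit
    return f"{payload_type.__module__}.{payload_type.__qualname__}"


class OrderCreated:
    __loom_message_type__ = "orders.created"  # explicit contract name


class OrderCancelled:
    pass  # no explicit name: falls back to module plus qualified name


# Dispatch table built once, before any record is decoded.
dispatch_table = {message_type_of(t): t for t in (OrderCreated, OrderCancelled)}
print(sorted(dispatch_table))
```

A decoder would then look up the incoming record's message type in this table to pick the payload class.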
- class loom.streaming.FromTopic(name, payload, shape=StreamShape.RECORD)
Bases: LoomFrozenStruct, Generic[PayloadT]
Declare a topic-based input boundary.
- Pattern:
Boundary node.
- Parameters:
name (str) – Logical input reference. By default this is also used as the Kafka config reference.
payload (type[PayloadT]) – Logical payload type descriptor.
shape (StreamShape) – Declared source shape.
- property logical_ref: LogicalRef
Logical input reference.
- class loom.streaming.IntoSink(*args, **kwargs)
Bases: Protocol[EventT]
Structural protocol satisfied by all Into* terminal storage sink nodes.
The compiler detects nodes implementing this protocol through a structural isinstance check and resolves their config from streaming.sinks.<name>. The adapter calls build_partition() once per Bytewax worker to obtain the SinkPartition that handles epoch writes and shutdown.
Implementing a custom Into* node requires no framework imports, no registration, and no base class: any frozen dataclass with the four declared members satisfies this protocol and is treated identically to built-in nodes such as IntoTable.
Example:
    @dataclass(frozen=True)
    class IntoMongo(Generic[EventT]):
        payload: type[EventT]
        collection: str
        name: str = ""
        router_branch_safe: ClassVar[bool] = True

        def build_partition(
            self,
            config,
            worker_index,
            worker_count,
            bridge=None,
            session_manager=None,
        ):
            return MongoPartition(url=config["url"], collection=self.collection)
- Parameters:
EventT_contra – Contravariant event type this sink accepts.
- build_partition(config, worker_index, worker_count, bridge=None, session_manager=None)
Build the per-worker partition for this sink.
Called once per Bytewax worker at startup. The returned SinkPartition is owned by that worker for the lifetime of the run.
- Parameters:
config (object) – Resolved streaming.sinks.<name> config section.
worker_index (int) – Zero-based index of the worker calling this method.
worker_count (int) – Total number of workers in this run.
bridge (AsyncBridge | None) – Optional async bridge provided by the Bytewax runtime.
session_manager (object | None) – Optional adapter-owned backend resource for async SQLAlchemy sinks.
- Returns:
A SinkPartition that will handle write_batch() and close() calls.
- Return type:
SinkPartition[EventT]
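A structural isinstance check of this kind can be reproduced with typing.Protocol and runtime_checkable. The SinkLike protocol below is a hypothetical approximation: the member set (payload, name, router_branch_safe, build_partition) is an assumption read off the example above, and the real IntoSink definition may differ.

```python
from dataclasses import dataclass
from typing import Any, ClassVar, Protocol, runtime_checkable


@runtime_checkable
class SinkLike(Protocol):
    """Hypothetical four-member protocol; the real IntoSink members may differ."""

    payload: type
    name: str
    router_branch_safe: ClassVar[bool]

    def build_partition(self, config: Any, worker_index: int, worker_count: int) -> Any: ...


@dataclass(frozen=True)
class IntoList:
    """Custom sink with no base class and no registration."""

    payload: type
    name: str = ""
    router_branch_safe: ClassVar[bool] = True

    def build_partition(self, config: Any, worker_index: int, worker_count: int) -> list:
        return []  # stand-in for a real partition object


@dataclass(frozen=True)
class NotASink:
    payload: type  # lacks name, router_branch_safe and build_partition


is_sink = isinstance(IntoList(payload=dict), SinkLike)
is_not_sink = isinstance(NotASink(payload=dict), SinkLike)
print(is_sink, is_not_sink)
```

Note that a runtime protocol check only verifies that the members exist; it does not check their types or the method signature.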
- class loom.streaming.IntoTable(payload, table='', backend=Backend.SQLALCHEMY, name='')
Bases: Generic[EventT]
Streaming terminal node that writes typed events to a storage table.
Implements IntoSink structurally: no base class or registration required. The compiler detects it via structural isinstance(node, IntoSink) and resolves its config from streaming.sinks.<name>.
- Parameters:
payload (type[EventT]) – Concrete event type written by this sink.
table (str) – Target table name. May be overridden by the resolved config section or left empty when the config supplies it.
backend (Backend) – Storage backend. Defaults to Backend.SQLALCHEMY.
name (str) – Config section key (streaming.sinks.<name>). Required when the sink is resolved from ConfigContext.
router_branch_safe – Always True: IntoTable may appear inside Router branches.
Example:
    Process(
        Decompose(StoreEventExpander),
        Router({
            StoreRow: Process(
                IntoTable(payload=StoreRow, table="store", name="store_sink")
            ),
            LanguageRow: Process(
                IntoTable(payload=LanguageRow, table="language", name="lang_sink")
            ),
        }),
    )
- build_partition(config, worker_index, worker_count, bridge=None, session_manager=None, logger=None)
Build the per-worker partition for this sink.
- Parameters:
config (SqlAlchemySinkConfig | DeltaSinkConfig | ClickHouseSinkConfig) – Resolved streaming.sinks.<name> config section.
worker_index (int) – Zero-based index of the calling worker (unused).
worker_count (int) – Total number of workers (unused).
bridge (AsyncBridge | None) – Optional adapter-owned async bridge for SQLAlchemy sinks.
session_manager (_SessionManagerLike | None) – Optional adapter-owned SQLAlchemy session manager.
logger (Any)
- Returns:
A SinkPartition ready to receive write_batch and close calls.
- Raises:
TypeError – If the provided config does not match the selected backend.
RuntimeError – If required runtime resources are missing.
ValueError – If the resolved backend is not supported.
- Return type:
SinkPartition[EventT]
- class loom.streaming.IntoTopic(name, payload=None, shape=StreamShape.RECORD, partitioning=None, dlq=None)
Bases: LoomFrozenStruct, Generic[PayloadT]
Declare a topic-based output boundary.
- Pattern:
Boundary node.
- Parameters:
name (str) – Logical output reference. By default this is also used as the Kafka config reference.
payload (type[PayloadT] | None) – Optional logical payload type descriptor.
shape (StreamShape) – Declared output shape.
partitioning (PartitionPolicy[PayloadT] | None) – Optional partitioning policy.
dlq (str | None) – Optional dead-letter topic name. When set, messages that fail delivery are routed to this topic instead of crashing the worker. The DLQ producer reuses the same broker settings as the main output and adds an x-dlq-error header with the failure reason.
- property logical_ref: LogicalRef
Logical output reference.
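The dead-letter behaviour described for the dlq parameter can be sketched without a broker. The deliver and flaky_send functions are hypothetical; only the x-dlq-error header name and the reroute-instead-of-crash rule are taken from the description above.

```python
from typing import Callable, Optional

Headers = dict[str, str]
SendFn = Callable[[str, bytes, Headers], None]


def deliver(send: SendFn, topic: str, value: bytes, dlq: Optional[str] = None) -> None:
    """Send to the main topic; on failure, reroute to the DLQ with the reason attached."""
    try:
        send(topic, value, {})
    except Exception as exc:
        if dlq is None:
            raise  # no DLQ configured: the failure propagates to the worker
        send(dlq, value, {"x-dlq-error": str(exc)})


sent: list[tuple[str, bytes, Headers]] = []


def flaky_send(topic: str, value: bytes, headers: Headers) -> None:
    """Hypothetical producer that rejects everything on the main topic."""
    if topic == "orders.out":
        raise RuntimeError("broker rejected message")
    sent.append((topic, value, headers))


deliver(flaky_send, "orders.out", b"{}", dlq="orders.dlq")
print(sent)
```

Without a dlq argument the same call would re-raise the RuntimeError.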
- class loom.streaming.Message(*, payload, meta)
Bases: LoomFrozenStruct, Generic[PayloadT]
Logical typed streaming event.
- Parameters:
payload (PayloadT) – Typed event payload.
meta (MessageMeta) – Transport-neutral metadata.
- class loom.streaming.MessageMeta(*, message_id, correlation_id=None, trace_id=None, parent_trace_id=None, causation_id=None, produced_at_ms=None, message_type=None, message_version=None, topic=None, partition=None, offset=None, key=None, headers=<factory>)
Bases: LoomFrozenStruct
Transport-neutral metadata for one logical event.
- Parameters:
message_id (str) – Stable event identifier.
correlation_id (str | None) – Optional correlation identifier.
trace_id (str | None) – Optional trace identifier.
parent_trace_id (str | None) – Optional upstream trace identifier.
causation_id (str | None) – Optional upstream event identifier.
produced_at_ms (int | None) – Optional original producer timestamp in epoch milliseconds.
message_type (str | None) – Optional logical message contract name.
message_version (int | None) – Optional logical message contract version.
topic (str | None) – Source topic name when available.
partition (int | None) – Source partition when available.
offset (int | None) – Source offset when available.
key (bytes | str | None) – Optional transport key when available.
- class loom.streaming.MongoConfig(*, source=None, sources=<factory>)
Bases: LoomFrozenStruct
Top-level MongoDB settings loaded from the mongo config section.
- Parameters:
source (MongoSourceConfig | None)
sources (dict[str, MongoSourceConfig])
- class loom.streaming.MongoBsonTimestamp(seconds, increment)
Bases: LoomFrozenStruct
Canonical Loom-safe representation of a BSON timestamp.
- class loom.streaming.MongoCDCEvent(*, event_id, operation_type, namespace, resume_token, document_id=None, cluster_time=None, wall_time_ms=None, lag_ms=None, full_document=None, update_description=None, raw_json)
Bases: LoomFrozenStruct
Normalized MongoDB CDC payload safe for the Loom pipeline.
- Parameters:
event_id (str)
operation_type (str)
namespace (MongoCDCNamespace)
document_id (MongoObjectId | str | None)
cluster_time (MongoBsonTimestamp | None)
wall_time_ms (int | None)
lag_ms (int | None)
raw_json (str)
- class loom.streaming.MongoCDCNamespace(db, coll=None)
Bases: LoomFrozenStruct
MongoDB namespace carried by a change event.
- class loom.streaming.MongoDBRef(*, id, collection, database=None)
Bases: LoomFrozenStruct
Structured representation of a MongoDB DBRef (cross-collection reference).
Carries the minimal information needed to resolve a reference without coupling domain code to the BSON DBRef wire format or pymongo types.
- class loom.streaming.MongoObjectId(*, id, created_at_ms=None)
Bases: LoomFrozenStruct
Structured representation of a MongoDB ObjectId.
Decouples Loom domain code from pymongo by carrying only the information derivable from the 12-byte ObjectId value.
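One piece of information derivable from the 12-byte value is the creation time: an ObjectId begins with a 4-byte big-endian Unix timestamp in seconds. A minimal sketch of that derivation follows; the function is hypothetical, and whether loom fills created_at_ms exactly this way is an assumption.

```python
def created_at_ms(object_id_hex: str) -> int:
    """Derive the creation time from the leading 4 bytes of a 12-byte ObjectId."""
    raw = bytes.fromhex(object_id_hex)
    if len(raw) != 12:
        raise ValueError("an ObjectId is exactly 12 bytes (24 hex characters)")
    seconds = int.from_bytes(raw[:4], byteorder="big")
    return seconds * 1000  # seconds since the Unix epoch, expressed in milliseconds


print(created_at_ms("507f1f77bcf86cd799439011"))
```

The remaining eight bytes (a per-process random value and a counter) carry no timestamp information, so second-level resolution is the best this derivation can offer.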
- class loom.streaming.MongoSourceConfig(*, uri, database, watch_options=<factory>, server_api_version=None, on_oplog_expired='fail')
Bases: LoomFrozenStruct
Connection settings for one MongoDB CDC source.
- loom.streaming.build_mongo_cdc_event(change)
Build a Loom-safe Mongo CDC event from one raw change-stream document.
- Parameters:
- Return type:
- loom.streaming.build_mongo_cdc_message(change)
Build a transport-neutral Loom message for one Mongo change event.
- Parameters:
- Return type:
- loom.streaming.normalize_bson_value(value, _depth=0)
Normalize one MongoDB/BSON runtime value into Loom-safe builtins.
- class loom.streaming.PartitionGuarantee(value)
Bases: StrEnum
Declared affinity guarantee of a partitioning strategy.
- class loom.streaming.PartitionPolicy(strategy, guarantee, allow_repartition=False)
Bases: LoomFrozenStruct, Generic[PayloadT]
Declarative partitioning policy for topic output.
- Pattern:
Boundary helper.
- Parameters:
strategy (PartitionStrategy[PayloadT]) – Partition-key strategy used for output records.
guarantee (PartitionGuarantee) – Declared affinity guarantee offered by the strategy.
allow_repartition (bool) – Whether the flow may override the incoming key.
- class loom.streaming.PartitionStrategy(*args, **kwargs)
Bases: Protocol[PayloadT]
Compute the outgoing transport partition key for a message.
- class loom.streaming.Predicate(*args, **kwargs)
Bases: Protocol[PayloadT]
Custom boolean route predicate.
- class loom.streaming.Process(*nodes)
Bases: Generic[InT, OutT]
Ordered collection of streaming nodes.
- Parameters:
*nodes (ProcessNode) – One or more nodes to execute in order.
- class loom.streaming.RecordStep
Streaming step that consumes and produces one record at a time.
Declare the subclass itself in a flow, not an instance. The compiler materializes the class during binding resolution.
- Pattern:
Record-shaped step.
Subclasses define execute(self, message, **kwargs) -> OutT with the explicit payload and dependency signature they need. The runtime uses duck typing so user code can express explicit dependencies without mypy forcing a single override shape.
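One way a runtime could honour an explicit dependency signature is to inspect execute() and pass only the keyword arguments it names. The sketch below is an assumption about the general technique, not loom's actual binding logic; EnrichOrder and run_step are hypothetical names.

```python
import inspect
from typing import Any


class EnrichOrder:
    """Hypothetical step: declares the single dependency it needs by name."""

    def execute(self, message: dict, *, rates: dict) -> dict:
        return {**message, "total_eur": message["total"] * rates["EUR"]}


def run_step(step_cls: type, message: Any, dependencies: dict[str, Any]) -> Any:
    """Instantiate the declared class, then pass only the dependencies execute() names."""
    step = step_cls()  # the flow declares the class; the runner creates the instance
    params = inspect.signature(step.execute).parameters
    accepts_any = any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values())
    wanted = {
        name: value
        for name, value in dependencies.items()
        if accepts_any or name in params
    }
    return step.execute(message, **wanted)


result = run_step(
    EnrichOrder,
    {"id": 1, "total": 10},
    {"rates": {"EUR": 2}, "db": object()},  # "db" is available but not requested
)
print(result)
```

The step never sees the db dependency because its signature does not ask for it, which keeps each step's requirements visible in its own definition.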
- class loom.streaming.ResourceFactory(*args, **kwargs)
Bases: Protocol[ResourceT]
Create and close step resources under runtime control.
- class loom.streaming.ResourceScope(value)
Bases: StrEnum
Context-manager lifecycle for dependencies declared through With.
- Pattern:
Wrapper scope.
- class loom.streaming.Route(when, process)
Bases: LoomFrozenStruct, Generic[InT, OutT]
One predicate route branch.
- Pattern:
Route declaration.
- Parameters:
when (EqExpr | NeExpr | GtExpr | GeExpr | LtExpr | LeExpr | InExpr | BetweenExpr | AndExpr | OrExpr | NotExpr | Predicate[InT]) – Predicate expression or custom predicate.
process (Process) – Process executed when the predicate matches.
- class loom.streaming.Router(*, selector=None, routes=None, predicate_routes=(), default=None)
Bases: Generic[InT, OutT]
Declarative streaming router.
- Pattern:
In-place routing.
Use by() for key-based dispatch and when() for ordered predicate dispatch. The router only declares graph shape; runtime execution belongs to backend adapters.
- Parameters:
- class loom.streaming.Selector(*args, **kwargs)
Bases: Protocol[PayloadT]
Custom route-key selector.
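The two dispatch styles named for Router, key-based through a selector and ordered through predicates, can be sketched as plain functions. These helpers are hypothetical and do not use Router.by() or Router.when(); the first-match rule and the LookupError on a miss are assumptions made for illustration.

```python
from typing import Any, Callable, Optional

Handler = Callable[[dict], str]
Check = Callable[[dict], bool]


def route_by(message: dict, selector: Callable[[dict], Any],
             routes: dict[Any, Handler], default: Optional[Handler] = None) -> str:
    """Key-based dispatch: the selector's return value picks the branch."""
    handler = routes.get(selector(message), default)
    if handler is None:
        raise LookupError("no route and no default")
    return handler(message)


def route_when(message: dict, routes: list[tuple[Check, Handler]],
               default: Optional[Handler] = None) -> str:
    """Ordered predicate dispatch: the first predicate that matches wins."""
    for predicate, handler in routes:
        if predicate(message):
            return handler(message)
    if default is None:
        raise LookupError("no predicate matched and no default")
    return default(message)


order = {"country": "PT", "total": 250}
by_result = route_by(order, lambda m: m["country"],
                     {"PT": lambda m: "iberia", "DE": lambda m: "dach"})
when_result = route_when(order, [(lambda m: m["total"] > 1000, lambda m: "review"),
                                 (lambda m: m["total"] > 100, lambda m: "priority")],
                         default=lambda m: "standard")
print(by_result, when_result)
```

Key-based dispatch is a single dictionary lookup, while predicate dispatch depends on declaration order, so more specific predicates belong earlier in the list.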
- class loom.streaming.SinkPartition(*args, **kwargs)
Bases: Protocol[EventT_contra]
Per-worker write interface for a storage sink.
The adapter calls write_batch() once per Bytewax epoch and close() on shutdown. Implementations must be sync-safe; use AsyncBridge for storage targets that require async I/O (e.g. SQLAlchemy async sessions).
- Parameters:
EventT_contra – Contravariant event type this partition accepts.
- class loom.streaming.StreamFlow(name, source, process, output=None, errors=None)
Bases: Generic[InT, OutT]
Complete streaming flow declaration.
- Parameters:
name (str) – Unique flow identifier.
source (FromTopic[InT] | FromMultiTypeTopic[InT] | FromMongoCDC[InT]) – Input topic declaration.
process (Process[InT, OutT]) – Ordered sequence of transformation nodes.
output (IntoTopic[OutT] | None) – Optional terminal output topic.
errors (ErrorRoutes) – Optional error-route declaration. Pass one IntoTopic to route every error kind to the same topic, or use ErrorRoute groups / explicit mappings for more specific routing.
- property errors: Mapping[ErrorKind, IntoTopic[StreamPayload]]
Explicit error routes keyed by error kind.
- property source: FromTopic[InT] | FromMultiTypeTopic[InT] | FromMongoCDC[InT]
Input topic declaration.
- class loom.streaming.StreamShape(value)
Bases: StrEnum
Logical data shape at a streaming graph edge.
- class loom.streaming.Step
Bases: Configurable, ABC, Generic[InT, OutT]
Base class for declarative streaming steps.
- Pattern:
Declarative step.
- property log: LoggerPort
Structured logger bound to this step class and module.
- class loom.streaming.StepContext(*args, **kwargs)
Bases: Protocol[ResourceCoT]
Execution context with explicit resource access.
- property resource: ResourceCoT
Worker-local resource available to the step.
- class loom.streaming.SqlAlchemyDatabaseConfig(url, echo=False, pool_pre_ping=True, pool_size=10, max_overflow=20, pool_timeout=30, pool_recycle=1800, connect_args=<factory>)
Bases: object
Resolved SQLAlchemy connection settings for a shared database entry.
- Parameters:
url (str) – SQLAlchemy async URL.
echo (bool) – Enable SQL echo logging.
pool_pre_ping (bool) – Check pooled connections before checkout.
pool_size (int) – Base connection pool size.
max_overflow (int) – Additional overflow connections.
pool_timeout (int) – Seconds to wait for a pooled connection.
pool_recycle (int) – Seconds before recycling pooled connections.
connect_args (dict[str, object]) – Extra arguments passed to the DBAPI connect call.
- class loom.streaming.SqlAlchemySinkConfig(database, url, table, chunk_size, buffer_policy)
Bases: object
Resolved IntoTable settings for the SQLAlchemy backend.
- Parameters:
database (str) – Shared database.<name> reference. Empty when using inline url resolution.
url (str) – Inline SQLAlchemy URL fallback.
table (str) – Final target table name.
chunk_size (int) – Number of rows per bulk executemany call.
buffer_policy (_TableBufferPolicy) – Common flush policy shared with other backends.
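The effect of chunk_size can be illustrated with the standard library's sqlite3 module, used here purely as a stand-in for SQLAlchemy. The chunks helper and the store table are hypothetical; the point is only that a batch is split into consecutive slices and each slice becomes one executemany call.

```python
import sqlite3
from typing import Iterator, Sequence


def chunks(rows: Sequence[tuple], chunk_size: int) -> Iterator[Sequence[tuple]]:
    """Yield consecutive slices of at most chunk_size rows."""
    for start in range(0, len(rows), chunk_size):
        yield rows[start:start + chunk_size]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE store (id INTEGER PRIMARY KEY, name TEXT)")

rows = [(i, f"store-{i}") for i in range(1, 8)]
calls = 0
for chunk in chunks(rows, chunk_size=3):
    # One executemany call per chunk: 3 + 3 + 1 rows for this input.
    conn.executemany("INSERT INTO store (id, name) VALUES (?, ?)", chunk)
    calls += 1
conn.commit()

inserted = conn.execute("SELECT COUNT(*) FROM store").fetchone()[0]
print(calls, inserted)
```

A smaller chunk size means more round trips per batch, while a larger one means bigger statements and more memory per call.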
- class loom.streaming.WindowStrategy(value)
Bases: StrEnum
Windowing strategy for batch collection.
Determines how records are grouped into batches at runtime.
- Values:
- COLLECT: Count-and-timeout collect using processing time. Suitable for testing and low-volume flows. Produces batches of up to max_records items or after timeout_ms milliseconds, whichever comes first.
- TUMBLING: Fixed-length event-time tumbling window driven by MessageMeta.produced_at_ms. Not yet implemented.
- SESSION: Event-time session window grouped by inactivity gap driven by MessageMeta.produced_at_ms. Not yet implemented.
Note
COLLECT is the only strategy available in the current adapter. TUMBLING and SESSION are forward-declared and will raise loom.streaming.compiler._compiler.CompilationError until implemented.
- class loom.streaming.With(scope=ResourceScope.WORKER, *, process, **dependencies)
Bases: _WithBase[InT, OutT]
Declare a sync dependency scope around an inner process.
- Pattern:
Sync wrapper.
Each incoming message flows through the inner Process synchronously. If the last node of the inner process is an IntoTopic, results are written directly to Kafka as each message completes, so no ForEach or outer IntoTopic is required. The outer stream is drained after this node (StreamShape.NONE).
- Parameters:
scope (ResourceScope) – Lifecycle used when opening context-manager dependencies.
process (Process[Any, Any]) – Inner process executed per message.
**dependencies (object) – Named sync context managers, factories, or plain values injected into step execution.
- Raises:
TypeError – If an async context manager is passed directly.
Example:
    With(
        process=Process(ValidateOrder(), IntoTopic("validated", payload=Validated)),
        scope=ResourceScope.WORKER,
        db=SessionLocal(),
    )
- class loom.streaming.WithAsync(scope=ResourceScope.WORKER, max_concurrency=10, task_timeout_ms=None, *, process, **dependencies)
Bases: _WithBase[InT, OutT]
Declare an async dependency scope around an inner process.
- Pattern:
Async wrapper.
Each incoming message flows through the inner Process asynchronously. If the last node of the inner process is an IntoTopic, results are written directly to Kafka as each message completes, so no ForEach or outer IntoTopic is required. The outer stream is drained after this node (StreamShape.NONE).
- Parameters:
scope (ResourceScope) – Lifecycle used when opening context-manager dependencies.
max_concurrency (int) – Maximum concurrent message executions.
task_timeout_ms (int | None) – Optional per-message wall-clock deadline in milliseconds. When set, each message execution is cancelled with TimeoutError if it exceeds the deadline.
process (Process[Any, Any]) – Inner process executed per message.
**dependencies (object) – Named async context managers, factories, or plain values injected into step execution.
- Raises:
TypeError – If a sync context manager is passed directly.
ValueError – If max_concurrency is not positive.
ValueError – If task_timeout_ms is not positive when provided.
Example:
    WithAsync(
        process=Process(FetchTask(), IntoTopic("results", payload=Result)),
        scope=ResourceScope.WORKER,
        max_concurrency=50,
        http=ContextFactory(lambda: httpx.AsyncClient()),
    )
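The combination of a concurrency cap and a per-message deadline can be sketched with asyncio alone. The run_bounded and fetch functions are hypothetical, and turning a timeout into a marker string is a simplification: the real node presumably surfaces the TimeoutError through its error routes.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def run_bounded(
    messages: list[int],
    handler: Callable[[int], Awaitable[str]],
    max_concurrency: int = 10,
    task_timeout_ms: Optional[int] = None,
) -> list[str]:
    """Run handler over messages with a concurrency cap and an optional deadline."""
    if max_concurrency < 1:
        raise ValueError("max_concurrency must be positive")
    if task_timeout_ms is not None and task_timeout_ms < 1:
        raise ValueError("task_timeout_ms must be positive when provided")
    gate = asyncio.Semaphore(max_concurrency)
    timeout_s = None if task_timeout_ms is None else task_timeout_ms / 1000

    async def one(message: int) -> str:
        async with gate:  # at most max_concurrency handlers run at once
            try:
                return await asyncio.wait_for(handler(message), timeout=timeout_s)
            except asyncio.TimeoutError:
                return f"{message}: timed out"

    return list(await asyncio.gather(*(one(m) for m in messages)))


async def fetch(message: int) -> str:
    """Hypothetical task: message 2 is slow, the others finish quickly."""
    await asyncio.sleep(0.5 if message == 2 else 0.01)
    return f"{message}: ok"


results = asyncio.run(run_bounded([1, 2, 3], fetch, max_concurrency=2, task_timeout_ms=100))
print(results)
```

The semaphore bounds in-flight work, while wait_for cancels any single execution that exceeds its deadline without affecting the others.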
- loom.streaming.compile_flow(flow, *, config)
Compile a flow into an immutable plan.
- Parameters:
flow (StreamFlow[Any, Any]) – User-declared streaming flow.
config (ConfigContext | Mapping[str, Any]) – Canonical runtime config context used for binding resolution and Kafka settings extraction.
- Returns:
Immutable CompiledPlan ready for adapter wiring.
- Raises:
CompilationError – If binding resolution or any validation phase fails.
- Return type: