loom.streaming

Loom Streaming — topic-oriented declarations and transport adapters.

Public authoring API

Use loom.streaming for user-facing flow declarations:

from loom.streaming import (
    CollectBatch,
    ErrorKind,
    FromTopic,
    IntoTopic,
    Message,
    StreamShape,
)

Kafka-specific codecs, clients, and transport settings live under loom.streaming.kafka.

class loom.streaming.Backend(value)[source]

Bases: StrEnum

Storage backend used by IntoTable to write epoch batches.

Members:
  • SQLALCHEMY – Bulk-insert rows via SQLAlchemy Core. Transport-agnostic — works with any SA-supported database.

  • DELTA – Stage parquet parts and append them to a Delta Lake table. Requires deltalake and polars to be installed.

  • CLICKHOUSE – Bulk-insert rows via clickhouse-connect over the ClickHouse HTTP interface (no SQLAlchemy). Requires clickhouse-connect to be installed. Supports Array, Nullable and all ClickHouse-native types without JSON serialisation workarounds.

class loom.streaming.BatchExpandStep[source]

Bases: Step[InT, OutT], ABC

Streaming step that expands one batch into many output messages.

Declare the subclass itself in a flow, not an instance. The compiler materializes the class during binding resolution.

Pattern:

Batch fan-out step.

Subclasses define execute(self, messages, **kwargs) -> Iterable[OutT] with the explicit batch and dependency signature they need.

class loom.streaming.BatchStep[source]

Bases: Step[InT, OutT], ABC

Streaming step that consumes and produces one batch at a time.

Declare the subclass itself in a flow, not an instance. The compiler materializes the class during binding resolution.

Pattern:

Batch-shaped step.

Subclasses define execute(self, messages, **kwargs) -> OutT with the explicit batch and dependency signature they need.

class loom.streaming.Broadcast(*routes)[source]

Bases: Generic[InT]

Terminal fan-out node that delivers every message to all branches simultaneously.

Pattern:

Inclusive fan-out.

Unlike Fork, which routes each message to exactly one branch, Broadcast copies the message to every declared branch. All branches are independent: they may apply different transformations and write to different output topics.

Broadcast is terminal — no process nodes may follow it.

Parameters:

*routes (BroadcastRoute[InT, Any]) – One or more BroadcastRoute declarations.

Raises:

ValueError – If no routes are provided.

Example:

Broadcast(
    BroadcastRoute(
        process=Process(ToAnalyticsEvent),  # RecordStep subclass, not an instance
        output=IntoTopic("events.analytics", payload=AnalyticsEvent),
    ),
    BroadcastRoute(
        process=Process(ToFulfillmentOrder),  # RecordStep subclass, not an instance
        output=IntoTopic("orders.fulfillment", payload=FulfillmentOrder),
    ),
)

property routes: tuple[BroadcastRoute[InT, Any], ...]

Ordered fan-out branches.

class loom.streaming.BroadcastRoute(process, output=None)[source]

Bases: LoomFrozenStruct, Generic[InT, OutT]

One branch of a Broadcast node.

Pattern:

Fan-out branch.

Parameters:
  • process (Process) – Transformation nodes applied to every incoming message on this branch.

  • output (IntoTopic[OutT] | None) – Optional terminal Kafka topic that receives the transformed messages. When omitted, the branch acts as a discard branch after its inner process completes.

Example:

BroadcastRoute(
    process=Process(ToAnalyticsEvent),  # RecordStep subclass, not an instance
    output=IntoTopic("events.analytics", payload=AnalyticsEvent),
)

class loom.streaming.CollectBatch(max_records, timeout_ms, window=WindowStrategy.COLLECT, event_time=False)[source]

Bases: LoomFrozenStruct

Explicit shape adapter from record to batch.

Pattern:

Shape adapter.

Groups individual records into batches before handing them to downstream batch-aware nodes. A batch is an optimization grouping, not a first-class semantic unit: downstream nodes receive and return individual records after the batch step executes.

Parameters:
  • max_records (int) – Maximum records collected into one batch.

  • timeout_ms (int) – Maximum wait time before materializing a partial batch.

  • window (WindowStrategy) – Windowing strategy to use for grouping. Defaults to WindowStrategy.COLLECT (processing-time count-and-timeout).

  • event_time (bool) – When True, use MessageMeta.produced_at_ms as the event clock instead of wall time. Requires a windowing strategy other than COLLECT.

Raises:

ValueError – If max_records or timeout_ms are below one, or if event_time=True is combined with WindowStrategy.COLLECT.

Example:

Collect up to 100 records or wait 500 ms:

CollectBatch(max_records=100, timeout_ms=500)
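The count-and-timeout rule of the default COLLECT window can be illustrated with a minimal pure-Python sketch. It assumes a batch is emitted as soon as it holds max_records records, or once timeout_ms of processing time has passed since its first record arrived. This is not Loom's implementation: collect_batches and its (arrival_ms, record) input format are hypothetical, and the timeout is only checked when the next record arrives, whereas a real runtime would also flush on a timer.

```python
from typing import Iterable, TypeVar

T = TypeVar("T")


def collect_batches(
    arrivals: Iterable[tuple[int, T]], max_records: int, timeout_ms: int
) -> list[list[T]]:
    """Group (arrival_ms, record) pairs by count or processing-time timeout.

    Hypothetical illustration of CollectBatch(window=WindowStrategy.COLLECT);
    timeouts are evaluated lazily when the next record arrives.
    """
    if max_records < 1 or timeout_ms < 1:
        raise ValueError("max_records and timeout_ms must be >= 1")
    batches: list[list[T]] = []
    current: list[T] = []
    opened_at = 0  # arrival time of the first record in `current`
    for arrival_ms, record in arrivals:
        # Materialize a partial batch whose timeout expired before this record.
        if current and arrival_ms - opened_at >= timeout_ms:
            batches.append(current)
            current = []
        if not current:
            opened_at = arrival_ms
        current.append(record)
        # Emit a full batch immediately.
        if len(current) == max_records:
            batches.append(current)
            current = []
    if current:  # end of input: flush whatever is left
        batches.append(current)
    return batches


batches = collect_batches(
    [(0, "a"), (10, "b"), (20, "c"), (700, "d")], max_records=2, timeout_ms=500
)
# → [["a", "b"], ["c"], ["d"]]: "c" times out before "d" arrives.
```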
class loom.streaming.ContextFactory(factory)[source]

Bases: Configurable

Factory that creates a fresh context manager on demand.

Pattern:

Wrapper dependency.

Use this when you need a new context-manager instance per batch (scope=BATCH) or when you want to configure the factory from YAML.

Parameters:

factory (Callable[[], ContextDependency]) – Callable that returns a context-manager instance.

Example:

db = ContextFactory(lambda: SessionLocal())
With(process=Process(ValidateOrder, IntoTopic("validated")), scope=BATCH, db=db)
create()[source]

Return a fresh context-manager instance.

Return type:

AbstractContextManager[object] | AbstractAsyncContextManager[object]

property log: LoggerPort

Structured logger bound to this factory class and module.
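The per-batch lifecycle implied by scope=BATCH can be sketched in plain Python, independently of Loom. The factory is called once per batch, and each call yields a new context manager that is entered and exited around that batch only. FakeSession (standing in for SessionLocal) and process_batches (standing in for the runtime loop) are hypothetical, and the real runtime's ordering and error handling may differ.

```python
from typing import Callable


class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy-style session."""

    def __init__(self) -> None:
        self.closed = False
        self.rows: list[str] = []

    def __enter__(self) -> "FakeSession":
        return self

    def __exit__(self, *exc_info: object) -> None:
        self.closed = True  # release the session when the batch ends


def process_batches(
    batches: list[list[str]], factory: Callable[[], FakeSession]
) -> list[FakeSession]:
    """Open one fresh session per batch, as a BATCH-scoped dependency would."""
    used: list[FakeSession] = []
    for batch in batches:
        with factory() as session:  # new instance for every batch
            session.rows.extend(batch)
            used.append(session)
    return used


sessions = process_batches([["a", "b"], ["c"]], factory=FakeSession)
```

Passing a factory rather than a single instance means no state leaks between batches, and context managers that cannot be re-entered remain usable.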

class loom.streaming.DeltaSinkConfig(uri, table, mode, storage_options, partition_by, target_file_size, staging_compression, spool_max_bytes, part_max_records, writer_properties, buffer_policy, mini_batch_size=500)[source]

Bases: object

Resolved IntoTable settings for the Delta backend.

Parameters:
  • uri (str) – Delta table URI.

  • table (str) – Final target table name.

  • mode (Literal['error', 'append', 'ignore']) – delta-rs write mode.

  • storage_options (dict[str, str]) – Object-store credentials/options.

  • partition_by (tuple[str, ...]) – Column names used to partition the Delta table.

  • target_file_size (int | None) – Optional delta-rs target file size in bytes.

  • staging_compression (Literal['uncompressed', 'lz4', 'zstd']) – Compression codec for the IPC staging stream.

  • spool_max_bytes (int) – Maximum in-memory bytes before spooling the staging file to disk.

  • part_max_records (int) – Backward-compatible row-group hint for the final Delta write.

  • writer_properties (Any | None) – Final Delta writer properties passed to delta-rs.

  • buffer_policy (_TableBufferPolicy) – Common flush policy shared with other backends.

  • mini_batch_size (int)

classmethod from_config(config, *, default_table)[source]

Build Delta sink settings from a resolved sink config.

Return type:

Self

class loom.streaming.ExpandRoutes(*, expander, routes, default=None)[source]

Bases: Generic[InT]

Decompose one event into typed sub-collections and route each to its own Process.

Unlike Broadcast (which copies the same message to every route), ExpandRoutes calls expander.expand() exactly once per event and routes each output type’s rows to the matching Process, so the event is never reprocessed once per route.

ExpandRoutes is terminal — no process nodes may follow it in the parent Process.

Parameters:
  • expander (type[PayloadExpander[InT]]) – PayloadExpander subclass that produces typed sub-collections.

  • routes (dict[type, Process[Any, Any]]) – Mapping from output type to the Process that handles those rows.

  • default (Process[Any, Any] | None) – Optional fallback Process for types not declared in routes.

Raises:

ValueError – If routes is empty and default is None.

property expander: type[PayloadExpander[InT]]

PayloadExpander subclass used to decompose each event.

property routes: Mapping[type, Process]

Mapping from output type to its handling Process.

property default: Process | None

Fallback Process for types not declared in routes.
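The dispatch rule can be pictured with a plain-Python sketch. expand() is called once, each output type's rows go to the handler registered for that type, and unlisted types fall back to the default. Handlers here are ordinary callables standing in for Process branches. dispatch_expanded is hypothetical, and raising LookupError for a type with neither a route nor a default is an assumption, not documented Loom behaviour.

```python
from typing import Any, Callable, Mapping, Optional

Handler = Callable[[list[Any]], None]


def dispatch_expanded(
    event: Any,
    expand: Callable[[Any], Mapping[type, list[Any]]],
    routes: Mapping[type, Handler],
    default: Optional[Handler] = None,
) -> None:
    """Expand one event once and hand each type's rows to its route."""
    if not routes and default is None:
        raise ValueError("ExpandRoutes needs at least one route or a default")
    for out_type, rows in expand(event).items():  # single expand() call
        handler = routes.get(out_type, default)
        if handler is None:
            # Assumption: an unroutable type is treated as an error.
            raise LookupError(f"no route for {out_type.__name__}")
        handler(rows)
```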

class loom.streaming.Explode(exploder)[source]

Bases: Generic[InT]

Transformation step that fans one typed event into N typed sub-events.

Explode sits upstream of a Router and has no knowledge of downstream sinks. The Router dispatches each emitted type to the matching branch — the same mechanism used for any multi-type stream. This keeps expanders and sinks fully decoupled.

Can appear in Router branches when nested expansion is needed (e.g. OrderEvent → OrderLineEvent → OrderLineRow).

Parameters:

exploder (type[PayloadExpander[InT]]) – Class implementing PayloadExpander[InT].

Example:

Process(
    Explode(StoreEventExpander),
    Router(routes={
        StoreRow: Process(IntoTopic("stores.rows", payload=StoreRow)),
        LanguageRow: Process(IntoTopic("stores.languages", payload=LanguageRow)),
    }),
)

class loom.streaming.Drain[source]

Bases: LoomFrozenStruct

Explicit terminal adapter from any shape to none.

Pattern:

Shape adapter.

class loom.streaming.PayloadExpander(*args, **kwargs)[source]

Bases: Protocol[PayloadT_contra]

Expand one typed stream event into typed sub-event collections.

Each key in the returned dict is the concrete type that the downstream Router uses to dispatch to the correct branch — type-keyed, not string-keyed. The expand() method must be pure: no I/O, no side effects, no knowledge of downstream sinks or storage backends.

Declare outputs to allow the compiler to validate that every produced type has a matching Router branch.

Example:

from typing import Any, ClassVar

from loom.streaming import PayloadExpander

# StoreEvent, StoreRow and LanguageRow are user-defined payload types.


class StoreEventExpander(PayloadExpander[StoreEvent]):
    outputs: ClassVar[tuple[type, ...]] = (StoreRow, LanguageRow)

    @classmethod
    def expand(cls, event: StoreEvent) -> dict[type, list[Any]]:
        return {
            StoreRow: [StoreRow(id=event.store_id, name=event.name)],
            LanguageRow: [
                LanguageRow(code=code) for code in event.language_codes
            ],
        }

Parameters:

PayloadT_contra – Contravariant type of the incoming stream event.

classmethod expand(event)[source]

Expand one event into typed sub-event collections.

Parameters:

event (PayloadT_contra) – Incoming stream event to expand.

Returns:

Mapping from output type to list of event instances of that type. Every type in outputs must appear as a key.

Return type:

dict[type, list[Any]]
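Because PayloadExpander is a Protocol, an expander can be unit-tested without running a flow. The following sketch checks the contract stated above: every type listed in outputs must appear as a key, and no undeclared type may be produced. OrderExpander, its payload classes and check_expansion are hypothetical. The class is written structurally, without importing Loom, so the snippet runs on its own.

```python
from dataclasses import dataclass
from typing import Any, ClassVar


@dataclass(frozen=True)
class OrderEvent:
    order_id: str
    skus: tuple[str, ...]


@dataclass(frozen=True)
class OrderRow:
    order_id: str


@dataclass(frozen=True)
class OrderLineRow:
    order_id: str
    sku: str


class OrderExpander:
    """Structurally satisfies PayloadExpander[OrderEvent]; pure, no I/O."""

    outputs: ClassVar[tuple[type, ...]] = (OrderRow, OrderLineRow)

    @classmethod
    def expand(cls, event: OrderEvent) -> dict[type, list[Any]]:
        return {
            OrderRow: [OrderRow(order_id=event.order_id)],
            # An order without SKUs still yields the key, with an empty list.
            OrderLineRow: [OrderLineRow(event.order_id, sku) for sku in event.skus],
        }


def check_expansion(expander: Any, event: Any) -> dict[type, list[Any]]:
    """Raise ValueError if expand() breaks the declared outputs contract."""
    result = expander.expand(event)
    declared = set(expander.outputs)
    produced = set(result)
    missing, undeclared = declared - produced, produced - declared
    if missing or undeclared:
        raise ValueError(f"missing={missing}, undeclared={undeclared}")
    for out_type, rows in result.items():
        if not all(isinstance(row, out_type) for row in rows):
            raise ValueError(f"wrong row type under {out_type.__name__}")
    return result
```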

class loom.streaming.ErrorEnvelope(kind, reason, payload_type=None, original_message=None)[source]

Bases: LoomFrozenStruct, Generic[PayloadT]

Structured error payload routed through explicit error branches.

Parameters:
  • kind (ErrorKind) – Logical error category.

  • reason (str) – Human-readable reason.

  • payload_type (str | None) – The message_type of the original payload, mirrored from original_message.meta.message_type for efficient top-level dispatch without nested decode.

  • original_message (ErrorMessage[PayloadT] | None) – Wire-safe snapshot of the original message when available.

class loom.streaming.ErrorKind(value)[source]

Bases: StrEnum

Logical categories for streaming flow errors.

class loom.streaming.ErrorMessage(*, payload, meta)[source]

Bases: LoomFrozenStruct, Generic[PayloadT]

Wire-safe snapshot of the original message carried by an error.

Parameters:
  • payload (PayloadT) – Original domain payload.

  • meta (ErrorMessageMeta) – Snapshot of the original message metadata.

class loom.streaming.ErrorMessageMeta(*, message_id, correlation_id=None, trace_id=None, parent_trace_id=None, causation_id=None, produced_at_ms=None, message_type=None, message_version=None, topic=None, partition=None, offset=None, key=None, headers=<factory>)[source]

Bases: LoomFrozenStruct

Wire-safe snapshot of the original message metadata.

Parameters:
  • message_id (str) – Stable event identifier.

  • correlation_id (str | None) – Optional correlation identifier.

  • trace_id (str | None) – Optional trace identifier.

  • parent_trace_id (str | None) – Optional upstream trace identifier.

  • causation_id (str | None) – Optional upstream event identifier.

  • produced_at_ms (int | None) – Optional original producer timestamp in epoch milliseconds.

  • message_type (str | None) – Optional logical message contract name.

  • message_version (int | None) – Optional logical message contract version.

  • topic (str | None) – Source topic name when available.

  • partition (int | None) – Source partition when available.

  • offset (int | None) – Source offset when available.

  • key (bytes | None) – Optional transport key when available. Stored as bytes to keep the error envelope msgspec-compatible when nested inside Kafka payloads.

  • headers (dict[str, bytes]) – Opaque transport headers.

class loom.streaming.ErrorRoute(kinds, output)[source]

Bases: LoomFrozenStruct

Route a group of error kinds to one output topic.

Parameters:
  • kinds (tuple[ErrorKind, ...]) – Error kinds handled by this route.

  • output (IntoTopic[StreamPayload]) – Kafka boundary that receives the routed envelopes.

class loom.streaming.ExpandStep[source]

Bases: Step[InT, OutT], ABC

Streaming step that expands one record into many output messages.

Declare the subclass itself in a flow, not an instance. The compiler materializes the class during binding resolution.

Pattern:

Record fan-out step.

Subclasses define execute(self, message, **kwargs) -> Iterable[OutT] with the explicit payload and dependency signature they need.

class loom.streaming.ForEach[source]

Bases: LoomFrozenStruct

Explicit shape adapter from batch to record.

Pattern:

Shape adapter.

Note

This is a shape-level adapter only. It flattens a batch stream back to individual records so downstream RecordStep nodes can consume them. It carries no business semantics.

class loom.streaming.Fork(*, kind, selector=None, routes=None, predicate_routes=(), default=None)[source]

Bases: Generic[InT]

Terminal graph-level routing node that physically splits the stream.

Pattern:

Terminal branching.

Unlike loom.streaming.nodes.Router, branches are full independent sub-graphs. Each branch may contain any process node, including With and WithAsync. Fork is terminal in the parent process: no nodes may follow it.

Parameters:
  • kind (ForkKind)

  • selector (SelectorSpec[InT] | None)

  • routes (Mapping[object, Process[InT, Any]] | None)

  • predicate_routes (Sequence[ForkRoute[InT]])

  • default (Process[InT, Any] | None)

classmethod by(selector, branches, *, default=None)[source]

Declare key-based terminal branching.

Parameters:
  • selector (PathRef | Selector[InT]) – Path expression or custom selector.

  • branches (Mapping[object, Process]) – Terminal processes keyed by selector result.

  • default (Process | None) – Optional fallback process.

Returns:

Fork declaration.

Return type:

Fork[InT]

classmethod when(routes, *, default=None)[source]

Declare ordered first-match terminal branching.

Parameters:
  • routes (Sequence[ForkRoute[InT]]) – Ordered predicate routes.

  • default (Process | None) – Optional fallback process.

Returns:

Fork declaration.

Return type:

Fork[InT]

property kind: ForkKind

Routing family for this fork.

property selector: PathRef | Selector[InT] | None

Key selector for Fork.by branches.

property routes: Mapping[object, Process]

Keyed branches for Fork.by.

property predicate_routes: tuple[ForkRoute[InT], ...]

Ordered predicate routes for Fork.when.

property default: Process | None

Fallback process.

class loom.streaming.ForkRoute(when, process)[source]

Bases: LoomFrozenStruct, Generic[InT]

One predicate branch of a Fork node.

Pattern:

Branch declaration.

Parameters:
  • when (EqExpr | NeExpr | GtExpr | GeExpr | LtExpr | LeExpr | InExpr | BetweenExpr | AndExpr | OrExpr | NotExpr | Predicate[InT]) – Predicate expression or custom predicate.

  • process (Process) – Terminal process executed when the predicate matches.

class loom.streaming.FromMongoCDC(name, collections=(), watch_options=<factory>, shape=StreamShape.RECORD)[source]

Bases: LoomFrozenStruct, Generic[MongoCDCPayloadT]

Declare a MongoDB CDC stream as a streaming input boundary.

The type parameter MongoCDCPayloadT identifies the payload type produced by this source. It defaults to MongoCDCEvent so callers that do not need to override the payload type can omit it.

Parameters:
  • name (str) – Logical input reference used to resolve Mongo source config.

  • collections (tuple[str, ...]) – Collection names to watch. An empty tuple means a database-level watch.

  • watch_options (Mapping[str, object]) – Driver watch kwargs resolved later by compiler/runtime.

  • shape (StreamShape) – Declared source shape.

property logical_ref: LogicalRef

Return the logical input reference.

class loom.streaming.FromMultiTypeTopic(name, payloads, shape=StreamShape.RECORD)[source]

Bases: LoomFrozenStruct, Generic[MultiPayloadT]

Declare a heterogeneous topic-based input boundary.

Reads from a single Kafka topic that carries multiple payload types. The runtime dispatches each record to the correct decoder using MessageEnvelope.meta.descriptor.message_type for plain payloads, ErrorEnvelope.payload_type for routed error envelopes, and the dedicated wire error message type for DecodeError payloads.

Each type in payloads uses __loom_message_type__ when present and otherwise falls back to the fully qualified Python name f"{type.__module__}.{type.__qualname__}" so the compiler can build the dispatch table at compile time without requiring extra boilerplate. DecodeError is a special-case wire payload with its own explicit message type.

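Parameters:
  • name (str) – Logical input reference.

  • payloads – Payload types carried by the topic. At least two are required.

  • shape (StreamShape) – Declared source shape.
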
Raises:

ValueError – When fewer than two payload types are declared.

Example:

source = FromMultiTypeTopic(
    "events.all",
    payloads=(OrderCreated, OrderCancelled),
)
property logical_ref: LogicalRef

Logical input reference.
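The naming rule above can be reproduced in a few lines. This is useful when a producer outside Loom must stamp message_type so that it matches the consumer's dispatch table. The sketch covers only the documented rule (an explicit __loom_message_type__, otherwise the fully qualified Python name) and omits the DecodeError special case. message_type_for and build_dispatch_table are hypothetical helpers, not Loom APIs.

```python
def message_type_for(payload_type: type) -> str:
    """Explicit __loom_message_type__ wins; else the fully qualified name."""
    explicit = getattr(payload_type, "__loom_message_type__", None)
    if explicit is not None:
        return str(explicit)
    return f"{payload_type.__module__}.{payload_type.__qualname__}"


def build_dispatch_table(payloads: tuple[type, ...]) -> dict[str, type]:
    """Map wire message_type strings to payload types."""
    if len(payloads) < 2:
        raise ValueError("FromMultiTypeTopic requires at least two payload types")
    return {message_type_for(p): p for p in payloads}


class OrderCreated:
    __loom_message_type__ = "orders.created"  # explicit contract name


class OrderCancelled:
    pass  # falls back to "<module>.OrderCancelled"


table = build_dispatch_table((OrderCreated, OrderCancelled))
```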

class loom.streaming.FromTopic(name, payload, shape=StreamShape.RECORD)[source]

Bases: LoomFrozenStruct, Generic[PayloadT]

Declare a topic-based input boundary.

Pattern:

Boundary node.

Parameters:
  • name (str) – Logical input reference. By default this is also used as the Kafka config reference.

  • payload (type[PayloadT]) – Logical payload type descriptor.

  • shape (StreamShape) – Declared source shape.

property logical_ref: LogicalRef

Logical input reference.

class loom.streaming.IntoSink(*args, **kwargs)[source]

Bases: Protocol[EventT]

Structural protocol satisfied by all Into* terminal storage sink nodes.

The compiler detects nodes implementing this protocol through a structural isinstance check and resolves their config from streaming.sinks.<name>. The adapter calls build_partition() once per Bytewax worker to obtain the SinkPartition that handles epoch writes and shutdown.

Implementing a custom Into* node requires no framework imports, no registration, and no base class — any frozen dataclass with the four declared members satisfies this protocol and is treated identically to built-in nodes such as IntoTable.

Example:

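from dataclasses import dataclass
from typing import ClassVar, Generic, TypeVar

EventT = TypeVar("EventT")


@dataclass(frozen=True)
class IntoMongo(Generic[EventT]):
    payload: type[EventT]
    collection: str
    name: str = ""
    router_branch_safe: ClassVar[bool] = True

    def build_partition(
        self,
        config,
        worker_index,
        worker_count,
        bridge=None,
        session_manager=None,
    ):
        # MongoPartition is a user-defined class implementing SinkPartition.
        return MongoPartition(url=config["url"], collection=self.collection)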
Parameters:

EventT_contra – Contravariant event type this sink accepts.

build_partition(config, worker_index, worker_count, bridge=None, session_manager=None)[source]

Build the per-worker partition for this sink.

Called once per Bytewax worker at startup. The returned SinkPartition is owned by that worker for the lifetime of the run.

Parameters:
  • config (object) – Resolved streaming.sinks.<name> config section.

  • worker_index (int) – Zero-based index of the worker calling this method.

  • worker_count (int) – Total number of workers in this run.

  • bridge (AsyncBridge | None) – Optional async bridge provided by the Bytewax runtime.

  • session_manager (object | None) – Optional adapter-owned backend resource for async SQLAlchemy sinks.

Returns:

A SinkPartition that will handle write_batch() and close() calls.

Return type:

SinkPartition[EventT]

class loom.streaming.IntoTable(payload, table='', backend=Backend.SQLALCHEMY, name='')[source]

Bases: Generic[EventT]

Streaming terminal node that writes typed events to a storage table.

Implements IntoSink structurally — no base class or registration required. The compiler detects it via structural isinstance(node, IntoSink) and resolves its config from streaming.sinks.<name>.

Parameters:
  • payload (type[EventT]) – Concrete event type written by this sink.

  • table (str) – Target table name. May be overridden by the resolved config section or left empty when the config supplies it.

  • backend (Backend) – Storage backend. Defaults to Backend.SQLALCHEMY.

  • name (str) – Config section key (streaming.sinks.<name>). Required when the sink is resolved from ConfigContext.

  • router_branch_safe (ClassVar[bool]) – Class attribute rather than a constructor argument. Always True, so IntoTable may appear inside Router branches.

Example:

Process(
    Explode(StoreEventExpander),
    Router(routes={
        StoreRow: Process(
            IntoTable(payload=StoreRow, table="store", name="store_sink")
        ),
        LanguageRow: Process(
            IntoTable(payload=LanguageRow, table="language", name="lang_sink")
        ),
    }),
)
build_partition(config, worker_index, worker_count, bridge=None, session_manager=None, logger=None)[source]

Build the per-worker partition for this sink.

Parameters:
  • config (SqlAlchemySinkConfig | DeltaSinkConfig | ClickHouseSinkConfig) – Resolved streaming.sinks.<name> config section.

  • worker_index (int) – Zero-based index of the calling worker (unused).

  • worker_count (int) – Total number of workers (unused).

  • bridge (AsyncBridge | None) – Optional adapter-owned async bridge for SQLAlchemy sinks.

  • session_manager (_SessionManagerLike | None) – Optional adapter-owned SQLAlchemy session manager.

  • logger (Any)

Returns:

A SinkPartition ready to receive write_batch and close calls.

Raises:
  • TypeError – If the provided config does not match the selected backend.

  • RuntimeError – If required runtime resources are missing.

  • ValueError – If the resolved backend is not supported.

Return type:

SinkPartition[EventT]

class loom.streaming.IntoTopic(name, payload=None, shape=StreamShape.RECORD, partitioning=None, dlq=None)[source]

Bases: LoomFrozenStruct, Generic[PayloadT]

Declare a topic-based output boundary.

Pattern:

Boundary node.

Parameters:
  • name (str) – Logical output reference. By default this is also used as the Kafka config reference.

  • payload (type[PayloadT] | None) – Optional logical payload type descriptor.

  • shape (StreamShape) – Declared output shape.

  • partitioning (PartitionPolicy[PayloadT] | None) – Optional partitioning policy.

  • dlq (str | None) – Optional dead-letter topic name. When set, messages that fail delivery are routed to this topic instead of crashing the worker. The DLQ producer reuses the same broker settings as the main output and adds an x-dlq-error header with the failure reason.

property logical_ref: LogicalRef

Logical output reference.

class loom.streaming.Message(*, payload, meta)[source]

Bases: LoomFrozenStruct, Generic[PayloadT]

Logical typed streaming event.

Parameters:
  • payload (PayloadT) – Typed event payload.

  • meta (MessageMeta) – Transport-neutral metadata.

class loom.streaming.MessageMeta(*, message_id, correlation_id=None, trace_id=None, parent_trace_id=None, causation_id=None, produced_at_ms=None, message_type=None, message_version=None, topic=None, partition=None, offset=None, key=None, headers=<factory>)[source]

Bases: LoomFrozenStruct

Transport-neutral metadata for one logical event.

Parameters:
  • message_id (str) – Stable event identifier.

  • correlation_id (str | None) – Optional correlation identifier.

  • trace_id (str | None) – Optional trace identifier.

  • parent_trace_id (str | None) – Optional upstream trace identifier.

  • causation_id (str | None) – Optional upstream event identifier.

  • produced_at_ms (int | None) – Optional original producer timestamp in epoch milliseconds.

  • message_type (str | None) – Optional logical message contract name.

  • message_version (int | None) – Optional logical message contract version.

  • topic (str | None) – Source topic name when available.

  • partition (int | None) – Source partition when available.

  • offset (int | None) – Source offset when available.

  • key (bytes | str | None) – Optional transport key when available.

  • headers (dict[str, bytes]) – Opaque transport headers.

class loom.streaming.MongoConfig(*, source=None, sources=<factory>)[source]

Bases: LoomFrozenStruct

Top-level MongoDB settings loaded from the mongo config section.

source_for(ref)[source]

Resolve source settings by logical reference with default fallback.

Parameters:

ref (str | LogicalRef)

Return type:

MongoSourceConfig

class loom.streaming.MongoBsonTimestamp(seconds, increment)[source]

Bases: LoomFrozenStruct

Canonical Loom-safe representation of a BSON timestamp.

Parameters:
  • seconds (int)

  • increment (int)

class loom.streaming.MongoCDCEvent(*, event_id, operation_type, namespace, resume_token, document_id=None, cluster_time=None, wall_time_ms=None, lag_ms=None, full_document=None, update_description=None, raw_json)[source]

Bases: LoomFrozenStruct

Normalized MongoDB CDC payload safe for the Loom pipeline.

class loom.streaming.MongoCDCNamespace(db, coll=None)[source]

Bases: LoomFrozenStruct

MongoDB namespace carried by a change event.

class loom.streaming.MongoDBRef(*, id, collection, database=None)[source]

Bases: LoomFrozenStruct

Structured representation of a MongoDB DBRef (cross-collection reference).

Carries the minimal information needed to resolve a reference without coupling domain code to the BSON DBRef wire format or pymongo types.

Parameters:
  • id (str)

  • collection (str)

  • database (str | None)

id

Lowercase hex string of the referenced document’s ObjectId.

Type:

str

collection

Name of the collection that holds the referenced document.

Type:

str

database

Name of the database, or None when the reference is local to the current database.

Type:

str | None

class loom.streaming.MongoObjectId(*, id, created_at_ms=None)[source]

Bases: LoomFrozenStruct

Structured representation of a MongoDB ObjectId.

Decouples Loom domain code from pymongo by carrying only the information derivable from the 12-byte ObjectId value.

Parameters:
  • id (str)

  • created_at_ms (int | None)

id

24-character lowercase hex string encoding the 12-byte ObjectId.

Type:

str

created_at_ms

Creation epoch in milliseconds, extracted from the 4-byte Unix timestamp embedded in the ObjectId, or None when the timestamp is unavailable.

Type:

int | None
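The derivation of created_at_ms can be shown concretely. An ObjectId's first 4 bytes are a big-endian Unix timestamp in seconds, so the value is recoverable from the hex string alone. object_id_created_at_ms is a hypothetical helper written without pymongo. Returning None for malformed input is this sketch's choice and mirrors the "None when the timestamp is unavailable" case above.

```python
import string
from typing import Optional

_HEX_DIGITS = set(string.hexdigits)


def object_id_created_at_ms(hex_id: str) -> Optional[int]:
    """Creation time in epoch milliseconds from a 24-char hex ObjectId."""
    if len(hex_id) != 24 or not set(hex_id) <= _HEX_DIGITS:
        return None  # not a well-formed ObjectId string
    raw = bytes.fromhex(hex_id)  # the 12 raw ObjectId bytes
    seconds = int.from_bytes(raw[:4], "big")  # leading 4-byte Unix timestamp
    return seconds * 1000


created = object_id_created_at_ms("507f1f77bcf86cd799439011")
# → 1350508407000 (0x507f1f77 seconds)
```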

class loom.streaming.MongoSourceConfig(*, uri, database, watch_options=<factory>, server_api_version=None, on_oplog_expired='fail')[source]

Bases: LoomFrozenStruct

Connection settings for one MongoDB CDC source.

loom.streaming.build_mongo_cdc_event(change)[source]

Build a Loom-safe Mongo CDC event from one raw change-stream document.

Parameters:

change (Mapping[str, object])

Return type:

MongoCDCEvent

loom.streaming.build_mongo_cdc_message(change)[source]

Build a transport-neutral Loom message for one Mongo change event.

Parameters:

change (Mapping[str, object])

Return type:

Message[MongoCDCEvent]

loom.streaming.normalize_bson_value(value, _depth=0)[source]

Normalize one MongoDB/BSON runtime value into Loom-safe builtins.

Return type:

object

class loom.streaming.PartitionGuarantee(value)[source]

Bases: StrEnum

Declared affinity guarantee of a partitioning strategy.

class loom.streaming.PartitionPolicy(strategy, guarantee, allow_repartition=False)[source]

Bases: LoomFrozenStruct, Generic[PayloadT]

Declarative partitioning policy for topic output.

Pattern:

Boundary helper.

Parameters:
  • strategy (PartitionStrategy[PayloadT]) – Partition-key strategy used for output records.

  • guarantee (PartitionGuarantee) – Declared affinity guarantee offered by the strategy.

  • allow_repartition (bool) – Whether the flow may override the incoming key.

class loom.streaming.PartitionStrategy(*args, **kwargs)[source]

Bases: Protocol[PayloadT]

Compute the outgoing transport partition key for a message.

partition_key(message)[source]

Return the outgoing transport partition key.

Parameters:

message (Message[PayloadT])

Return type:

bytes | str | None
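Any object with a matching partition_key method satisfies this protocol. The sketch below keys output records by customer, so that all events for one customer share a partition. To keep it runnable without Loom, Msg is a hypothetical stand-in exposing only .payload; in a flow, the argument would be a loom.streaming.Message. CustomerKey and Order are hypothetical as well.

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

P = TypeVar("P")


@dataclass(frozen=True)
class Msg(Generic[P]):
    """Hypothetical stand-in for loom.streaming.Message (payload only)."""

    payload: P


@dataclass(frozen=True)
class Order:
    order_id: str
    customer_id: Optional[str]


class CustomerKey:
    """Structurally satisfies PartitionStrategy[Order]."""

    def partition_key(self, message: Msg[Order]) -> Optional[str]:
        # None when the order has no customer; how None is treated
        # is left to the runtime.
        return message.payload.customer_id
```

An instance would then be passed as the strategy of a PartitionPolicy, together with the guarantee it offers.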

class loom.streaming.Predicate(*args, **kwargs)[source]

Bases: Protocol[PayloadT]

Custom boolean route predicate.

matches(message)[source]

Return whether message matches this predicate.

Parameters:

message (Message[PayloadT])

Return type:

bool

class loom.streaming.Process(*nodes)[source]

Bases: Generic[InT, OutT]

Ordered collection of streaming nodes.

Parameters:

*nodes (ProcessNode) – One or more nodes to execute in order.

property nodes: tuple[ConfigBinding | Step[Any, Any] | type[Step[Any, Any]] | With[Any, Any] | WithAsync[Any, Any] | CollectBatch | ForEach | Drain | IntoTopic[Any] | Fork[Any] | Router[Any, Any] | Broadcast[Any] | ExpandRoutes[Any], ...]

Immutable sequence of process nodes.

class loom.streaming.RecordStep[source]

Bases: Step[InT, OutT], ABC

Streaming step that consumes and produces one record at a time.

Declare the subclass itself in a flow, not an instance. The compiler materializes the class during binding resolution.

Pattern:

Record-shaped step.

Subclasses define execute(self, message, **kwargs) -> OutT with the explicit payload and dependency signature they need. The runtime uses duck typing so user code can express explicit dependencies without mypy forcing a single override shape.

class loom.streaming.ResourceFactory(*args, **kwargs)[source]

Bases: Protocol[ResourceT]

Create and close step resources under runtime control.

create()[source]

Create one worker-local resource.

Return type:

ResourceT

close(resource)[source]

Close one resource created by this factory.

Parameters:

resource (ResourceT)

Return type:

None
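A minimal structural implementation illustrates the create/close pairing. The runtime calls create() to obtain a worker-local resource and later passes that same object back to close(). SqliteFactory is hypothetical and uses only the standard-library sqlite3 module, so the snippet runs without Loom.

```python
import sqlite3


class SqliteFactory:
    """Structurally satisfies ResourceFactory[sqlite3.Connection]."""

    def __init__(self, path: str = ":memory:") -> None:
        self.path = path

    def create(self) -> sqlite3.Connection:
        # One connection per call; the runtime decides when to call this.
        return sqlite3.connect(self.path)

    def close(self, resource: sqlite3.Connection) -> None:
        # Release exactly the resource this factory created.
        resource.close()
```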

class loom.streaming.ResourceScope(value)[source]

Bases: StrEnum

Context-manager lifecycle for dependencies declared through With.

Pattern:

Wrapper scope.

class loom.streaming.Route(when, process)[source]

Bases: LoomFrozenStruct, Generic[InT, OutT]

One predicate route branch.

Pattern:

Route declaration.

Parameters:
  • when (EqExpr | NeExpr | GtExpr | GeExpr | LtExpr | LeExpr | InExpr | BetweenExpr | AndExpr | OrExpr | NotExpr | Predicate[InT]) – Predicate expression or custom predicate.

  • process (Process) – Process executed when the predicate matches.

class loom.streaming.Router(*, selector=None, routes=None, predicate_routes=(), default=None)[source]

Bases: Generic[InT, OutT]

Declarative streaming router.

Pattern:

In-place routing.

Use by() for key-based dispatch and when() for ordered predicate dispatch. The router only declares graph shape; runtime execution belongs to backend adapters.

Parameters:
  • selector (SelectorSpec[InT] | None)

  • routes (Mapping[object, Process[InT, OutT]] | None)

  • predicate_routes (Sequence[Route[InT, OutT]])

  • default (Process[InT, OutT] | None)

classmethod by(selector, routes, *, default=None)[source]

Declare key-based routing.

Parameters:
  • selector (PathRef | Selector[InT]) – Path expression or custom selector.

  • routes (Mapping[object, Process]) – Processes keyed by selector result.

  • default (Process | None) – Optional fallback process.

Returns:

Router declaration.

Return type:

Router[InT, OutT]

classmethod when(routes, *, default=None)[source]

Declare ordered first-match predicate routing.

Parameters:
  • routes (Sequence[Route[InT, OutT]]) – Ordered predicate routes.

  • default (Process | None) – Optional fallback process.

Returns:

Router declaration.

Return type:

Router[InT, OutT]

property selector: PathRef | Selector[InT] | None

Key selector for Router.by routes.

property routes: Mapping[object, Process]

Keyed routes for Router.by.

property predicate_routes: tuple[Route[InT, OutT], ...]

Ordered predicate routes for Router.when.

property default: Process | None

Fallback process.
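The ordered first-match behaviour of Router.when() can be sketched in plain Python. This is a minimal stand-in, not the loom API. `PredicateRoute`, `dispatch_when` and the plain-function "processes" are hypothetical. Raising `LookupError` when nothing matches and no default is set is an assumption; the reference above does not say what happens in that case.

```python
from dataclasses import dataclass
from typing import Callable, Optional, Sequence

Message = dict  # hypothetical stand-in for a message payload
Process = Callable[[Message], str]  # hypothetical stand-in: a process returns a label


@dataclass(frozen=True)
class PredicateRoute:
    """Stand-in for Route: a predicate paired with the process it selects."""

    when: Callable[[Message], bool]
    process: Process


def dispatch_when(
    message: Message,
    routes: Sequence[PredicateRoute],
    default: Optional[Process] = None,
) -> str:
    """Ordered first-match dispatch, mirroring the documented Router.when() semantics."""
    for route in routes:
        # Routes are tried in declaration order; the first match wins.
        if route.when(message):
            return route.process(message)
    if default is None:
        # Assumption: an unmatched message with no fallback is an error.
        raise LookupError(f"no route matched {message!r}")
    return default(message)


routes = [
    PredicateRoute(when=lambda m: m["amount"] >= 1000, process=lambda m: "manual-review"),
    PredicateRoute(when=lambda m: m["amount"] >= 100, process=lambda m: "standard"),
]
```

An order with amount 5000 satisfies both predicates but is dispatched to "manual-review", because that route is declared first. This is why the routes passed to when() form a Sequence rather than a Mapping.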

class loom.streaming.Selector(*args, **kwargs)[source]

Bases: Protocol[PayloadT]

Custom route-key selector.

select(message)[source]

Return a route key for message.

Parameters:

message (Message[PayloadT])

Return type:

object
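Because Selector is a Protocol, any object with a matching select() method qualifies, and no inheritance is needed. The sketch below shows a custom selector driving key-based dispatch in the spirit of Router.by(). `Message` here is a hypothetical stand-in with a single `payload` field, since the real Message attributes are not listed in this section. `Order`, `RegionSelector` and `dispatch_by` are also illustrative, and the `LookupError` on an unknown key without a default is an assumption.

```python
from dataclasses import dataclass
from typing import Callable, Generic, Mapping, Optional, Protocol, TypeVar

T = TypeVar("T")
PayloadT_contra = TypeVar("PayloadT_contra", contravariant=True)


@dataclass(frozen=True)
class Message(Generic[T]):
    """Hypothetical stand-in for loom.streaming.Message: only a payload field."""

    payload: T


class Selector(Protocol[PayloadT_contra]):
    """Mirrors the documented protocol: return a route key for a message."""

    def select(self, message: Message[PayloadT_contra]) -> object: ...


@dataclass(frozen=True)
class Order:
    region: str
    amount: int


class RegionSelector:
    """Satisfies Selector[Order] structurally by defining select()."""

    def select(self, message: Message[Order]) -> object:
        # Normalize the key so "EU" and "eu" land on the same route.
        return message.payload.region.lower()


def dispatch_by(
    message: Message[T],
    selector: Selector[T],
    routes: Mapping[object, Callable[[Message[T]], str]],
    default: Optional[Callable[[Message[T]], str]] = None,
) -> str:
    """Key-based dispatch: look up the process registered for the selector's key."""
    key = selector.select(message)
    handler = routes.get(key, default)
    if handler is None:
        # Assumption: an unknown key with no fallback is an error.
        raise LookupError(f"no route for key {key!r}")
    return handler(message)
```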

class loom.streaming.SinkPartition(*args, **kwargs)[source]

Bases: Protocol[EventT_contra]

Per-worker write interface for a storage sink.

The adapter calls write_batch() once per Bytewax epoch and close() on shutdown. Implementations must be synchronous; for storage targets that require async I/O (e.g. SQLAlchemy async sessions), use AsyncBridge.

Parameters:

EventT_contra – Contravariant event type this partition accepts.

write_batch(items)[source]

Write a batch of events for this epoch.

Parameters:

items (Sequence[EventT_contra]) – Events delivered by Bytewax for the current epoch.

Return type:

None

close()[source]

Release resources and flush any pending data.

Called once per worker on shutdown or after a failed run. Must be idempotent and must also succeed when write_batch() was never invoked, because the adapter may call close() on a partition that received no data.

Return type:

None
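A minimal in-memory partition illustrates the contract: batches arrive through write_batch(), pending rows are flushed on close(), and close() is safe to call twice or without any prior writes. `BufferedListPartition`, its `flush_threshold` parameter and the list standing in for the storage target are hypothetical. The local `SinkPartition` protocol only mirrors the documented method names so that the structural check can run without loom installed.

```python
from typing import Generic, List, Protocol, Sequence, TypeVar, runtime_checkable

EventT = TypeVar("EventT")
EventT_contra = TypeVar("EventT_contra", contravariant=True)


@runtime_checkable
class SinkPartition(Protocol[EventT_contra]):
    """Local mirror of the documented protocol, for structural checks only."""

    def write_batch(self, items: Sequence[EventT_contra]) -> None: ...

    def close(self) -> None: ...


class BufferedListPartition(Generic[EventT]):
    """Hypothetical partition that buffers events and flushes them to a list."""

    def __init__(self, flush_threshold: int = 100) -> None:
        self.flush_threshold = flush_threshold
        self.flushed: List[EventT] = []  # stands in for the storage target
        self.flush_count = 0
        self._pending: List[EventT] = []
        self._closed = False

    def write_batch(self, items: Sequence[EventT]) -> None:
        if self._closed:
            raise RuntimeError("write_batch() called after close()")
        self._pending.extend(items)
        # Flush once enough events have accumulated across epochs.
        if len(self._pending) >= self.flush_threshold:
            self._flush()

    def _flush(self) -> None:
        if self._pending:
            self.flushed.extend(self._pending)
            self.flush_count += 1
            self._pending.clear()

    def close(self) -> None:
        # Idempotent: a repeated call, or a call with no prior writes, is a no-op.
        if self._closed:
            return
        self._flush()  # release any pending data before shutting down
        self._closed = True
```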

class loom.streaming.StreamFlow(name, source, process, output=None, errors=None)[source]

Bases: Generic[InT, OutT]

Complete streaming flow declaration.

Parameters:
  • name (str) – Unique flow identifier.

  • source (FromTopic[InT] | FromMultiTypeTopic[InT] | FromMongoCDC[InT]) – Input topic declaration.

  • process (Process[InT, OutT]) – Ordered sequence of transformation nodes.

  • output (IntoTopic[OutT] | None) – Optional terminal output topic.

  • errors (ErrorRoutes) – Optional error-route declaration. Pass one IntoTopic to route every error kind to the same topic, or use ErrorRoute groups / explicit mappings for more specific routing.

property errors: Mapping[ErrorKind, IntoTopic[StreamPayload]]

Explicit error routes keyed by error kind.

property name: str

Flow identifier.

property output: IntoTopic[OutT] | None

Terminal output topic, if any.

property process: Process[InT, OutT]

Ordered node sequence.

property source: FromTopic[InT] | FromMultiTypeTopic[InT] | FromMongoCDC[InT]

Input topic declaration.
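The single-topic shorthand accepted by the errors parameter has to end up in the per-kind mapping exposed by the errors property. The following is a minimal sketch of that normalization. It assumes hypothetical ErrorKind members (the real members are not listed in this section), uses a plain str as a stand-in for IntoTopic, and leaves out ErrorRoute groups. `normalize_error_routes` is illustrative, not a loom function.

```python
from enum import Enum
from typing import Dict, Mapping, Union


class ErrorKind(str, Enum):
    """Hypothetical members; the real loom ErrorKind values may differ."""

    DESERIALIZATION = "deserialization"
    PROCESSING = "processing"
    SERIALIZATION = "serialization"


Topic = str  # stand-in for IntoTopic


def normalize_error_routes(
    errors: Union[None, Topic, Mapping[ErrorKind, Topic]],
) -> Dict[ErrorKind, Topic]:
    """Expand the accepted `errors` forms into one explicit per-kind mapping."""
    if errors is None:
        return {}  # no error routing declared
    if isinstance(errors, str):
        # A single topic catches every error kind.
        return {kind: errors for kind in ErrorKind}
    # An explicit mapping is taken as-is, copied so callers cannot mutate it later.
    return dict(errors)
```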

class loom.streaming.StreamShape(value)[source]

Bases: StrEnum

Logical data shape at a streaming graph edge.

class loom.streaming.Step[source]

Bases: Configurable, ABC, Generic[InT, OutT]

Base class for declarative streaming steps.

Pattern:

Declarative step.

property log: LoggerPort

Structured logger bound to this step class and module.

classmethod step_name()[source]

Resolved step name for observability and validation errors.

Return type:

str

class loom.streaming.StepContext(*args, **kwargs)[source]

Bases: Protocol[ResourceCoT]

Execution context with explicit resource access.

property resource: ResourceCoT

Worker-local resource available to the step.

class loom.streaming.SqlAlchemyDatabaseConfig(url, echo=False, pool_pre_ping=True, pool_size=10, max_overflow=20, pool_timeout=30, pool_recycle=1800, connect_args=<factory>)[source]

Bases: object

Resolved SQLAlchemy connection settings for a shared database entry.

Parameters:
  • url (str) – SQLAlchemy async URL.

  • echo (bool) – Enable SQL echo logging.

  • pool_pre_ping (bool) – Check pooled connections before checkout.

  • pool_size (int) – Base connection pool size.

  • max_overflow (int) – Additional overflow connections.

  • pool_timeout (int) – Seconds to wait for a pooled connection.

  • pool_recycle (int) – Seconds before recycling pooled connections.

  • connect_args (dict[str, object]) – Extra arguments passed to the DBAPI connect call.

classmethod from_config(config)[source]

Build a SQLAlchemy connection config from a resolved YAML section.

Parameters:

config (Mapping[str, Any])

Return type:

Self

to_session_manager_config()[source]

Convert the config to SessionManager.from_config keyword mapping.

Return type:

dict[str, object]
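The defaults in the signature above are easiest to read as a data class. The look-alike below reproduces them and sketches the two conversions. `DatabaseSettings` is hypothetical. Two further points are assumptions, not documented behaviour: the YAML keys are taken to match the field names (with unknown keys ignored), and to_session_manager_config() is treated as a plain field-by-field dump. No connection is opened.

```python
from dataclasses import asdict, dataclass, field, fields
from typing import Any, Dict, Mapping


@dataclass(frozen=True)
class DatabaseSettings:
    """Hypothetical look-alike of SqlAlchemyDatabaseConfig with the same defaults."""

    url: str  # required; an async URL such as "postgresql+asyncpg://..."
    echo: bool = False
    pool_pre_ping: bool = True
    pool_size: int = 10
    max_overflow: int = 20
    pool_timeout: int = 30
    pool_recycle: int = 1800
    connect_args: Dict[str, object] = field(default_factory=dict)

    @classmethod
    def from_config(cls, config: Mapping[str, Any]) -> "DatabaseSettings":
        # Assumption: keys match field names; unknown keys are dropped.
        known = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in config.items() if k in known})

    def to_session_manager_config(self) -> Dict[str, object]:
        # Assumption: the keyword mapping is a field-by-field copy.
        return asdict(self)
```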

class loom.streaming.SqlAlchemySinkConfig(database, url, table, chunk_size, buffer_policy)[source]

Bases: object

Resolved IntoTable settings for the SQLAlchemy backend.

Parameters:
  • database (str) – Shared database.<name> reference. Empty when using inline url resolution.

  • url (str) – Inline SQLAlchemy URL fallback.

  • table (str) – Final target table name.

  • chunk_size (int) – Number of rows per bulk executemany call.

  • buffer_policy (_TableBufferPolicy) – Common flush policy shared with other backends.

classmethod from_config(config, *, default_table)[source]

Build SQLAlchemy sink settings from a resolved sink config.

Return type:

Self
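To see what chunk_size controls, it helps to split rows into groups and issue one executemany() call per group. The sketch below swaps in the standard-library sqlite3 driver for SQLAlchemy Core so that it stays self-contained. `insert_in_chunks` is hypothetical and is not how the loom backend is implemented.

```python
import sqlite3
from itertools import islice
from typing import Iterable, Sequence, Tuple


def insert_in_chunks(
    conn: sqlite3.Connection,
    table: str,
    columns: Sequence[str],
    rows: Iterable[Tuple[object, ...]],
    chunk_size: int,
) -> int:
    """Insert rows with one executemany() call per chunk; return the call count."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    placeholders = ", ".join("?" for _ in columns)
    # Table and column names are interpolated, so they must be trusted identifiers.
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
    calls = 0
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, chunk_size))  # next group of at most chunk_size rows
        if not chunk:
            break
        conn.executemany(sql, chunk)
        calls += 1
    conn.commit()
    return calls
```

With chunk_size=2, five rows produce three executemany() calls (2 + 2 + 1). A larger chunk_size means fewer round trips but larger statements per call.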

class loom.streaming.WindowStrategy(value)[source]

Bases: StrEnum

Windowing strategy for batch collection.

Determines how records are grouped into batches at runtime.

Values:
  • COLLECT – Count-and-timeout collection using processing time. Suitable for testing and low-volume flows. Emits a batch once max_records items have accumulated or timeout_ms milliseconds have elapsed, whichever comes first.

  • TUMBLING – Fixed-length event-time tumbling window driven by MessageMeta.produced_at_ms. Not yet implemented.

  • SESSION – Event-time session window grouped by inactivity gap, driven by MessageMeta.produced_at_ms. Not yet implemented.

Note

COLLECT is the only strategy available in the current adapter. TUMBLING and SESSION are forward-declared and will raise loom.streaming.compiler._compiler.CompilationError until implemented.
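The count-or-timeout rule behind COLLECT can be sketched over a list of (arrival_ms, item) pairs. Two simplifying assumptions apply. First, the timeout is measured from the first item of the open batch. Second, it is only checked when the next item arrives or the stream ends, whereas a runtime timer can close a batch with no new arrival. `collect_batches` is hypothetical.

```python
from typing import Iterable, List, Tuple, TypeVar

T = TypeVar("T")


def collect_batches(
    arrivals: Iterable[Tuple[int, T]],
    max_records: int,
    timeout_ms: int,
) -> List[List[T]]:
    """Group (arrival_ms, item) pairs into count-or-timeout batches."""
    batches: List[List[T]] = []
    current: List[T] = []
    started_at = 0
    for now_ms, item in arrivals:
        # Close an open batch whose timeout has elapsed before taking the new item.
        if current and now_ms - started_at >= timeout_ms:
            batches.append(current)
            current = []
        if not current:
            started_at = now_ms  # the timeout clock starts at the batch's first item
        current.append(item)
        if len(current) >= max_records:
            batches.append(current)  # size limit reached
            current = []
    if current:
        batches.append(current)  # end of stream flushes the remainder
    return batches
```

For example, with arrivals at 0, 10, 20, 500 and 505 ms, max_records=3 and timeout_ms=100 yield two batches. The first three items are closed by the size limit and the last two by the end of the stream.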

class loom.streaming.With(scope=ResourceScope.WORKER, *, process, **dependencies)[source]

Bases: _WithBase[InT, OutT]

Declare a sync dependency scope around an inner process.

Pattern:

Sync wrapper.

Each incoming message flows through the inner Process synchronously. If the last node of the inner process is an IntoTopic, results are written directly to Kafka as each message completes — no ForEach or outer IntoTopic is required. The outer stream is drained after this node (StreamShape.NONE).

Parameters:
  • scope (ResourceScope) – Lifecycle used when opening context-manager dependencies.

  • process (Process[Any, Any]) – Inner process executed per message.

  • **dependencies (object) – Named sync context managers, factories, or plain values injected into step execution.

Raises:

TypeError – If an async context manager is passed directly.

Example:

With(
    process=Process(ValidateOrder, IntoTopic("validated", payload=Validated)),
    scope=ResourceScope.WORKER,
    db=SessionLocal(),
)
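The worker-scoped lifecycle can be pictured with contextlib.ExitStack. Context-manager dependencies are entered once, reused for every message, and exited when the scope closes. Plain values are passed through unchanged. This is a sketch under assumptions: `run_with_worker_scope` is hypothetical, factories are left out, and treating ResourceScope.WORKER as "open once per worker" is an inference from the scope's name.

```python
from contextlib import AbstractContextManager, ExitStack
from typing import Any, Callable, Iterable, List


def run_with_worker_scope(
    messages: Iterable[Any],
    step: Callable[..., Any],
    **dependencies: Any,
) -> List[Any]:
    """Open context-manager dependencies once, then run `step` per message."""
    results: List[Any] = []
    with ExitStack() as stack:
        resolved = {}
        for name, dep in dependencies.items():
            if isinstance(dep, AbstractContextManager):
                # Entered once for the whole scope; exited when the ExitStack closes.
                resolved[name] = stack.enter_context(dep)
            else:
                resolved[name] = dep  # plain values are injected unchanged
        for message in messages:
            # Dependencies reach the step as keyword arguments.
            results.append(step(message, **resolved))
    return results
```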
class loom.streaming.WithAsync(scope=ResourceScope.WORKER, max_concurrency=10, task_timeout_ms=None, *, process, **dependencies)[source]

Bases: _WithBase[InT, OutT]

Declare an async dependency scope around an inner process.

Pattern:

Async wrapper.

Each incoming message flows through the inner Process asynchronously. If the last node of the inner process is an IntoTopic, results are written directly to Kafka as each message completes — no ForEach or outer IntoTopic is required. The outer stream is drained after this node (StreamShape.NONE).

Parameters:
  • scope (ResourceScope) – Lifecycle used when opening context-manager dependencies.

  • max_concurrency (int) – Maximum concurrent message executions.

  • task_timeout_ms (int | None) – Optional per-message wall-clock deadline in milliseconds. When set, each message execution is cancelled with TimeoutError if it exceeds the deadline.

  • process (Process[Any, Any]) – Inner process executed per message.

  • **dependencies (object) – Named async context managers, factories, or plain values injected into step execution.

Raises:
  • TypeError – If a sync context manager is passed directly.

  • ValueError – If max_concurrency is not positive.

  • ValueError – If task_timeout_ms is not positive when provided.

Example:

WithAsync(
    process=Process(FetchTask, IntoTopic("results", payload=Result)),
    scope=ResourceScope.WORKER,
    max_concurrency=50,
    http=ContextFactory(lambda: httpx.AsyncClient()),
)
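The max_concurrency and task_timeout_ms parameters correspond to a familiar asyncio pattern: a semaphore caps the number of in-flight executions, and asyncio.wait_for enforces a per-message deadline. The sketch below shows that pattern with its own validation mirroring the documented ValueError cases. It is not the loom adapter. `run_bounded` is hypothetical, and starting the deadline only once a concurrency slot is acquired is an assumption.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional, Sequence


async def run_bounded(
    messages: Sequence[Any],
    handler: Callable[[Any], Awaitable[Any]],
    max_concurrency: int = 10,
    task_timeout_ms: Optional[int] = None,
) -> List[Any]:
    """Run handler per message with a concurrency cap and an optional deadline."""
    if max_concurrency <= 0:
        raise ValueError("max_concurrency must be positive")
    if task_timeout_ms is not None and task_timeout_ms <= 0:
        raise ValueError("task_timeout_ms must be positive when provided")
    semaphore = asyncio.Semaphore(max_concurrency)
    timeout_s = None if task_timeout_ms is None else task_timeout_ms / 1000

    async def run_one(message: Any) -> Any:
        async with semaphore:
            # The deadline covers execution only, not time spent waiting for a slot.
            return await asyncio.wait_for(handler(message), timeout=timeout_s)

    # return_exceptions=True keeps one timed-out message from cancelling the rest.
    return await asyncio.gather(*(run_one(m) for m in messages), return_exceptions=True)
```

A message that exceeds the deadline comes back as a TimeoutError instance in its result slot, while the other messages complete normally.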
loom.streaming.compile_flow(flow, *, config)[source]

Compile a flow into an immutable plan.

Returns:

Immutable CompiledPlan ready for adapter wiring.

Raises:

CompilationError – If binding resolution or any validation phase fails.

Return type:

CompiledPlan