loom.streaming.testing¶
Streaming testing helpers built on top of Bytewax testing sinks and sources.
Public API¶
StreamingTestRunner— test-oriented runner with injectable input, captured output, and explicit error branch capture.
Typical usage:
runner = StreamingTestRunner.from_flow(flow, config=cfg)
runner.with_payloads([OrderPlaced(order_id="o-1")])
runner.capture_errors(ErrorKind.WIRE)
runner.run()
assert len(runner.output) == 1
Functions
|
Normalize supported config inputs to a |
|
Return the first source topic when the compiled source exposes one. |
|
Classes
|
Run a streaming flow with test doubles for input and output. |
- class loom.streaming.testing.StreamingTestRunner(plan, observability_runtime=None)[source]¶
Bases:
objectRun a streaming flow with test doubles for input and output.
The runner mirrors the production runner’s dataflow preparation and swaps only the source and sinks for Bytewax testing primitives. Use
with_payloads()for ergonomic flow-author tests andwith_messages()when metadata must be controlled explicitly.- Parameters:
plan (CompiledPlan) – Compiled plan produced by the compiler.
observer – Optional flow observer.
observability_runtime (ObservabilityRuntime | None)
- classmethod from_flow(flow, *, config, observability_runtime=None)[source]¶
Compile a flow and build a test runner from resolved config.
- Parameters:
flow (StreamFlow[Any, Any])
config (ConfigContext | Mapping[str, Any])
observability_runtime (ObservabilityRuntime | None)
- Return type:
- classmethod from_yaml(flow, path, *, observability_runtime=None)[source]¶
Load YAML config, compile the flow, and build a test runner.
- Parameters:
flow (StreamFlow[Any, Any])
path (str)
observability_runtime (ObservabilityRuntime | None)
- Return type:
- classmethod from_dict(flow, config, *, observability_runtime=None)[source]¶
Build a test runner from a plain Python config mapping.
- Parameters:
- Return type:
- with_payloads(items)[source]¶
Replace input with payload-derived test messages.
Payloads are wrapped into
loom.streaming.Messagevalues using deterministic test metadata:message_id:test-<index>topic: first source topic declared by the flow
- Parameters:
- Return type:
- with_messages(items)[source]¶
Replace input with fully formed runtime messages or raw test records.
- Parameters:
- Return type:
- capture_errors(*kinds)[source]¶
Enable capture for explicit error branches.
- Parameters:
kinds (ErrorKind)
- Return type: