loom.core.engine
- exception loom.core.engine.CompilationError
Bases: Exception
Raised when a UseCase fails structural validation at startup.
- Parameters:
message – Human-readable description of the compilation failure.
- class loom.core.engine.ComputeStep(fn, accepts_context=False)
Bases: object
A compute transformation step.
Holds a direct reference to the ComputeFn so the plan is self-contained and requires no back-reference to the UseCase class.
- class loom.core.engine.EventKind(value)
Bases: StrEnum
Discriminator for runtime and compile-time events.
Used by MetricsAdapter and structured loggers to route events without isinstance checks.
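Routing on a discriminator rather than on the event's type can be sketched as below. This is a minimal illustration, not the library's implementation: the `EventKind` class here is a stand-in built on `(str, Enum)` so it runs on Python versions without `StrEnum`, the member values are assumed, and `ROUTES` and `route` are hypothetical names.

```python
from enum import Enum


class EventKind(str, Enum):
    """Stand-in for loom.core.engine.EventKind (member values are assumed)."""

    COMPILE_START = "compile_start"
    COMPILE_DONE = "compile_done"
    EXEC_START = "exec_start"
    EXEC_DONE = "exec_done"
    EXEC_ERROR = "exec_error"


# Hypothetical routing table: one handler label per event kind.
ROUTES = {
    EventKind.EXEC_DONE: "record_duration",
    EventKind.EXEC_ERROR: "count_failure",
}


def route(kind: EventKind) -> str:
    """Pick a handler by comparing the discriminator, not the event's type."""
    return ROUTES.get(kind, "ignore")
```

Because the members are also strings, they can be used directly as log fields or metric labels without conversion.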
- class loom.core.engine.ExecutionPlan(use_case_type, param_bindings, input_binding, load_steps, exists_steps, compute_steps, rule_steps, read_only=False)
Bases: object
Immutable compiled representation of a UseCase’s execution flow.
Built once at startup by UseCaseCompiler and reused for every request. No dynamic reflection occurs after compilation.
- Parameters:
use_case_type (type[Any]) – The UseCase subclass this plan was compiled from.
param_bindings (tuple[ParamBinding, ...]) – Primitive parameters bound from the caller.
input_binding (InputBinding | None) – Command payload binding, or None if absent.
load_steps (tuple[LoadStep, ...]) – Entity prefetch steps, in declaration order.
exists_steps (tuple[ExistsStep, ...]) – Boolean existence checks, in declaration order.
compute_steps (tuple[ComputeStep, ...]) – Compute transformations, in declaration order.
rule_steps (tuple[RuleStep, ...]) – Rule validations, in declaration order.
read_only (bool) – When True, the executor skips opening a UnitOfWork transaction for this use case. Set automatically from UseCase.read_only at compile time. The HTTP layer may also override this at the handler level (e.g. GET routes).
Example:
    plan = ExecutionPlan(
        use_case_type=UpdateUserUseCase,
        param_bindings=(ParamBinding("user_id", int),),
        input_binding=InputBinding("cmd", UpdateUserCommand),
        load_steps=(
            LoadStep(
                "user",
                User,
                source_kind=SourceKind.PARAM,
                source_name="user_id",
                lookup_kind=LookupKind.BY_ID,
                against="id",
            ),
        ),
        exists_steps=(),
        compute_steps=(),
        rule_steps=(),
    )
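One common way to get an immutable, tuple-based plan object in Python is a frozen dataclass. The sketch below only illustrates that idea: `MiniPlan` is a hypothetical, reduced stand-in, and the source does not state how ExecutionPlan actually enforces immutability.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class MiniPlan:
    """Hypothetical, reduced stand-in for ExecutionPlan."""

    use_case_type: type
    load_steps: tuple = ()  # tuples keep the step sequence immutable too
    read_only: bool = False


plan = MiniPlan(use_case_type=object, load_steps=("user",))

try:
    plan.read_only = True  # frozen dataclasses reject attribute assignment
    mutated = True
except FrozenInstanceError:
    mutated = False
```

A plan that cannot change after construction can be shared across concurrent requests without copying.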
- class loom.core.engine.InputBinding(name, command_type)
Bases: object
A command payload parameter marked with Input().
The executor builds the command from the raw payload dict and injects it under this parameter name.
- class loom.core.engine.LoadStep(name, entity_type, source_kind, source_name, lookup_kind, against, profile='default', on_missing=OnMissing.RAISE)
Bases: object
An entity prefetch step marked with LoadById or Load.
The executor resolves the entity from a repository before calling execute. Missing behavior is controlled by on_missing.
- Parameters:
name (str) – Parameter name as declared in the signature.
source_kind (SourceKind) – Where the lookup value is extracted from.
source_name (str) – Name of param/command field used as lookup value.
lookup_kind (LookupKind) – Lookup strategy (id or arbitrary field).
against (str) – Entity field used in repository lookup.
profile (str) – Loading profile forwarded to repo.get_by_id. Defaults to "default".
on_missing (OnMissing) – Policy when no entity is found.
- class loom.core.engine.MetricsAdapter(*args, **kwargs)
Bases: Protocol
Port for recording runtime and compile-time events.
Implement this protocol to plug in any metrics backend (Prometheus, StatsD, in-memory counters, etc.) without coupling the framework to a concrete provider.
Transport adapters may also implement this protocol to record HTTP or Kafka-level metrics alongside UseCase-level events.
Example:
    class PrometheusAdapter:
        def on_event(self, event: RuntimeEvent) -> None:
            if event.kind == EventKind.EXEC_DONE:
                DURATION.labels(usecase=event.use_case_name).observe(
                    (event.duration_ms or 0) / 1000
                )
- on_event(event)
Process a runtime or compile-time event.
- Parameters:
event (RuntimeEvent) – Immutable event carrying kind, use case name, optional step name, duration, status, and error.
- Return type:
None
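The in-memory counter backend mentioned above could look roughly like the following. It is a sketch under stated assumptions: `FakeEvent` is a hypothetical stand-in carrying only three of the RuntimeEvent fields, the kind values are plain strings chosen for illustration, and `InMemoryMetrics` is not part of the library. The only contract taken from the source is a single `on_event(event)` method returning `None`.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class FakeEvent:
    """Hypothetical stand-in for RuntimeEvent with only the fields used here."""

    kind: str
    use_case_name: str
    duration_ms: Optional[float] = None


class InMemoryMetrics:
    """Counts events per (use case, kind) and sums the reported durations."""

    def __init__(self) -> None:
        self.counts: Counter = Counter()
        self.total_ms: float = 0.0

    def on_event(self, event: FakeEvent) -> None:
        self.counts[(event.use_case_name, event.kind)] += 1
        # duration_ms is optional, so treat a missing value as zero.
        self.total_ms += event.duration_ms or 0.0


metrics = InMemoryMetrics()
metrics.on_event(FakeEvent("exec_start", "UpdateUserUseCase"))
metrics.on_event(FakeEvent("exec_done", "UpdateUserUseCase", duration_ms=12.4))
```

An adapter of this shape is handy in tests, where assertions can inspect `counts` directly instead of scraping a metrics endpoint.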
- class loom.core.engine.ParamBinding(name, annotation)
Bases: object
A primitive parameter declared in execute.
Represents a positional/keyword argument that is provided by the caller at execution time (e.g. user_id: int).
- class loom.core.engine.RuleStep(fn, accepts_context=False)
Bases: object
A rule validation step.
Holds a direct reference to the RuleFn so the plan is self-contained and requires no back-reference to the UseCase class.
- class loom.core.engine.RuntimeEvent(kind, use_case_name, step_name=None, duration_ms=None, status=None, error=None, trace_id=None)
Bases: object
Immutable event emitted by the compiler and runtime executor.
Consumed by MetricsAdapter and structured loggers. Carries no transport-specific data; adapters translate to their own formats.
- Parameters:
kind (EventKind) – Event discriminator.
use_case_name (str) – Qualified name of the executing UseCase.
step_name (str | None) – Step label (e.g. "Load User"), or None.
duration_ms (float | None) – Wall-clock duration in milliseconds, or None.
status (str | None) – Outcome label (e.g. "success", "failure"), or None.
error (BaseException | None) – Exception instance if the event represents a failure, or None.
trace_id (str | None)
Example:
    event = RuntimeEvent(
        kind=EventKind.EXEC_DONE,
        use_case_name="UpdateUserUseCase",
        duration_ms=12.4,
        status="success",
    )
- class loom.core.engine.RuntimeExecutor(compiler, *, uow_factory=None, debug_execution=False, logger=None, metrics=None, repo_resolver=None)
Bases: object
Executes UseCases from their compiled ExecutionPlan without reflection.
Receives a fully constructed UseCase instance and drives execution through the fixed pipeline: bind params → build command → load entities → apply computes → check rules → call execute.
Optionally manages a UnitOfWork lifecycle around each execution when a uow_factory is provided. Nested calls detected via a contextvars.ContextVar share the outer transaction and never open an additional UoW.
No signature inspection occurs at runtime. All structural information comes from the cached ExecutionPlan produced by UseCaseCompiler.
Emits RuntimeEvent objects to the optional MetricsAdapter at key lifecycle points (start, done, error). Enriches log calls with structured fields (usecase, duration_ms, status) for structured log consumers.
- Parameters:
compiler (UseCaseCompiler) – Compiler used to retrieve cached plans.
uow_factory (UnitOfWorkFactory | None) – Optional UoW factory. When provided, each top-level execute() call is wrapped in a single atomic transaction (begin → commit on success, rollback on exception). Nested executions within the same async context reuse the outer UoW.
debug_execution (bool) – When True, emits [STEP] logs for every pipeline stage. Defaults to False (summary logs only).
logger (LoggerPort | None) – Optional logger. Defaults to the framework logger.
metrics (MetricsAdapter | None) – Optional metrics adapter. When provided, receives EXEC_START, EXEC_DONE, and EXEC_ERROR events.
repo_resolver (Callable[[type[Any]], Any] | None) – Optional callable that resolves a repository instance from an entity model type. Used by Load/Exists when dependencies override is not passed to execute().
Example:
    executor = RuntimeExecutor(
        compiler,
        uow_factory=SQLAlchemyUnitOfWorkFactory(session_manager),
        metrics=prometheus_adapter,
    )
    result = await executor.execute(
        use_case,
        params={"user_id": 1},
        payload={"email": "new@corp.com"},
    )
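The fixed pipeline order (bind params, build command, load entities, apply computes, check rules, call execute) can be pictured with a small self-contained sketch. Everything here is hypothetical: `run_pipeline` and its arguments are illustrative names, computes are assumed to transform the command, and `ValueError` stands in for the library's RuleViolations error. The real executor reads these steps from the cached ExecutionPlan instead of taking them as arguments.

```python
from __future__ import annotations

import asyncio
from typing import Any, Callable, Optional


async def run_pipeline(
    execute: Callable[..., Any],
    params: dict[str, Any],
    payload: dict[str, Any],
    build_command: Callable[[dict[str, Any]], Any],
    loaders: dict[str, Callable[[dict[str, Any]], Any]],
    computes: list[Callable[[Any], Any]],
    rules: list[Callable[[Any], Optional[str]]],
) -> Any:
    kwargs = dict(params)                 # 1. bind params
    cmd = build_command(payload)          # 2. build command
    for name, load in loaders.items():    # 3. load entities
        kwargs[name] = load(kwargs)
    for compute in computes:              # 4. apply computes (assumed: on cmd)
        cmd = compute(cmd)
    # 5. check rules: collect every failure message before raising
    errors = [msg for msg in (rule(cmd) for rule in rules) if msg]
    if errors:
        raise ValueError(errors)          # stand-in for RuleViolations
    return await execute(cmd=cmd, **kwargs)  # 6. call execute


async def update_user(cmd: dict, user_id: int, user: dict) -> dict:
    return {**user, "email": cmd["email"]}


result = asyncio.run(
    run_pipeline(
        update_user,
        params={"user_id": 1},
        payload={"email": " New@Corp.com "},
        build_command=dict,
        loaders={"user": lambda kw: {"id": kw["user_id"], "email": "old@corp.com"}},
        computes=[lambda c: {**c, "email": c["email"].strip().lower()}],
        rules=[lambda c: None if "@" in c["email"] else "invalid email"],
    )
)
```

Running rules after computes means validation sees the normalized command, which is the ordering the pipeline description implies.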
- async execute(compilable: UseCase[Any, ResultT], *, params: dict[str, Any] | None = None, payload: dict[str, Any] | None = None, dependencies: dict[type[Any], Any] | None = None, load_overrides: dict[type[Any], Any] | None = None, read_only: bool = False) -> ResultT
- async execute(compilable: Job[ResultT], *, params: dict[str, Any] | None = None, payload: dict[str, Any] | None = None, dependencies: dict[type[Any], Any] | None = None, load_overrides: dict[type[Any], Any] | None = None, read_only: bool = False) -> ResultT
- async execute(compilable: Compilable, *, params: dict[str, Any] | None = None, payload: dict[str, Any] | None = None, dependencies: dict[type[Any], Any] | None = None, load_overrides: dict[type[Any], Any] | None = None, read_only: bool = False) -> Any
Execute a compiled instance via its ExecutionPlan.
Accepts any object satisfying the Compilable protocol, so both UseCase and Job instances are valid inputs.
When a uow_factory was provided at construction and no UoW is already active in the current async context, opens a fresh UoW (begin), runs the pipeline, and commits on success or rolls back on any exception. Nested calls reuse the existing UoW transparently.
- Parameters:
compilable (Compilable) – Constructed instance to execute.
params (dict[str, Any] | None) – Primitive parameter values keyed by name.
payload (dict[str, Any] | None) – Raw dict for command construction via Input().
dependencies (dict[type[Any], Any] | None) – Mapping of entity type to repository, used for LoadById()/Load()/Exists() steps.
load_overrides (dict[type[Any], Any] | None) – Pre-loaded entities by type, bypassing repo calls. Used by test harnesses.
read_only (bool) – When True, skips opening a UnitOfWork transaction even if a uow_factory was provided. Automatically set to True by the HTTP layer for GET routes. Also honoured when plan.read_only is True.
- Returns:
The result produced by execute().
- Raises:
loom.core.errors.RuleViolations – If one or more rule steps fail.
NotFound – If a Load step finds no entity in the repository.
- Return type:
- class loom.core.engine.UseCaseCompiler(logger=None, metrics=None)
Bases: object
Compiles UseCase subclasses into immutable ExecutionPlans at startup.
Inspects the execute signature exactly once per class, validates structural constraints, and caches the resulting plan.
No reflection occurs after compilation; the plan drives all runtime execution via RuntimeExecutor.
- Parameters:
logger (LoggerPort | None) – Optional logger. Defaults to the framework logger.
metrics (MetricsAdapter | None) – Optional metrics adapter. When provided, receives COMPILE_START and COMPILE_DONE events.
Example:
    compiler = UseCaseCompiler(metrics=my_adapter)
    plan = compiler.compile(UpdateUserUseCase)
- compile(use_case_type)
Return the ExecutionPlan for use_case_type, compiling if needed.
Compilation is idempotent: calling this method multiple times with the same class returns the cached plan without re-inspection. Accepts any class satisfying the Compilable protocol, so both UseCase and Job subclasses are valid.
- Parameters:
use_case_type (type[Compilable]) – Concrete compilable class to compile.
- Returns:
Immutable ExecutionPlan.
- Raises:
CompilationError – If the signature violates structural constraints.
- Return type:
- get_plan(use_case_type)
Return the cached plan for use_case_type, or None.
- Parameters:
use_case_type (type[Compilable]) – Compilable class (UseCase or Job subclass) to look up.
- Returns:
Cached ExecutionPlan if compiled, otherwise None.
- Return type:
ExecutionPlan | None
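The inspect-once, cache-forever behaviour of compile() and get_plan() can be sketched as follows. `TinyCompiler` is a hypothetical stand-in whose "plan" is just a tuple of parameter names; the real compiler also validates structural constraints and builds a full ExecutionPlan, which this sketch does not attempt.

```python
import inspect
from typing import Optional


class TinyCompiler:
    """Hypothetical stand-in that caches one 'plan' per class."""

    def __init__(self) -> None:
        self._plans: dict = {}
        self.inspections = 0  # counts how often a signature was inspected

    def compile(self, use_case_type: type) -> tuple:
        plan = self._plans.get(use_case_type)
        if plan is None:
            self.inspections += 1
            sig = inspect.signature(use_case_type.execute)
            # The 'plan' here is just the parameter names after self.
            plan = tuple(name for name in sig.parameters if name != "self")
            self._plans[use_case_type] = plan
        return plan

    def get_plan(self, use_case_type: type) -> Optional[tuple]:
        # Lookup only: never triggers compilation.
        return self._plans.get(use_case_type)


class UpdateUser:
    async def execute(self, user_id: int, cmd: dict) -> None: ...


compiler = TinyCompiler()
first = compiler.compile(UpdateUser)
second = compiler.compile(UpdateUser)  # cache hit, no second inspection
```

Keeping get_plan() free of side effects lets callers check whether a class was compiled at startup without accidentally compiling it at request time.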