Source code for loom.core.engine.compiler

from __future__ import annotations

import inspect
import typing
from typing import Any

from loom.core.engine.compilable import Compilable
from loom.core.engine.events import EventKind, RuntimeEvent
from loom.core.engine.metrics import MetricsAdapter
from loom.core.engine.plan import (
    ComputeStep,
    ExecutionPlan,
    ExistsStep,
    InputBinding,
    LoadStep,
    ParamBinding,
    RuleStep,
)
from loom.core.logger import LoggerPort, get_logger
from loom.core.use_case.markers import (
    LookupKind,
    SourceKind,
    _ExistsMarker,
    _InputMarker,
    _LoadByIdMarker,
    _LoadMarker,
)


class CompilationError(Exception):
    """Signal that a UseCase did not pass structural validation at startup.

    Args:
        message: Human-readable explanation of why compilation failed.
    """
class UseCaseCompiler:
    """Compiles UseCase subclasses into immutable ExecutionPlans at startup.

    Inspects the ``execute`` signature exactly once per class, validates
    structural constraints, and caches the resulting plan. No reflection
    occurs after compilation — the plan drives all runtime execution via
    ``RuntimeExecutor``.

    Args:
        logger: Optional logger. Defaults to the framework logger.
        metrics: Optional metrics adapter. When provided, receives
            ``COMPILE_START`` and ``COMPILE_DONE`` events.

    Example::

        compiler = UseCaseCompiler(metrics=my_adapter)
        plan = compiler.compile(UpdateUserUseCase)
    """

    def __init__(
        self,
        logger: LoggerPort | None = None,
        metrics: MetricsAdapter | None = None,
    ) -> None:
        # Plans are keyed by the compiled class itself.
        self._cache: dict[type[Any], ExecutionPlan] = {}
        self._logger = logger or get_logger(__name__)
        self._metrics = metrics

    def compile(self, use_case_type: type[Compilable]) -> ExecutionPlan:
        """Return the ExecutionPlan for ``use_case_type``, compiling if needed.

        Compilation is idempotent: calling this method multiple times with
        the same class returns the cached plan without re-inspection.

        Accepts any class satisfying the
        :class:`~loom.core.engine.compilable.Compilable` protocol — both
        ``UseCase`` and ``Job`` subclasses are valid.

        Args:
            use_case_type: Concrete compilable class to compile.

        Returns:
            Immutable ExecutionPlan.

        Raises:
            CompilationError: If the signature violates structural
                constraints.
        """
        cached = self._cache.get(use_case_type)
        if cached is not None:
            return cached
        plan = self._build_plan(use_case_type)
        self._cache[use_case_type] = plan
        return plan

    def get_plan(self, use_case_type: type[Compilable]) -> ExecutionPlan | None:
        """Return the cached plan for ``use_case_type``, or ``None``.

        Args:
            use_case_type: UseCase subclass to look up.

        Returns:
            Cached ExecutionPlan if compiled, otherwise ``None``.
        """
        return self._cache.get(use_case_type)

    def _emit(self, event: RuntimeEvent) -> None:
        """Forward *event* to the metrics adapter when one is configured."""
        if self._metrics is not None:
            self._metrics.on_event(event)

    def _build_plan(self, use_case_type: type[Compilable]) -> ExecutionPlan:
        """Inspect ``use_case_type`` and assemble its ExecutionPlan.

        Emits ``COMPILE_START`` / ``COMPILE_DONE`` events, logs the boot
        progress, and stores the resulting plan on the class as
        ``__execution_plan__``.

        Args:
            use_case_type: Class whose ``execute``, ``computes`` and
                ``rules`` attributes are compiled.

        Returns:
            The newly built ExecutionPlan.

        Raises:
            CompilationError: Propagated from signature inspection.
        """
        uc_name = use_case_type.__qualname__
        self._logger.info(f"[BOOT] Compiling UseCase: {uc_name}", usecase=uc_name)
        self._emit(RuntimeEvent(kind=EventKind.COMPILE_START, use_case_name=uc_name))

        param_bindings, input_binding, load_steps, exists_steps = self._inspect_execute(
            use_case_type
        )

        # Context acceptance is resolved here, once, so it is stored on the step.
        compute_steps = tuple(
            ComputeStep(fn=fn, accepts_context=_fn_accepts_context(fn))
            for fn in use_case_type.computes
        )
        rule_steps = tuple(
            RuleStep(fn=fn, accepts_context=_fn_accepts_context(fn))
            for fn in use_case_type.rules
        )

        self._logger.info(
            f"[BOOT] - Validated {len(compute_steps)} compute steps",
            usecase=uc_name,
        )
        self._logger.info(
            f"[BOOT] - Validated {len(rule_steps)} rules",
            usecase=uc_name,
        )

        total = len(load_steps) + len(exists_steps) + len(compute_steps) + len(rule_steps)
        self._logger.info(
            f"[BOOT] - ExecutionPlan built ({total} steps)",
            usecase=uc_name,
        )
        self._emit(RuntimeEvent(kind=EventKind.COMPILE_DONE, use_case_name=uc_name))

        plan = ExecutionPlan(
            use_case_type=use_case_type,
            param_bindings=tuple(param_bindings),
            input_binding=input_binding,
            load_steps=tuple(load_steps),
            exists_steps=tuple(exists_steps),
            compute_steps=compute_steps,
            rule_steps=rule_steps,
            read_only=bool(getattr(use_case_type, "read_only", False)),
        )
        use_case_type.__execution_plan__ = plan
        return plan

    def _inspect_execute(
        self,
        use_case_type: type[Compilable],
    ) -> tuple[list[ParamBinding], InputBinding | None, list[LoadStep], list[ExistsStep]]:
        """Classify every parameter of ``execute`` into bindings and steps.

        Args:
            use_case_type: Class whose ``execute`` signature is inspected.

        Returns:
            ``(param_bindings, input_binding, load_steps, exists_steps)``.

        Raises:
            CompilationError: If ``execute`` is still abstract, or if a
                marker violates a structural constraint.
        """
        execute_fn = use_case_type.execute
        if getattr(execute_fn, "__isabstractmethod__", False):
            raise CompilationError(
                f"{use_case_type.__qualname__} must override execute() before it can be compiled"
            )

        try:
            hints = typing.get_type_hints(execute_fn)
        except Exception:
            # Deliberate best-effort: when annotations cannot be resolved,
            # every parameter falls back to ``Any`` via ``hints.get`` below.
            hints = {}

        sig = inspect.signature(execute_fn)
        param_bindings: list[ParamBinding] = []
        input_binding: InputBinding | None = None
        load_steps: list[LoadStep] = []
        exists_steps: list[ExistsStep] = []
        input_count = 0

        for name, param in sig.parameters.items():
            if self._should_skip_parameter(name, param):
                continue
            input_binding, input_count = self._collect_binding_from_parameter(
                use_case_type=use_case_type,
                name=name,
                param=param,
                annotation=hints.get(name, Any),
                input_binding=input_binding,
                input_count=input_count,
                param_bindings=param_bindings,
                load_steps=load_steps,
                exists_steps=exists_steps,
            )

        self._validate_marker_refs(
            use_case_type,
            param_bindings,
            input_binding,
            load_steps,
            exists_steps,
        )
        return param_bindings, input_binding, load_steps, exists_steps

    def _collect_binding_from_parameter(
        self,
        *,
        use_case_type: type[Compilable],
        name: str,
        param: inspect.Parameter,
        annotation: Any,
        input_binding: InputBinding | None,
        input_count: int,
        param_bindings: list[ParamBinding],
        load_steps: list[LoadStep],
        exists_steps: list[ExistsStep],
    ) -> tuple[InputBinding | None, int]:
        """Dispatch one parameter on the marker found in its default value.

        Parameters whose default is not a recognised marker are appended to
        ``param_bindings`` as plain parameters. ``load_steps`` and
        ``exists_steps`` are mutated in place by the marker handlers.

        Returns:
            The (possibly updated) ``(input_binding, input_count)`` pair.

        Raises:
            CompilationError: Propagated from the ``Input()`` handler.
        """
        marker_default = param.default

        if isinstance(marker_default, _InputMarker):
            return self._handle_input_marker(
                use_case_type=use_case_type,
                name=name,
                annotation=annotation,
                input_binding=input_binding,
                input_count=input_count,
            )

        if isinstance(marker_default, _LoadByIdMarker):
            self._handle_load_by_id_marker(
                name=name,
                marker=marker_default,
                load_steps=load_steps,
                exists_steps=exists_steps,
            )
            return input_binding, input_count

        if isinstance(marker_default, _LoadMarker):
            self._handle_load_marker(
                name=name,
                marker=marker_default,
                load_steps=load_steps,
                exists_steps=exists_steps,
            )
            return input_binding, input_count

        if isinstance(marker_default, _ExistsMarker):
            self._handle_exists_marker(
                name=name,
                marker=marker_default,
                load_steps=load_steps,
                exists_steps=exists_steps,
            )
            return input_binding, input_count

        param_bindings.append(ParamBinding(name=name, annotation=annotation))
        return input_binding, input_count

    def _handle_load_by_id_marker(
        self,
        *,
        name: str,
        marker: _LoadByIdMarker[Any],
        load_steps: list[LoadStep],
        exists_steps: list[ExistsStep],
    ) -> None:
        """Append the by-id LoadStep for *marker* and log the detection."""
        del exists_steps  # accepted only to keep the handler signatures parallel
        load_steps.append(self._build_load_by_id_step(name, marker))
        self._log_load_by_id(marker)

    def _handle_load_marker(
        self,
        *,
        name: str,
        marker: _LoadMarker[Any],
        load_steps: list[LoadStep],
        exists_steps: list[ExistsStep],
    ) -> None:
        """Append the by-field LoadStep for *marker* and log the detection."""
        del exists_steps  # accepted only to keep the handler signatures parallel
        load_steps.append(self._build_load_step(name, marker))
        self._log_load(marker)

    def _handle_exists_marker(
        self,
        *,
        name: str,
        marker: _ExistsMarker[Any],
        load_steps: list[LoadStep],
        exists_steps: list[ExistsStep],
    ) -> None:
        """Append the ExistsStep for *marker* and log the detection."""
        del load_steps  # accepted only to keep the handler signatures parallel
        exists_steps.append(self._build_exists_step(name, marker))
        self._log_exists(marker)

    @staticmethod
    def _should_skip_parameter(name: str, param: inspect.Parameter) -> bool:
        """Return ``True`` for ``self`` and for ``*args`` / ``**kwargs``."""
        variadic = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
        return name == "self" or param.kind in variadic

    def _handle_input_marker(
        self,
        *,
        use_case_type: type[Compilable],
        name: str,
        annotation: Any,
        input_binding: InputBinding | None,
        input_count: int,
    ) -> tuple[InputBinding, int]:
        """Validate an ``Input()`` parameter and build its binding.

        Returns:
            The new InputBinding and the incremented input count.

        Raises:
            CompilationError: If this is not the first ``Input()`` parameter,
                or if the annotated type has no callable ``from_payload``.
        """
        next_count = input_count + 1
        if next_count > 1:
            raise CompilationError(
                f"{use_case_type.__qualname__}.execute: only one Input() parameter is allowed"
            )

        from_payload = getattr(annotation, "from_payload", None)
        if not callable(from_payload):
            raise CompilationError(
                f"{use_case_type.__qualname__}.execute: "
                f"Input() parameter '{name}' type must implement from_payload(payload)"
            )

        next_binding = InputBinding(name=name, command_type=annotation)
        cmd_name = getattr(annotation, "__name__", repr(annotation))
        self._logger.info(f"[BOOT] - Detected Input: {cmd_name}")
        return next_binding, next_count

    @staticmethod
    def _build_load_by_id_step(name: str, marker: _LoadByIdMarker[Any]) -> LoadStep:
        """Build a by-id LoadStep; its source is always a plain parameter."""
        return LoadStep(
            name=name,
            entity_type=marker.entity_type,
            source_kind=SourceKind.PARAM,
            source_name=marker.by,
            lookup_kind=LookupKind.BY_ID,
            against="id",
            profile=marker.profile,
            on_missing=marker.on_missing,
        )

    @staticmethod
    def _build_load_step(name: str, marker: _LoadMarker[Any]) -> LoadStep:
        """Build a by-field LoadStep from the marker's own source settings."""
        return LoadStep(
            name=name,
            entity_type=marker.entity_type,
            source_kind=marker.from_kind,
            source_name=marker.from_name,
            lookup_kind=LookupKind.BY_FIELD,
            against=marker.against,
            profile=marker.profile,
            on_missing=marker.on_missing,
        )

    @staticmethod
    def _build_exists_step(name: str, marker: _ExistsMarker[Any]) -> ExistsStep:
        """Build an ExistsStep from the marker's own source settings."""
        return ExistsStep(
            name=name,
            entity_type=marker.entity_type,
            source_kind=marker.from_kind,
            source_name=marker.from_name,
            against=marker.against,
            on_missing=marker.on_missing,
        )

    def _log_load_by_id(self, marker: _LoadByIdMarker[Any]) -> None:
        """Log detection of a ``LoadById`` marker."""
        self._logger.info(
            f"[BOOT] - Detected LoadById: {marker.entity_type.__name__} by {marker.by}"
        )

    def _log_load(self, marker: _LoadMarker[Any]) -> None:
        """Log detection of a ``Load`` marker."""
        src = f"{marker.from_kind.value}:{marker.from_name}"
        self._logger.info(
            f"[BOOT] - Detected Load: {marker.entity_type.__name__} {marker.against} <- {src}"
        )

    def _log_exists(self, marker: _ExistsMarker[Any]) -> None:
        """Log detection of an ``Exists`` marker."""
        src = f"{marker.from_kind.value}:{marker.from_name}"
        self._logger.info(
            f"[BOOT] - Detected Exists: {marker.entity_type.__name__} {marker.against} <- {src}"
        )

    def _validate_marker_refs(
        self,
        use_case_type: type[Compilable],
        param_bindings: list[ParamBinding],
        input_binding: InputBinding | None,
        load_steps: list[LoadStep],
        exists_steps: list[ExistsStep],
    ) -> None:
        """Check that every step's source actually exists in the signature.

        A ``PARAM`` source must name a plain ``execute`` parameter, and a
        ``COMMAND`` source requires an ``Input()`` parameter. Load steps are
        checked before exists steps.

        Error messages name the marker that produced the step: by-id load
        steps are reported as ``LoadById``, by-field load steps as ``Load``.

        Raises:
            CompilationError: On the first step whose source is missing.
        """
        param_names = {pb.name for pb in param_bindings}
        for ls in load_steps:
            label = "LoadById" if ls.lookup_kind is LookupKind.BY_ID else "Load"
            self._check_step_source(use_case_type, label, ls, param_names, input_binding)
        for es in exists_steps:
            self._check_step_source(use_case_type, "Exists", es, param_names, input_binding)

    @staticmethod
    def _check_step_source(
        use_case_type: type[Compilable],
        label: str,
        step: Any,
        param_names: set[str],
        input_binding: InputBinding | None,
    ) -> None:
        """Raise CompilationError when *step* refers to a missing source."""
        prefix = f"{use_case_type.__qualname__}.execute: {label}({step.entity_type.__name__}): "
        if step.source_kind is SourceKind.PARAM and step.source_name not in param_names:
            raise CompilationError(
                f"{prefix}parameter '{step.source_name}' not found in execute signature"
            )
        if step.source_kind is SourceKind.COMMAND and input_binding is None:
            raise CompilationError(f"{prefix}from_command requires Input() parameter")
def _fn_accepts_context(fn: Any) -> bool: """Return ``True`` when *fn* accepts a third positional context argument. Called once per compute/rule function at compile time so the executor never needs to call ``inspect.signature`` on the hot request path. Args: fn: Callable to introspect. Returns: ``True`` if *fn* has three or more positional parameters, or uses ``*args`` (``VAR_POSITIONAL``). """ try: params = tuple(inspect.signature(fn).parameters.values()) except (TypeError, ValueError): return False positional_kinds = ( inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD, ) positional = [p for p in params if p.kind in positional_kinds] if len(positional) >= 3: return True return any(p.kind is inspect.Parameter.VAR_POSITIONAL for p in params)