from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass, replace
from typing import Any, Protocol, Self, TypeVar
from loom.core.command.base import Command
from loom.core.errors.errors import RuleViolation, RuleViolations
from loom.core.use_case._predicates import (
is_present as _is_present,
)
from loom.core.use_case._predicates import (
predicate_is_present as _predicate_is_present,
)
from loom.core.use_case._predicates import (
resolve_from_command as _resolve_from_command,
)
from loom.core.use_case.field_ref import FieldExpr, FieldRef
# Re-exported for backward compatibility — canonical home is loom.core.errors.
__all__ = [
"Rule",
"RuleFn",
"RuleViolation",
"RuleViolations",
]
CommandT = TypeVar("CommandT", bound=Command)
[docs]
class RuleFn(Protocol):
def __call__(
self,
command: Command,
fields_set: frozenset[str],
context: dict[str, object] | None = None,
) -> None: ...
class RuleBuilderFn(RuleFn, Protocol):
def from_command(self, *sources: FieldRef) -> Self: ...
def from_params(self, *names: str) -> Self: ...
def when_present(self, predicate: FieldRef | FieldExpr) -> Self: ...
@dataclass(frozen=True, slots=True)
class _RuleSpec:
evaluator: Callable[..., Any]
field: str | None = None
message: str | None = None
command_sources: tuple[FieldRef, ...] = ()
param_names: tuple[str, ...] = ()
include_command: bool = False
include_fields_set: bool = False
predicate: FieldRef | FieldExpr | None = None
def from_command(self, *sources: FieldRef) -> _RuleSpec:
if not sources:
# Full command mode: provide both command and fields_set.
return replace( # NOSONAR
self, command_sources=(), include_command=True, include_fields_set=True
)
return replace( # NOSONAR
self,
command_sources=tuple(sources),
include_command=False,
include_fields_set=False,
)
def from_params(self, *names: str) -> _RuleSpec:
if not names:
raise ValueError("Rule.from_params(...) requires at least one parameter name.")
return replace(self, param_names=(*self.param_names, *names)) # NOSONAR
def when_present(self, predicate: FieldRef | FieldExpr) -> _RuleSpec:
return replace(self, predicate=predicate) # NOSONAR
def __call__(
self,
command: Command,
fields_set: frozenset[str],
context: dict[str, object] | None = None,
) -> None:
if self.predicate is not None and not _predicate_is_present(
command, fields_set, self.predicate
):
return
args: list[Any] = []
if self.include_command:
args.append(command)
if self.include_fields_set:
args.append(fields_set)
for source in self.command_sources:
args.append(_resolve_from_command(command, source))
if self.param_names:
if context is None:
raise ValueError("Rule.from_params(...) requires runtime context.")
if missing := [n for n in self.param_names if n not in context]:
raise ValueError(f"Missing runtime context keys for rule: {', '.join(missing)}")
args.extend(context[n] for n in self.param_names)
result = self.evaluator(*args)
if isinstance(result, RuleViolation):
raise result
if isinstance(result, str):
raise RuleViolation(self._resolved_field(command), result)
if result is True: # intentional: only boolean True signals failure, not any truthy value
raise RuleViolation(self._resolved_field(command), self._resolved_message())
def _resolved_field(self, command: Command) -> str:
candidates = (
self.field,
self.command_sources[0].leaf if self.command_sources else None,
self.param_names[0] if self.param_names else None,
self.predicate.leaf if isinstance(self.predicate, FieldRef) else None,
)
return next((c for c in candidates if c is not None), type(command).__name__)
def _resolved_message(self) -> str:
return self.message or "Rule check failed"
@dataclass(frozen=True, slots=True)
class _RequirePresentSpec:
target: FieldRef
field: str
message: str
predicate: FieldRef | FieldExpr | None = None
def when_present(self, predicate: FieldRef | FieldExpr) -> _RequirePresentSpec:
return replace(self, predicate=predicate) # NOSONAR
def __call__(
self,
command: Command,
fields_set: frozenset[str],
context: dict[str, object] | None = None,
) -> None:
del context
if self.predicate is not None and not _predicate_is_present(
command, fields_set, self.predicate
):
return
if not _is_present(command, fields_set, self.target):
raise RuleViolation(self.field, self.message)
[docs]
class Rule:
"""Rule DSL namespace."""
@staticmethod
def check(
*targets: FieldRef,
via: Callable[..., Any],
message: str | None = None,
) -> RuleBuilderFn:
if not targets:
raise ValueError("Rule.check(...) requires at least one target.")
return _RuleSpec(
evaluator=via,
field=targets[0].leaf,
message=message,
command_sources=tuple(targets),
)
@staticmethod
def forbid(
condition: Callable[..., bool],
*,
message: str,
) -> RuleBuilderFn:
return _RuleSpec(
evaluator=condition,
message=message,
include_command=True,
include_fields_set=True,
)
@staticmethod
def require_present(
target: FieldRef,
*,
field: str | None = None,
message: str | None = None,
) -> RuleFn:
resolved_field = field or target.leaf
resolved_message = message or f"{target.leaf} is required"
return _RequirePresentSpec(
target=target,
field=resolved_field,
message=resolved_message,
)