from __future__ import annotations
import importlib
from collections.abc import Awaitable, Callable, Mapping
from typing import TYPE_CHECKING, Any, TypeVar, cast, overload
import msgspec
from loom.core.cache.serializer import MsgspecSerializer
if TYPE_CHECKING:
from loom.core.cache.abc.config import CacheConfig
# Generic placeholder for the caller-requested target type that cached values
# are converted to by ``get_value`` / ``multi_get_values``.
T = TypeVar("T")
# Substring looked for in a backend entry's ``"cache"`` setting by
# ``CacheGateway.apply_config`` to decide whether ``max_size`` is injected.
_SIMPLE_MEMORY_CACHE = "SimpleMemoryCache"
def _is_raw_backend(cache: Any) -> bool:
    """Tell whether an aiocache backend keeps values unserialized ("raw").

    The backend counts as raw whenever its ``serializer`` attribute is
    anything other than a
    :class:`~loom.core.cache.serializer.MsgspecSerializer` instance; a
    backend with no ``serializer`` attribute at all is therefore raw too.
    Raw backends can use native atomic increment operations (Redis ``INCR``,
    ``SimpleMemoryCache.increment``) because there are no encoded bytes to
    corrupt.

    Args:
        cache: Backend object to inspect.

    Returns:
        ``False`` when the backend serializes through ``MsgspecSerializer``,
        ``True`` otherwise.
    """
    configured_serializer = getattr(cache, "serializer", None)
    if isinstance(configured_serializer, MsgspecSerializer):
        return False
    return True
[docs]
class CacheGateway:
    """Facade over aiocache with msgpack serialization for entity data.

    Two distinct usage modes:

    **Data gateway** — configured with
    :class:`~loom.core.cache.serializer.MsgspecSerializer`. Used by
    :class:`~loom.core.cache.repository.CachedRepository` for entity and list
    storage. All values are msgpack-encoded on write and decoded on read.

    **Counter gateway** — configured *without* a serializer (raw backend).
    Used by :class:`~loom.core.cache.dependency.GenerationalDependencyResolver`
    for generation counter storage. Values are stored as plain Python integers,
    enabling atomic native increment on both Redis and ``SimpleMemoryCache``.

    The gateway auto-detects which mode it is in at construction time and routes
    :meth:`incr` accordingly — no manual configuration needed beyond choosing the
    right aiocache alias.

    Args:
        alias: Registered aiocache alias to retrieve the backend from.

    Example::

        data_gateway = CacheGateway(alias=config.aiocache_alias)
        counter_gateway = CacheGateway(alias=config.effective_counter_alias)
        resolver = GenerationalDependencyResolver(counter_gateway)
    """

    def __init__(self, *, alias: str = "default") -> None:
        """Initialise the gateway using the named aiocache alias.

        Args:
            alias: Registered aiocache alias to retrieve the backend from.
        """
        # aiocache is imported lazily, at construction time rather than at
        # module import time.
        caches = importlib.import_module("aiocache").caches
        self._cache = caches.get(alias)
        # Raw backends (no MsgspecSerializer) support native atomic increment.
        self._native_counters: bool = _is_raw_backend(self._cache)

    # ------------------------------------------------------------------
    # Configuration helpers
    # ------------------------------------------------------------------

    @classmethod
    def apply_config(cls, config: CacheConfig) -> None:
        """Configure aiocache from a :class:`~loom.core.cache.abc.config.CacheConfig`.

        Equivalent to :meth:`configure` but also injects ``config.max_size``
        into every ``aiocache.SimpleMemoryCache`` backend entry that does not
        already declare its own ``max_size``. Entries for other backend types
        (Redis, Memcached, …) are forwarded unchanged. The mappings held by
        ``config`` are never mutated: each dict entry is shallow-copied first.

        Args:
            config: Resolved cache configuration.

        Example::

            cache_cfg = CacheConfig(
                aiocache_alias="cache",
                counter_alias="counters",
                max_size=1000,
                aiocache_config={
                    "cache": {"cache": "aiocache.SimpleMemoryCache", ...},
                    "counters": {"cache": "aiocache.SimpleMemoryCache"},
                },
            )
            CacheGateway.apply_config(cache_cfg)
        """
        raw: dict[str, Any] = {}
        for alias, backend_cfg in config.aiocache_config.items():
            if not isinstance(backend_cfg, dict):
                # Non-dict entries are passed through untouched.
                raw[alias] = backend_cfg
                continue
            entry = dict(backend_cfg)
            if config.max_size is not None and _SIMPLE_MEMORY_CACHE in str(entry.get("cache", "")):
                # setdefault keeps an explicitly configured per-backend limit.
                entry.setdefault("max_size", config.max_size)
            raw[alias] = entry
        # NOTE(review): ``configure`` is not defined in the visible part of
        # this class — confirm it exists before relying on this call.
        cls.configure(raw)

    # ------------------------------------------------------------------
    # Read
    # ------------------------------------------------------------------

    @overload
    async def get_value(self, key: str, *, type: type[T]) -> T | None: ...

    @overload
    async def get_value(self, key: str, *, type: None = ...) -> Any: ...

    async def get_value(self, key: str, *, type: type[T] | None = None) -> T | Any | None:
        """Retrieve a cached value, optionally converting it to the given type.

        Args:
            key: Cache key.
            type: Optional target type for ``msgspec.convert``.

        Returns:
            The cached value (converted to ``type`` if given) or ``None`` on miss.
        """
        value = await self._cache.get(key)
        if value is None:
            return None
        if type is None:
            return value
        return msgspec.convert(value, type=type)

    async def multi_get_values(
        self,
        keys: list[str],
        *,
        type: type[T] | None = None,
    ) -> list[T | Any | None]:
        """Retrieve multiple values in a single round-trip.

        An empty ``keys`` list returns ``[]`` without contacting the backend.

        Args:
            keys: Cache keys to look up.
            type: Optional target type for ``msgspec.convert`` on each value.

        Returns:
            Values in the same order as ``keys``, with ``None`` for misses.
        """
        if not keys:
            # Nothing to fetch; skip the backend call entirely.
            return []
        values = await self._cache.multi_get(keys)
        if type is None:
            # String form keeps the cast purely static (no runtime alias build).
            return cast("list[T | Any | None]", values)
        return [
            msgspec.convert(value, type=type) if value is not None else None for value in values
        ]

    async def exists(self, key: str) -> bool:
        """Check whether a key exists in the cache.

        Args:
            key: Cache key to check.

        Returns:
            ``True`` if the key is present.
        """
        return bool(await self._cache.exists(key))

    # ------------------------------------------------------------------
    # Write
    # ------------------------------------------------------------------

    async def set_value(self, key: str, value: Any, ttl: int | None = None) -> None:
        """Store a value under the given key.

        Args:
            key: Cache key.
            value: Value to store.
            ttl: Time-to-live in seconds. ``None`` means no expiration.
        """
        await self._cache.set(key, value, ttl=ttl)

    async def multi_set_values(
        self,
        pairs: list[tuple[str, Any]],
        ttl: int | None = None,
    ) -> None:
        """Store multiple key-value pairs in a single round-trip.

        An empty ``pairs`` list is a no-op and does not contact the backend.

        Args:
            pairs: List of ``(key, value)`` tuples.
            ttl: Time-to-live in seconds applied to all entries.
        """
        if not pairs:
            return
        await self._cache.multi_set(pairs, ttl=ttl)

    async def incr(self, key: str, delta: int = 1) -> int:
        """Increment a numeric value at the given key.

        Routes to one of two strategies based on the backend configuration
        detected at construction time:

        - **Raw backend** (no ``MsgspecSerializer``): delegates to the
          backend's ``increment`` method when it exposes one, and falls back
          to GET+SET otherwise.
        - **Serialized backend** (``MsgspecSerializer``): uses a GET followed
          by a SET. This read-modify-write is **not atomic**: each ``await``
          is a point where the event loop may run other tasks, so concurrent
          increments of the same key can interleave and lose updates. Use a
          raw counter backend when exact counts matter.

        Args:
            key: Cache key holding an integer value.
            delta: Amount to increment by. Defaults to ``1``.

        Returns:
            The new value after incrementing.
        """
        if self._native_counters:
            return await self._native_incr(key, delta)
        return await self._serialized_incr(key, delta)

    async def _native_incr(self, key: str, delta: int) -> int:
        """Increment via the backend's own ``increment`` when it has one.

        Falls back to :meth:`_serialized_incr` for backends that do not
        expose a callable ``increment`` attribute.
        """
        raw = getattr(self._cache, "increment", None)
        if callable(raw):
            fn = cast(Callable[[str, int], Awaitable[Any]], raw)
            return int(await fn(key, delta))
        return await self._serialized_incr(key, delta)

    async def _serialized_incr(self, key: str, delta: int) -> int:
        """Increment with a GET followed by a SET (not atomic).

        A missing key is treated as ``0``. The new value is written without
        a TTL argument.
        """
        current = await self.get_value(key, type=int)
        next_value = (0 if current is None else int(current)) + delta
        await self.set_value(key, next_value)
        return next_value

    # ------------------------------------------------------------------
    # Delete / maintenance
    # ------------------------------------------------------------------

    async def delete(self, key: str) -> int:
        """Delete a single key from the cache.

        Args:
            key: Cache key to remove.

        Returns:
            Number of keys actually deleted (0 or 1).
        """
        return int(await self._cache.delete(key))

    async def delete_many(self, keys: list[str]) -> int:
        """Delete multiple keys from the cache.

        Uses the backend's ``multi_delete`` when it provides one; otherwise
        the keys are removed one at a time through ``delete``. An empty
        ``keys`` list returns ``0`` without contacting the backend.

        Args:
            keys: Cache keys to remove.

        Returns:
            Number of keys actually deleted.
        """
        if not keys:
            return 0
        # Not every backend exposes a bulk delete, so probe for it the same
        # way ``_native_incr`` probes for ``increment``.
        bulk_delete = getattr(self._cache, "multi_delete", None)
        if callable(bulk_delete):
            return int(await bulk_delete(keys))
        deleted = 0
        for key in keys:
            deleted += int(await self._cache.delete(key))
        return deleted

    async def clear(self) -> None:
        """Remove all entries from the cache backend."""
        await self._cache.clear()

    async def close(self) -> None:
        """Release the underlying cache connection resources."""
        await self._cache.close()