import functools

from .async_cache import AsyncCache
from .key import make_key
class AsyncLRUInvalidator:
    """Decorator that invalidates an AsyncLRU-cached function whenever the
    decorated mutation/write coroutine is called.

    Usage::

        @AsyncLRU(maxsize=128)
        async def get_user(user_id):
            return await db.get_user(user_id)

        # update_user takes an extra ``data`` argument, so key_fn selects
        # the part of its arguments that identifies the cached entry.
        @AsyncLRUInvalidator(get_user, key_fn=lambda args, kw: (args[:1], {}))
        async def update_user(user_id, data):
            await db.update_user(user_id, data)

        await get_user(1)         # miss -> loads
        await get_user(1)         # hit
        await update_user(1, {})  # evicts get_user(1), runs the mutation,
                                  # then evicts get_user(1) again
        await get_user(1)         # miss -> reloads

    Invalidation happens **before** the mutation executes, so a reader that
    arrives after the mutation has started never gets the entry that was
    cached before it. It then happens **again** once the mutation finishes
    (whether it returned or raised): a concurrent reader that missed while
    the mutation was still in flight may have re-cached the pre-mutation
    value, and the second eviction discards it.

    :param cached_fn: The AsyncLRU-wrapped function to invalidate. It must
        expose ``clear_cache()``, ``invalidate_cache(*args, **kwargs)`` and
        ``_invalidate_raw(*args, **kwargs)``.
    :param skip_args: **Removed.** Passing a non-zero value raises ``TypeError``.
        Use ``key_fn`` instead to map mutation args to cache-key args.
    :param clear_all: If True, clear the entire cache instead of a specific key.
    :param key_fn: Optional callable ``(mutation_args, mutation_kwargs) -> (args, kwargs)``
        that extracts the cached function's key args from the mutation
        function's args. The result is passed to ``cached_fn._invalidate_raw()``,
        which builds the key with ``skip_args=0``.
        If None (default), the mutation function's full positional args and
        kwargs are forwarded unchanged to ``cached_fn.invalidate_cache()``,
        which applies the cached function's own ``skip_args``. This only
        targets the right entry when the mutation is called with the same
        arguments as the cached function.
    :raises TypeError: If ``skip_args`` is non-zero.
    """

    def __init__(self, cached_fn, skip_args=0, clear_all=False, key_fn=None):
        if skip_args != 0:
            raise TypeError(
                "skip_args is no longer supported on invalidators because it "
                "can silently produce the wrong cache key. Use key_fn to "
                "map the mutation function's arguments to the cached "
                "function's arguments instead. Example:\n"
                " @AsyncLRUInvalidator(cached_fn, "
                "key_fn=lambda args, kw: (args[1:], {}))"
            )
        self.cached_fn = cached_fn
        self.clear_all = clear_all
        self.key_fn = key_fn

    def __call__(self, func):
        """Wrap the mutation coroutine ``func`` with cache invalidation.

        :param func: The async mutation function to decorate.
        :return: An async wrapper that evicts the configured cache entry (or
            the whole cache), awaits ``func`` and returns its result, then
            evicts again.
        """
        cached_fn = self.cached_fn
        clear_all = self.clear_all
        key_fn = self.key_fn

        def resolve_invalidation(args, kwargs):
            # Build a zero-argument callable performing the configured
            # invalidation. It is resolved once per call so key_fn runs a
            # single time and both evictions target the same key even if the
            # mutation modifies its arguments in place.
            if clear_all:
                return cached_fn.clear_cache
            if key_fn is not None:
                inv_args, inv_kwargs = key_fn(args, kwargs)
                return functools.partial(
                    cached_fn._invalidate_raw, *inv_args, **inv_kwargs
                )
            return functools.partial(cached_fn.invalidate_cache, *args, **kwargs)

        @functools.wraps(func)  # keeps __name__, __doc__, ...; sets __wrapped__
        async def wrapper(*args, **kwargs):
            invalidate = resolve_invalidation(args, kwargs)
            # Evict BEFORE the mutation so the previously cached entry is
            # gone as soon as the mutation starts, even if it later raises.
            invalidate()
            try:
                return await func(*args, **kwargs)
            finally:
                # Evict AGAIN afterwards: a reader that missed while the
                # mutation was awaiting may have re-cached the old value.
                invalidate()

        return wrapper
class AsyncLRU:
    """Decorator that caches the results of an async function in an
    ``AsyncCache`` created without a TTL (``default_ttl=None``).

    The decorated coroutine accepts one extra keyword-only argument,
    ``use_cache`` (default ``True``). It is consumed by the wrapper and never
    forwarded to the wrapped function; passing ``use_cache=False`` deletes the
    entry for that call's key first, so the value is loaded again.

    The returned wrapper also carries these helper attributes:

    * ``clear_cache()`` - empty the whole cache.
    * ``invalidate_cache(*args, **kwargs)`` - delete the entry for one call,
      honouring ``skip_args``.
    * ``_invalidate_raw(*args, **kwargs)`` - same, but with ``skip_args=0``
      (used by ``AsyncLRUInvalidator`` together with ``key_fn``).
    * ``get_metrics``, ``save_to_backup``, ``load_from_backup`` - the
      corresponding bound methods of the underlying ``AsyncCache``.
    """

    def __init__(self, maxsize=128, skip_args: int = 0, backup=None, remote_cache=None):
        """
        :param maxsize: Use maxsize as None for unlimited size cache
        :param skip_args: Use `1` to skip first arg of func in determining cache key (e.g., skip 'self' in methods)
        :param backup: Optional CacheBackend for disk persistence (e.g., DiskBackend('/tmp/lru.pkl'))
        :param remote_cache: Optional RemoteCacheBackend for distributed L2 caching (e.g., RedisBackend())
        """
        self.cache = AsyncCache(
            maxsize=maxsize,
            default_ttl=None,
            backup=backup,
            remote_cache=remote_cache,
        )
        self.skip_args = skip_args

    def clear_cache(self):
        """
        Clears the LRU cache.

        This method empties the cache, removing all stored
        entries and effectively resetting the cache.

        :return: None
        """
        self.cache.clear()

    def __call__(self, func):
        """Wrap ``func`` so its results are served from ``self.cache``.

        :param func: The async function to cache.
        :return: An async wrapper with the cache helper attributes attached.
        """

        # Thin wrapper around the core AsyncCache; the key comes from
        # make_key and respects skip_args.
        @functools.wraps(func)  # keeps __name__, __doc__, __qualname__, ...
        async def wrapper(*args, use_cache=True, **kwargs):
            key = make_key(func, args, kwargs, self.skip_args)
            if not use_cache:
                # Force refresh: drop the entry so the get() below takes the
                # miss path and runs the loader again.
                self.cache.delete(key)

            async def loader():
                return await func(*args, **kwargs)

            return await self.cache.get(key, loader=loader)

        # Closures rather than methods: building the key needs both func and
        # this instance's skip_args.
        def invalidate_cache(*args, **kwargs):
            key = make_key(func, args, kwargs, self.skip_args)
            self.cache.delete(key)

        # Raw invalidate (no skip_args) for use by AsyncLRUInvalidator.
        def _invalidate_raw(*args, **kwargs):
            key = make_key(func, args, kwargs, 0)
            self.cache.delete(key)

        # Attached after functools.wraps so these always win over any
        # same-named attributes copied from func.__dict__.
        wrapper.clear_cache = self.clear_cache
        wrapper.get_metrics = self.cache.get_metrics
        wrapper.save_to_backup = self.cache.save_to_backup
        wrapper.load_from_backup = self.cache.load_from_backup
        wrapper.invalidate_cache = invalidate_cache
        wrapper._invalidate_raw = _invalidate_raw
        return wrapper