Source code for cache.async_lru

from .async_cache import AsyncCache
from .key import make_key


[docs] class AsyncLRUInvalidator: """Decorator that invalidates a specific AsyncLRU-cached function when the decorated mutation/write function is called. Usage:: @AsyncLRU(maxsize=128) async def get_user(user_id): return await db.get_user(user_id) @AsyncLRUInvalidator(get_user) async def update_user(user_id, data): await db.update_user(user_id, data) await get_user(1) # miss -> loads await get_user(1) # hit await update_user(1, {}) # invalidates get_user(1), then runs mutation; # cache key is evicted before the mutation so # no reader ever sees stale data, even if the # mutation raises await get_user(1) # miss -> reloads Invalidation happens **before** the mutation executes so that no reader ever sees stale data, even if the mutation raises. :param cached_fn: The AsyncLRU-wrapped function to invalidate. :param skip_args: **Removed.** Passing a non-zero value raises ``TypeError``. Use ``key_fn`` instead to map mutation args to cache-key args. :param clear_all: If True, clear the entire cache instead of a specific key. :param key_fn: Optional callable ``(mutation_args, mutation_kwargs) -> (args, kwargs)`` that extracts the cached function's key args from the mutation function's args. If None (default), the mutation function's full positional args and kwargs are passed to the cached function's ``invalidate_cache()``, which applies its own ``skip_args`` to derive the correct key. """ def __init__(self, cached_fn, skip_args=0, clear_all=False, key_fn=None): if skip_args != 0: raise TypeError( "skip_args is no longer supported on invalidators because it " "can silently produce the wrong cache key. Use key_fn to " "map the mutation function's arguments to the cached " "function's arguments instead. Example:\n" " @AsyncLRUInvalidator(cached_fn, " "key_fn=lambda args, kw: (args[1:], {}))" ) self.cached_fn = cached_fn self.clear_all = clear_all self.key_fn = key_fn def __call__(self, func): cached_fn = self.cached_fn clear_all = self.clear_all key_fn = self.key_fn async def wrapper(*args, **kwargs): # Invalidate BEFORE the mutation so that no reader sees stale # data after the mutation starts, even if the mutation raises. if clear_all: cached_fn.clear_cache() elif key_fn is not None: inv_args, inv_kwargs = key_fn(args, kwargs) cached_fn._invalidate_raw(*inv_args, **inv_kwargs) else: cached_fn.invalidate_cache(*args, **kwargs) result = await func(*args, **kwargs) return result wrapper.__name__ = func.__name__ wrapper.__wrapped__ = func return wrapper
[docs] class AsyncLRU: def __init__(self, maxsize=128, skip_args: int = 0, backup=None, remote_cache=None): """ :param maxsize: Use maxsize as None for unlimited size cache :param skip_args: Use `1` to skip first arg of func in determining cache key (e.g., skip 'self' in methods) :param backup: Optional CacheBackend for disk persistence (e.g., DiskBackend('/tmp/lru.pkl')) :param remote_cache: Optional RemoteCacheBackend for distributed L2 caching (e.g., RedisBackend()) """ self.cache = AsyncCache(maxsize=maxsize, default_ttl=None, backup=backup, remote_cache=remote_cache) self.skip_args = skip_args
[docs] def clear_cache(self): """ Clears the LRU cache. This method empties the cache, removing all stored entries and effectively resetting the cache. :return: None """ self.cache.clear()
def __call__(self, func): # thin wrapper using core AsyncCache; key via make_key (respects skip_args for parity with AsyncTTL) # !use_cache (force-miss/refresh): delete key then get (ensures miss count in metrics + reload) async def wrapper(*args, use_cache=True, **kwargs): key = make_key(func, args, kwargs, self.skip_args) if not use_cache: # force refresh: invalidate to trigger miss, then get+load (for correct metrics) self.cache.delete(key) # fallthrough to get below # get (with loader on miss; now covers force-miss case too) async def loader(): return await func(*args, **kwargs) return await self.cache.get(key, loader=loader) wrapper.__name__ = func.__name__ wrapper.__dict__['clear_cache'] = self.clear_cache wrapper.__dict__['get_metrics'] = self.cache.get_metrics wrapper.__dict__['save_to_backup'] = self.cache.save_to_backup wrapper.__dict__['load_from_backup'] = self.cache.load_from_backup # invalidate as closure (needs func + skip_args for key) def invalidate_cache(*args, **kwargs): key = make_key(func, args, kwargs, self.skip_args) self.cache.delete(key) wrapper.__dict__['invalidate_cache'] = invalidate_cache # Raw invalidate (no skip_args) for use by AsyncLRUInvalidator def _invalidate_raw(*args, **kwargs): key = make_key(func, args, kwargs, 0) self.cache.delete(key) wrapper.__dict__['_invalidate_raw'] = _invalidate_raw return wrapper