import functools

from .async_cache import AsyncCache
from .key import make_key
[docs]
class AsyncTTLInvalidator:
    """Decorator that invalidates an AsyncTTL-cached function whenever the
    decorated mutation/write coroutine is called.

    Usage::

        @AsyncTTL(time_to_live=60)
        async def get_session(sid):
            return await db.get_session(sid)

        @AsyncTTLInvalidator(get_session)
        async def end_session(sid):
            await db.delete_session(sid)

        await get_session("s1")   # miss -> loads
        await get_session("s1")   # hit
        await end_session("s1")   # evicts get_session("s1"), runs the
                                  # mutation, then evicts once more
        await get_session("s1")   # miss -> reloads

    The cache entry is evicted twice per call:

    * **before** the mutation runs, so the entry is gone even if the
      mutation raises, and
    * **after** the mutation finishes (whether it returned or raised), so a
      reader that missed and reloaded the old value while the mutation was
      still awaiting cannot leave that old value in the cache.

    :param cached_fn: The AsyncTTL-wrapped function to invalidate.
    :param skip_args: **Removed.** Passing a non-zero value raises ``TypeError``.
        Use ``key_fn`` instead to map mutation args to cache-key args.
    :param clear_all: If True, clear the entire cache instead of a specific key.
    :param key_fn: Optional callable ``(mutation_args, mutation_kwargs) -> (args, kwargs)``
        that extracts the cached function's key args from the mutation function's args.
        The result is passed to the cached function's ``_invalidate_raw()``.
        If None (default), the mutation function's full positional args and kwargs
        are passed to the cached function's ``invalidate_cache()``, which applies
        its own ``skip_args`` to derive the correct key.
    :raises TypeError: If ``skip_args`` is anything other than ``0``.
    """

    def __init__(self, cached_fn, skip_args=0, clear_all=False, key_fn=None):
        if skip_args != 0:
            raise TypeError(
                "skip_args is no longer supported on invalidators because it "
                "can silently produce the wrong cache key. Use key_fn to "
                "map the mutation function's arguments to the cached "
                "function's arguments instead. Example:\n"
                " @AsyncTTLInvalidator(cached_fn, "
                "key_fn=lambda args, kw: (args[1:], {}))"
            )
        self.cached_fn = cached_fn
        self.clear_all = clear_all
        self.key_fn = key_fn

    def __call__(self, func):
        """Wrap the mutation coroutine ``func`` with cache invalidation.

        :param func: The async mutation function to decorate.
        :return: An async wrapper carrying ``func``'s metadata
            (``__name__``, ``__doc__``, ``__wrapped__``, ...).
        """
        # Snapshot the configuration so later attribute changes on the
        # decorator instance do not affect already-decorated functions.
        cached_fn = self.cached_fn
        clear_all = self.clear_all
        key_fn = self.key_fn

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Resolve the eviction target once, up front, so both evictions
            # hit the same key even if the mutation alters its arguments.
            if clear_all:
                evict = cached_fn.clear_cache
            elif key_fn is not None:
                inv_args, inv_kwargs = key_fn(args, kwargs)
                evict = functools.partial(
                    cached_fn._invalidate_raw, *inv_args, **inv_kwargs
                )
            else:
                evict = functools.partial(
                    cached_fn.invalidate_cache, *args, **kwargs
                )

            # Evict BEFORE the mutation so the entry is dropped even if the
            # mutation raises.
            evict()
            try:
                return await func(*args, **kwargs)
            finally:
                # Evict again AFTER the mutation: a reader that missed while
                # the mutation was awaiting may have re-cached the old value.
                evict()

        return wrapper
[docs]
class AsyncTTL:
    """Decorator that caches an async function's results in an ``AsyncCache``.

    The decorated function accepts an extra keyword-only argument
    ``use_cache`` (default ``True``); it is consumed by the wrapper and never
    forwarded to the wrapped function. Passing ``use_cache=False`` deletes
    the entry for the call's key before the lookup.

    The returned wrapper exposes these helpers as attributes:

    * ``clear_cache()`` -- empty the whole cache.
    * ``get_metrics``, ``save_to_backup``, ``load_from_backup`` -- the
      corresponding methods of the underlying ``AsyncCache``.
    * ``invalidate_cache(*args, **kwargs)`` -- delete the entry whose key is
      built from the given arguments using this decorator's ``skip_args``.
    * ``_invalidate_raw(*args, **kwargs)`` -- same, but the key is built with
      ``skip_args`` fixed to 0 (used by ``AsyncTTLInvalidator`` with ``key_fn``).
    """

    def __init__(self, time_to_live=60, maxsize=1024, skip_args: int = 0, backup=None, remote_cache=None):
        """
        :param time_to_live: Use time_to_live as None for non expiring cache
        :param maxsize: Use maxsize as None for unlimited size cache
        :param skip_args: Use `1` to skip first arg of func in determining cache key
        :param backup: Optional CacheBackend for disk persistence (e.g., DiskBackend('/tmp/ttl.pkl'))
        :param remote_cache: Optional RemoteCacheBackend for distributed L2 caching (e.g., RedisBackend())
        """
        self.cache = AsyncCache(
            maxsize=maxsize,
            default_ttl=time_to_live,
            backup=backup,
            remote_cache=remote_cache,
        )
        self.skip_args = skip_args

    def clear_cache(self):
        """
        Clears the TTL cache.

        This method empties the cache, removing all stored
        entries and effectively resetting the cache.

        :return: None
        """
        self.cache.clear()

    def __call__(self, func):
        """Wrap the async function ``func`` with caching.

        :param func: The async function whose results should be cached.
        :return: An async wrapper carrying ``func``'s metadata
            (``__name__``, ``__doc__``, ``__wrapped__``, ...) plus the cache
            helper attributes described on the class.
        """

        # functools.wraps keeps __doc__/__qualname__/__module__ and sets
        # __wrapped__, instead of copying only __name__.
        @functools.wraps(func)
        async def wrapper(*args, use_cache=True, **kwargs):
            key = make_key(func, args, kwargs, self.skip_args)
            if not use_cache:
                # Force refresh: drop the entry first, then fall through to
                # the normal get below.
                self.cache.delete(key)

            async def loader():
                return await func(*args, **kwargs)

            return await self.cache.get(key, loader=loader)

        def invalidate_cache(*args, **kwargs):
            # Key derivation mirrors the wrapper (honours skip_args).
            self.cache.delete(make_key(func, args, kwargs, self.skip_args))

        def _invalidate_raw(*args, **kwargs):
            # Raw invalidate (no skip_args) for use by AsyncTTLInvalidator.
            self.cache.delete(make_key(func, args, kwargs, 0))

        # Attached after functools.wraps has merged func.__dict__, so these
        # helpers take precedence over any same-named attribute on func.
        wrapper.clear_cache = self.clear_cache
        wrapper.get_metrics = self.cache.get_metrics
        wrapper.save_to_backup = self.cache.save_to_backup
        wrapper.load_from_backup = self.cache.load_from_backup
        wrapper.invalidate_cache = invalidate_cache
        wrapper._invalidate_raw = _invalidate_raw
        return wrapper