Remote Cache (Redis)
async-cache supports distributed caching via a two-tier architecture:
L1 (Local): In-memory cache — fast, per-process (default)
L2 (Remote): Redis backend — shared across all application instances
This enables multi-instance deployments (e.g., Kubernetes pods, auto-scaled workers) to share cached data without redundant backend calls.
No external dependencies — the Redis client is a pure-Python implementation built into async-cache using raw TCP sockets and the RESP protocol.
Installation
pip install async-cache[redis]
The [redis] extra is optional and installs no additional packages — it simply
signals intent. The built-in Redis client requires no third-party libraries.
Quick Start
from cache import AsyncCache, RedisBackend
redis = RedisBackend(host="localhost", port=6379)
cache = AsyncCache(maxsize=1000, default_ttl=300, remote_cache=redis)
async def get_user(user_id):
return await cache.get(
f"user:{user_id}",
loader=lambda: fetch_from_database(user_id),
)
# First call: L1 miss → L2 miss → loader → writes to L1 + L2
# Second call: L1 hit (instant)
# Call from another instance: L1 miss → L2 hit → populates L1
How It Works
Read Pattern:
Check L1 (local in-memory cache) — immediate, zero latency
If L1 miss, check L2 (Redis) — one network round-trip
If L2 miss, call the loader (e.g., database query)
Store the result in L1 (sync) and L2 (async background)
Write Pattern:
Write to L1 synchronously (immediate, non-blocking)
Write to L2 asynchronously in the background using
asyncio.create_task()L2 write failures are logged but never block or crash the application
Error Handling:
All Redis errors are caught and logged — the app never crashes due to Redis
If Redis is down, the cache gracefully degrades to L1 only
Thundering herd protection and batch loading work normally with
remote_cache
With Decorators
from cache import AsyncLRU, AsyncTTL, RedisBackend
redis = RedisBackend(host="redis.example.com", port=6379)
@AsyncLRU(maxsize=128, remote_cache=redis)
async def get_product(product_id: int):
return await db.query_product(product_id)
@AsyncTTL(time_to_live=60, remote_cache=redis)
async def get_session(session_id: str):
return await db.get_session(session_id)
With AgentCache
from agent_cache import AgentCache
from cache import RedisBackend
redis = RedisBackend(host="redis.example.com", port=6379, prefix="agent:")
@AgentCache(resource="cart", scope="global", ttl=120, remote_cache=redis)
async def get_cart(user_id):
return await shop_api.get_cart(user_id)
Combined with Disk Persistence
You can use both backup (disk persistence) and remote_cache (Redis) together.
The disk backup is used for cold-start warmup, while Redis handles cross-instance sharing:
from cache import AsyncCache, DiskBackend, RedisBackend
backup = DiskBackend("/tmp/cache.pkl")
redis = RedisBackend(host="localhost", port=6379)
cache = AsyncCache(
maxsize=1000,
default_ttl=300,
backup=backup,
remote_cache=redis,
)
# On init: loads from disk (backup)
# During runtime: L1 → L2 (Redis) → loader
# On shutdown: save to disk
cache.save_to_backup()
RedisBackend Configuration
Parameter |
Default |
Description |
|---|---|---|
|
|
Redis server hostname |
|
|
Redis server port |
|
|
Redis database number |
|
|
Optional Redis authentication password |
|
|
Key prefix for namespacing (prevents collisions) |
|
|
Connection timeout in seconds |
|
|
Read/write timeout in seconds |
Custom Remote Backends
To implement a custom remote backend (e.g., Memcached, DynamoDB), subclass
RemoteCacheBackend:
from cache.remote import RemoteCacheBackend
class MemcachedBackend(RemoteCacheBackend):
async def get(self, key: str):
...
async def set(self, key: str, value, ttl=None):
...
async def delete(self, key: str):
...
async def clear(self):
...
async def close(self):
...
The RemoteCacheBackend ABC requires five async methods:
Method |
Description |
|---|---|
|
Return deserialized value or |
|
Store a value with optional TTL (seconds) |
|
Remove a single key |
|
Remove all entries managed by this instance |
|
Close the connection to the remote store |
Production Recommendations
Key prefix: Use distinct prefixes per service/environment to avoid key collisions:
RedisBackend(prefix="myapp:prod:")
Timeouts: Set short timeouts to prevent slow Redis from blocking your app:
RedisBackend(connect_timeout=2, socket_timeout=2)
TTL: Always set a
default_ttlto prevent unbounded Redis memory growth:cache = AsyncCache(default_ttl=3600, remote_cache=redis)
Graceful shutdown: Close the Redis connection on app shutdown:
import atexit import asyncio async def shutdown(): await redis.close() atexit.register(lambda: asyncio.run(shutdown()))