Remote Cache (Redis)

async-cache supports distributed caching via a two-tier architecture:

  • L1 (Local): In-memory cache — fast, per-process (default)

  • L2 (Remote): Redis backend — shared across all application instances

This enables multi-instance deployments (e.g., Kubernetes pods, auto-scaled workers) to share cached data without redundant backend calls.

No external dependencies — the Redis client is a pure-Python implementation built into async-cache using raw TCP sockets and the RESP protocol.
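To make the "raw TCP sockets and the RESP protocol" remark concrete, here is a minimal sketch of how a RESP client frames a command and reads a bulk-string reply. The function names are hypothetical and are not taken from async-cache; only the wire format follows the Redis protocol.

```python
def encode_command(*args) -> bytes:
    """Encode a command as a RESP array of bulk strings."""
    # Array header: "*<number of arguments>\r\n"
    parts = [b"*%d\r\n" % len(args)]
    for arg in args:
        data = arg if isinstance(arg, bytes) else str(arg).encode("utf-8")
        # Each argument: "$<byte length>\r\n<bytes>\r\n"
        parts.append(b"$%d\r\n%s\r\n" % (len(data), data))
    return b"".join(parts)


def decode_bulk_string(raw: bytes):
    """Decode one bulk-string reply; a null bulk string ("$-1") means a miss."""
    if not raw.startswith(b"$"):
        raise ValueError("not a bulk string reply")
    header, _, rest = raw.partition(b"\r\n")
    length = int(header[1:])
    if length == -1:
        return None
    return rest[:length]


if __name__ == "__main__":
    print(encode_command("GET", "ac:user:1"))
    print(decode_bulk_string(b"$3\r\nbar\r\n"))
```

A real client also has to handle partial reads, the other reply types (simple strings, errors, integers, arrays), and timeouts, which this sketch leaves out.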

Installation

pip install "async-cache[redis]"

The [redis] extra is optional and installs no additional packages — it simply signals intent. The built-in Redis client requires no third-party libraries.

Quick Start

from cache import AsyncCache, RedisBackend

redis = RedisBackend(host="localhost", port=6379)
cache = AsyncCache(maxsize=1000, default_ttl=300, remote_cache=redis)

async def get_user(user_id):
    return await cache.get(
        f"user:{user_id}",
        loader=lambda: fetch_from_database(user_id),
    )

# First call: L1 miss → L2 miss → loader → writes to L1 + L2
# Second call: L1 hit (instant)
# Call from another instance: L1 miss → L2 hit → populates L1

How It Works

Read Pattern:

  1. Check L1 (local in-memory cache): an in-process lookup with no network round-trip

  2. If L1 miss, check L2 (Redis) — one network round-trip

  3. If L2 miss, call the loader (e.g., database query)

  4. Store the result in L1 (sync) and L2 (async background)
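The four read steps can be sketched as follows. This is an illustration under stated assumptions, not the library's implementation: DictRemote and read_through are hypothetical names, plain dicts stand in for L1, and a None return from L2 is treated as a miss.

```python
import asyncio


class DictRemote:
    """Stand-in for a shared L2 backend; a real one would talk to Redis."""

    def __init__(self):
        self.data = {}

    async def get(self, key):
        return self.data.get(key)

    async def set(self, key, value, ttl=None):
        self.data[key] = value


async def read_through(key, l1, l2, loader):
    # Step 1: in-process lookup.
    if key in l1:
        return l1[key], "l1"
    # Step 2: one round-trip to the shared tier.
    value = await l2.get(key)
    if value is not None:
        l1[key] = value  # populate L1 so the next read stays local
        return value, "l2"
    # Step 3: both tiers missed, so call the loader.
    value = await loader()
    # Step 4: store in both tiers. The L2 write is awaited inline here to keep
    # the sketch deterministic; the text describes it as a background write.
    l1[key] = value
    await l2.set(key, value)
    return value, "loader"


async def demo():
    l2 = DictRemote()
    l1_a, l1_b = {}, {}  # two "instances": separate L1, shared L2

    async def loader():
        return {"id": 1}

    sources = []
    sources.append((await read_through("user:1", l1_a, l2, loader))[1])
    sources.append((await read_through("user:1", l1_a, l2, loader))[1])
    sources.append((await read_through("user:1", l1_b, l2, loader))[1])
    return sources


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The three reads in demo() correspond to the first call, a repeat call on the same instance, and a call from a second instance that shares only the remote tier.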

Write Pattern:

  1. Write to L1 synchronously (an in-memory update with no I/O)

  2. Write to L2 asynchronously in the background using asyncio.create_task()

  3. L2 write failures are logged but never block or crash the application
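A minimal sketch of this write pattern, assuming a backend whose set() always fails. FlakyRemote, write, and the logger name are hypothetical; the point is that the L1 write is visible immediately while the L2 failure is only logged.

```python
import asyncio
import logging

logger = logging.getLogger("cache_sketch")

# asyncio keeps only weak references to tasks, so hold strong ones until done.
_background_tasks = set()


class FlakyRemote:
    """Stand-in backend whose writes always fail."""

    async def set(self, key, value, ttl=None):
        raise ConnectionError("redis unavailable")


def _log_failure(task):
    _background_tasks.discard(task)
    if not task.cancelled() and task.exception() is not None:
        logger.warning("L2 write failed: %r", task.exception())


def write(key, value, l1, l2, ttl=None):
    # Step 1: update L1 right away.
    l1[key] = value
    # Step 2: schedule the L2 write without awaiting it.
    task = asyncio.create_task(l2.set(key, value, ttl))
    _background_tasks.add(task)
    # Step 3: failures are logged in a callback and never reach the caller.
    task.add_done_callback(_log_failure)
    return task


async def demo():
    l1 = {}
    task = write("user:1", "alice", l1, FlakyRemote())
    in_l1_immediately = l1.get("user:1")
    await asyncio.wait([task])  # only so the demo can inspect the outcome
    return in_l1_immediately, type(task.exception()).__name__


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Note that write() must be called while an event loop is running, because asyncio.create_task() requires one.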

Error Handling:

  • All Redis errors are caught and logged — the app never crashes due to Redis

  • If Redis is down, the cache gracefully degrades to L1 only

  • Thundering herd protection and batch loading work normally with remote_cache
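Degrading to L1 only can be pictured as treating every remote error as a cache miss. The sketch below assumes that behavior; DownRemote, safe_remote_get, and get are hypothetical names, not the library's internals.

```python
import asyncio
import logging

logger = logging.getLogger("cache_sketch")


class DownRemote:
    """Stand-in backend that behaves like an unreachable Redis server."""

    async def get(self, key):
        raise ConnectionRefusedError("redis is down")


async def safe_remote_get(l2, key):
    """Return the remote value, or None if the remote tier fails."""
    try:
        return await l2.get(key)
    except Exception as exc:
        logger.warning("L2 read failed for %r: %r", key, exc)
        return None


async def get(key, l1, l2, loader):
    if key in l1:
        return l1[key]
    value = await safe_remote_get(l2, key)
    if value is None:
        value = await loader()  # remote miss or remote failure
    l1[key] = value
    return value


async def demo():
    l1 = {}
    calls = 0

    async def loader():
        nonlocal calls
        calls += 1
        return "fresh"

    first = await get("k", l1, DownRemote(), loader)
    second = await get("k", l1, DownRemote(), loader)  # served from L1
    return first, second, calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With the remote tier down, the loader runs once and later reads on the same instance are still served from L1.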

With Decorators

from cache import AsyncLRU, AsyncTTL, RedisBackend

redis = RedisBackend(host="redis.example.com", port=6379)

@AsyncLRU(maxsize=128, remote_cache=redis)
async def get_product(product_id: int):
    return await db.query_product(product_id)

@AsyncTTL(time_to_live=60, remote_cache=redis)
async def get_session(session_id: str):
    return await db.get_session(session_id)

With AgentCache

from agent_cache import AgentCache
from cache import RedisBackend

redis = RedisBackend(host="redis.example.com", port=6379, prefix="agent:")

@AgentCache(resource="cart", scope="global", ttl=120, remote_cache=redis)
async def get_cart(user_id):
    return await shop_api.get_cart(user_id)

Combined with Disk Persistence

You can use both backup (disk persistence) and remote_cache (Redis) together. The disk backup is used for cold-start warmup, while Redis handles cross-instance sharing:

from cache import AsyncCache, DiskBackend, RedisBackend

backup = DiskBackend("/tmp/cache.pkl")
redis = RedisBackend(host="localhost", port=6379)

cache = AsyncCache(
    maxsize=1000,
    default_ttl=300,
    backup=backup,
    remote_cache=redis,
)

# On init: loads from disk (backup)
# During runtime: L1 → L2 (Redis) → loader
# On shutdown: save to disk
cache.save_to_backup()

RedisBackend Configuration

Parameter         Default       Description
---------         -------       -----------
host              "localhost"   Redis server hostname
port              6379          Redis server port
db                0             Redis database number
password          None          Optional Redis authentication password
prefix            "ac:"         Key prefix for namespacing (prevents collisions)
connect_timeout   5             Connection timeout in seconds
socket_timeout    5             Read/write timeout in seconds
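One way to supply the parameters listed above is to read them from the environment. The sketch below builds a keyword-argument dict whose keys and defaults match the documented parameters; the environment variable names and the helper redis_kwargs_from_env are assumptions made for illustration.

```python
import os


def redis_kwargs_from_env(env=None):
    """Build RedisBackend keyword arguments from environment variables."""
    env = os.environ if env is None else env
    return {
        "host": env.get("REDIS_HOST", "localhost"),
        "port": int(env.get("REDIS_PORT", "6379")),
        "db": int(env.get("REDIS_DB", "0")),
        # An empty or missing password means no authentication.
        "password": env.get("REDIS_PASSWORD") or None,
        "prefix": env.get("CACHE_PREFIX", "ac:"),
        # Kept as whole seconds to match the documented defaults.
        "connect_timeout": int(env.get("REDIS_CONNECT_TIMEOUT", "5")),
        "socket_timeout": int(env.get("REDIS_SOCKET_TIMEOUT", "5")),
    }


if __name__ == "__main__":
    print(redis_kwargs_from_env({"REDIS_HOST": "redis.example.com"}))
    # With async-cache installed, the dict would be passed on as:
    #   redis = RedisBackend(**redis_kwargs_from_env())
```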

Custom Remote Backends

To implement a custom remote backend (e.g., Memcached, DynamoDB), subclass RemoteCacheBackend:

from cache.remote import RemoteCacheBackend

class MemcachedBackend(RemoteCacheBackend):
    async def get(self, key: str):
        ...

    async def set(self, key: str, value, ttl=None):
        ...

    async def delete(self, key: str):
        ...

    async def clear(self):
        ...

    async def close(self):
        ...

The RemoteCacheBackend ABC requires five async methods:

Method                      Description
------                      -----------
get(key)                    Return deserialized value or None on miss
set(key, value, ttl=None)   Store a value with optional TTL (seconds)
delete(key)                 Remove a single key
clear()                     Remove all entries managed by this instance
close()                     Close the connection to the remote store
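As an illustration of the five-method contract, here is an in-memory backend that could serve as a test double. To stay self-contained it subclasses a local stand-in ABC (RemoteCacheBackendSketch, a hypothetical name) instead of importing cache.remote; in a real project you would subclass RemoteCacheBackend. The prefix handling and TTL bookkeeping are assumptions about reasonable behavior, not a description of RedisBackend.

```python
import asyncio
import time
from abc import ABC, abstractmethod


class RemoteCacheBackendSketch(ABC):
    """Local stand-in mirroring the five documented methods."""

    @abstractmethod
    async def get(self, key): ...

    @abstractmethod
    async def set(self, key, value, ttl=None): ...

    @abstractmethod
    async def delete(self, key): ...

    @abstractmethod
    async def clear(self): ...

    @abstractmethod
    async def close(self): ...


class InMemoryBackend(RemoteCacheBackendSketch):
    def __init__(self, prefix="ac:"):
        self._prefix = prefix
        self._data = {}  # prefixed key -> (value, expiry time or None)

    async def get(self, key):
        full_key = self._prefix + key
        entry = self._data.get(full_key)
        if entry is None:
            return None
        value, expires_at = entry
        if expires_at is not None and time.monotonic() >= expires_at:
            del self._data[full_key]  # expired entries count as misses
            return None
        return value

    async def set(self, key, value, ttl=None):
        expires_at = None if ttl is None else time.monotonic() + ttl
        self._data[self._prefix + key] = (value, expires_at)

    async def delete(self, key):
        self._data.pop(self._prefix + key, None)

    async def clear(self):
        self._data.clear()

    async def close(self):
        self._data.clear()  # nothing to disconnect; just drop the data


async def demo():
    backend = InMemoryBackend()
    await backend.set("a", 1)
    await backend.set("b", 2, ttl=0)  # already expired on the next read
    got_a = await backend.get("a")
    got_b = await backend.get("b")
    await backend.delete("a")
    after_delete = await backend.get("a")
    await backend.close()
    return got_a, got_b, after_delete


if __name__ == "__main__":
    print(asyncio.run(demo()))
```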

Production Recommendations

  1. Key prefix: Use distinct prefixes per service/environment to avoid key collisions:

    RedisBackend(prefix="myapp:prod:")
    
  2. Timeouts: Set short timeouts to prevent slow Redis from blocking your app:

    RedisBackend(connect_timeout=2, socket_timeout=2)
    
  3. TTL: Always set a default_ttl to prevent unbounded Redis memory growth:

    cache = AsyncCache(default_ttl=3600, remote_cache=redis)
    
  4. Graceful shutdown: Close the Redis connection on app shutdown:

    import atexit
    import asyncio
    
    async def shutdown():
        await redis.close()
    
    atexit.register(lambda: asyncio.run(shutdown()))
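An alternative to the atexit hook, sketched under the assumption that the application has a single top-level coroutine: closing the backend in a finally block runs close() on the same event loop that served requests, whereas asyncio.run() inside an atexit handler starts a new loop. FakeBackend and run_app are hypothetical names used only for this illustration.

```python
import asyncio


class FakeBackend:
    """Stand-in for RedisBackend; only close() matters here."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


async def run_app(backend, serve):
    try:
        await serve()
    finally:
        # Runs on normal exit, on errors, and on cancellation.
        await backend.close()


async def demo():
    backend = FakeBackend()

    async def serve():
        raise RuntimeError("simulated crash")

    try:
        await run_app(backend, serve)
    except RuntimeError:
        pass
    return backend.closed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Whether this matters for the built-in client depends on how it manages its sockets, which the text does not specify.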