async-cache
Production-ready asyncio cache with thundering herd protection, batch loading, and comprehensive metrics.
async-cache is a high-performance, in-memory, application-layer cache for asyncio-based microservices. It addresses three common caching problems: thundering herds on cache misses, N+1 loading (through DataLoader-style batching), and cold starts (through cache warmup). It is a good fit for high-throughput services where cache efficiency directly affects database load and response latency.
Installation
pip install async-cache
Requires Python 3.8+.
Why async-cache?
Problem: Caching in async microservices is hard
Thundering herd: When a hot cache entry expires, 1000 concurrent requests can all miss at once and overwhelm your database
N+1 queries: Loading related entities one at a time, without batching, multiplies database round-trips
Cold starts: An empty cache after a restart causes latency spikes
Observability: Without metrics, there is no visibility into cache effectiveness
Solution: async-cache provides these out of the box
| Feature | Benefit |
|---|---|
| Thundering Herd Protection | Only 1 backend call even with 1000 concurrent cache misses |
| DataLoader Batching | Automatic batching of concurrent requests (N+1 → 1 query) |
| Cache Warmup | Preload hot data at startup to avoid cold starts |
| Metrics & Observability | Built-in hit rates for monitoring and optimization |
| Flexible TTL | Per-key TTL with global defaults |
| LRU Eviction | Automatic eviction of least-recently-used items |
Quick Start
Basic Usage
import asyncio
from cache import AsyncCache

cache = AsyncCache(maxsize=1000, default_ttl=300)

async def get_user(user_id: int) -> dict:
    """Get user with automatic caching."""
    return await cache.get(
        f"user:{user_id}",
        loader=lambda: fetch_from_database(user_id)
    )
Decorator Usage
from cache import AsyncLRU, AsyncTTL
@AsyncLRU(maxsize=128)
async def get_product(product_id: int):
    return await db.query_product(product_id)

@AsyncTTL(time_to_live=60)  # 60 second TTL
async def get_session(session_id: str):
    return await db.get_session(session_id)
Core Features
Thundering Herd Protection
When a cached item expires under heavy load, multiple concurrent requests can trigger duplicate database queries (thundering herd). async-cache ensures only one loader executes while others wait for the result.
# 1000 concurrent requests, only 1 database query
tasks = [get_user(123) for _ in range(1000)]
results = await asyncio.gather(*tasks)
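To make the idea concrete, here is a minimal sketch of request coalescing ("single flight") in plain asyncio. It is an illustration of the technique, not async-cache's actual implementation; `SingleFlight`, `slow_loader`, and `demo` are hypothetical names introduced for this example.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class SingleFlight:
    """Hypothetical helper: coalesce concurrent loads of one key into one call."""

    def __init__(self) -> None:
        # key -> future for the load that is currently in progress
        self._in_flight: Dict[str, asyncio.Future] = {}

    async def load(self, key: str, loader: Callable[[], Awaitable[Any]]) -> Any:
        task = self._in_flight.get(key)
        if task is None:
            # The first caller starts the load; later callers reuse the same task.
            task = asyncio.ensure_future(loader())
            self._in_flight[key] = task
            # Forget the task once it finishes so the next miss reloads.
            task.add_done_callback(lambda _: self._in_flight.pop(key, None))
        # shield() keeps one cancelled waiter from cancelling the shared load.
        return await asyncio.shield(task)


async def demo() -> int:
    calls = 0

    async def slow_loader() -> dict:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.01)  # stand-in for a database query
        return {"id": 123}

    flight = SingleFlight()
    results = await asyncio.gather(
        *(flight.load("user:123", slow_loader) for _ in range(1000))
    )
    assert all(r == {"id": 123} for r in results)
    return calls  # number of times the loader actually ran
```

Running `asyncio.run(demo())` returns the loader call count, which stays at one because all 1000 waiters attach to the first task.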
DataLoader-Style Batching
Automatically batch concurrent requests to reduce database round-trips. Perfect for GraphQL resolvers or loading related entities.
from typing import List  # list[int] annotations would fail on Python 3.8

async def batch_load_users(user_ids: List[int]) -> List[dict]:
    """Load multiple users in a single query."""
    return await db.query_users_in_batch(user_ids)

# These two calls are automatically batched into one query
user1, user2 = await asyncio.gather(
    cache.get(1, batch_loader=batch_load_users),
    cache.get(2, batch_loader=batch_load_users)
)
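The batching behaviour can be sketched as a small window-based collector: the first request opens a window, requests arriving within it are queued, and one batch call resolves them all. This is a sketch under assumptions, not the library's code: `WindowBatcher` and `demo_batch` are hypothetical names, the batch function is assumed to return one value per key in the same order, and a maximum batch size is not modelled.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional


class WindowBatcher:
    """Hypothetical helper: collect keys for a short window, then load together."""

    def __init__(
        self,
        batch_fn: Callable[[List[Any]], Awaitable[List[Any]]],
        window_ms: float = 5,
    ) -> None:
        self._batch_fn = batch_fn
        self._window = window_ms / 1000
        self._pending: Dict[Any, asyncio.Future] = {}
        self._flush_task: Optional[asyncio.Task] = None

    async def load(self, key: Any) -> Any:
        fut = self._pending.get(key)
        if fut is None:
            fut = asyncio.get_running_loop().create_future()
            self._pending[key] = fut
        if self._flush_task is None:
            # The first request in a window schedules the flush.
            self._flush_task = asyncio.ensure_future(self._flush_after_window())
        return await fut

    async def _flush_after_window(self) -> None:
        await asyncio.sleep(self._window)
        # Take ownership of the queued keys; later requests open a new window.
        pending, self._pending = self._pending, {}
        self._flush_task = None
        keys = list(pending)
        try:
            # Assumption: one value per key, returned in the same order.
            values = await self._batch_fn(keys)
        except Exception as exc:
            for fut in pending.values():
                if not fut.done():
                    fut.set_exception(exc)
            return
        for key, value in zip(keys, values):
            if not pending[key].done():  # skip futures whose waiter was cancelled
                pending[key].set_result(value)


async def demo_batch():
    batches: List[List[int]] = []

    async def batch_load_users(user_ids: List[int]) -> List[dict]:
        batches.append(list(user_ids))  # record each backend call
        return [{"id": uid} for uid in user_ids]

    batcher = WindowBatcher(batch_load_users)
    user1, user2 = await asyncio.gather(batcher.load(1), batcher.load(2))
    return user1, user2, batches
```

A longer window batches more aggressively at the cost of added latency on every miss, which is the trade-off a batch window setting controls.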
Cache Warmup
Preload critical data at application startup to avoid cold-start latency.
async def startup():
    await cache.warmup({
        "config:app": load_app_config,
        "feature_flags": load_feature_flags,
        "popular:products": load_popular_products,
    })
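Conceptually, warmup amounts to running every loader concurrently and storing whatever succeeds. The sketch below shows that idea with a plain dict as the store. `warm_up` is a hypothetical function, and collecting failures instead of raising is an assumption made for this example; the README does not say how `cache.warmup` handles a failing loader.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


async def warm_up(
    store: Dict[str, Any],
    loaders: Dict[str, Callable[[], Awaitable[Any]]],
) -> Dict[str, BaseException]:
    """Run all loaders concurrently; return the ones that failed, keyed by name."""
    keys = list(loaders)
    results = await asyncio.gather(
        *(loaders[key]() for key in keys),
        return_exceptions=True,  # one failing loader must not abort the others
    )
    failures: Dict[str, BaseException] = {}
    for key, result in zip(keys, results):
        if isinstance(result, BaseException):
            failures[key] = result
        else:
            store[key] = result
    return failures


async def demo_warmup():
    async def load_app_config() -> dict:
        return {"debug": False}

    async def load_feature_flags() -> dict:
        raise RuntimeError("flag service unavailable")  # simulated failure

    store: Dict[str, Any] = {}
    failures = await warm_up(
        store,
        {"config:app": load_app_config, "feature_flags": load_feature_flags},
    )
    return store, failures
```

Returning the failures lets the startup code decide whether a missing key should block the service from accepting traffic.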
Metrics & Observability
Built-in metrics for monitoring cache performance and optimizing TTL values.
metrics = cache.get_metrics()
print(f"Hit rate: {metrics['hit_rate']:.1%}")
print(f"Size: {metrics['size']}")
print(f"Hits/Misses: {metrics['hits']}/{metrics['misses']}")
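For reference, a hit rate is conventionally the share of lookups served from the cache. The sketch below assumes `hit_rate` is computed as hits divided by total lookups; the README does not state the exact formula, and the `hit_rate` function here is a hypothetical standalone helper.

```python
def hit_rate(hits: int, misses: int) -> float:
    """Fraction of lookups served from cache; 0.0 when there were no lookups."""
    total = hits + misses
    return hits / total if total else 0.0


# 90 hits out of 100 lookups
summary = f"Hit rate: {hit_rate(90, 10):.1%}"
```

A persistently low hit rate usually means the TTL is too short or `maxsize` is too small for the working set.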
TTL & Invalidation
Flexible time-to-live with global defaults and per-key overrides.
# Global default TTL
cache = AsyncCache(default_ttl=300) # 5 minutes
# Per-key override
await cache.set("session:123", data, ttl=3600) # 1 hour
await cache.set("temp:data", data, ttl=60) # 1 minute
await cache.set("permanent:data", data, ttl=None) # No expiration
# Manual invalidation
await cache.delete("session:123")
cache.clear() # Clear all
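One subtlety in the calls above is that `ttl=None` means "never expire", so it cannot also mean "use the default". A common way to handle this is a sentinel default. The sketch below illustrates that pattern; `TTLStore` and `_UNSET` are hypothetical names, and this is not async-cache's actual implementation.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple

_UNSET = object()  # sentinel: distinguishes "ttl not passed" from ttl=None


class TTLStore:
    """Hypothetical synchronous store with a default TTL and per-key overrides."""

    def __init__(
        self,
        default_ttl: Optional[float] = None,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._default_ttl = default_ttl
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._data: Dict[str, Tuple[Any, Optional[float]]] = {}

    def set(self, key: str, value: Any, ttl: Any = _UNSET) -> None:
        if ttl is _UNSET:
            ttl = self._default_ttl
        # None means the entry never expires.
        expires_at = None if ttl is None else self._clock() + ttl
        self._data[key] = (value, expires_at)

    def get(self, key: str, default: Any = None) -> Any:
        entry = self._data.get(key)
        if entry is None:
            return default
        value, expires_at = entry
        if expires_at is not None and self._clock() >= expires_at:
            del self._data[key]  # lazily drop the expired entry on read
            return default
        return value
```

Using a monotonic clock avoids entries expiring early or late when the system wall clock is adjusted.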
Advanced Usage
Per-Key TTL Override
async def get_data(key: str, cache_minutes: int = 5):
    return await cache.get(
        key,
        loader=lambda: expensive_query(key),
        ttl=cache_minutes * 60
    )
Skip Arguments in Cache Key
For class methods, skip self from the cache key:
class UserService:
    @AsyncLRU(maxsize=100, skip_args=1)  # Skip 'self'
    async def get_user(self, user_id: int):
        return await self.db.get_user(user_id)
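The effect of skipping leading arguments can be shown with a small key-building function. This is a sketch of the general idea, assuming the key is derived from positional and keyword arguments; `make_key` is a hypothetical helper, not the function async-cache uses internally.

```python
from typing import Any, Dict, Tuple


def make_key(
    args: Tuple[Any, ...], kwargs: Dict[str, Any], skip_args: int = 0
) -> Tuple[Any, ...]:
    """Build a hashable cache key, ignoring the first `skip_args` positionals."""
    # Sort keyword arguments so f(a=1, b=2) and f(b=2, a=1) share a key.
    return (args[skip_args:], tuple(sorted(kwargs.items())))


class Service:
    pass


first, second = Service(), Service()

# With skip_args=1 the instance is ignored, so both services share an entry.
shared = make_key((first, 42), {}, skip_args=1) == make_key((second, 42), {}, skip_args=1)

# Without it, each instance gets its own entry for the same user_id.
separate = make_key((first, 42), {}) != make_key((second, 42), {})
```

Sharing entries across instances is only safe when the result does not depend on per-instance state.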
Force Refresh
Bypass cache and force a fresh load:
@AsyncTTL(time_to_live=60)
async def get_status():
    return await check_service_status()

# Force fresh check
status = await get_status(use_cache=False)
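A decorator can support this kind of bypass by consuming a keyword-only flag before calling the wrapped function. The sketch below shows one way to do it, with no TTL or eviction; `simple_cached` is a hypothetical decorator, and storing the fresh result back into the cache after a forced load is an assumption, not documented behaviour of `AsyncTTL`.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable, Dict, List


def simple_cached(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
    """Hypothetical caching decorator with a `use_cache` bypass flag."""
    store: Dict[Any, Any] = {}

    @functools.wraps(func)
    async def wrapper(*args: Any, use_cache: bool = True, **kwargs: Any) -> Any:
        # `use_cache` is consumed here and never reaches the wrapped function.
        key = (args, tuple(sorted(kwargs.items())))
        if use_cache and key in store:
            return store[key]
        value = await func(*args, **kwargs)
        store[key] = value  # assumption: a forced load also refreshes the entry
        return value

    return wrapper


calls: List[int] = []


@simple_cached
async def get_status() -> int:
    calls.append(1)
    return len(calls)  # changes on every real call, so cache hits are visible


async def demo_refresh():
    first = await get_status()                   # miss: runs the function
    second = await get_status()                  # hit: same value as first
    forced = await get_status(use_cache=False)   # bypass: runs the function again
    after = await get_status()                   # hit: sees the refreshed value
    return first, second, forced, after
```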
Configuration Options
AsyncCache Parameters
| Parameter | Default | Description |
|---|---|---|
| maxsize | 128 | Maximum number of items in cache (None = unlimited) |
| default_ttl | None | Default TTL in seconds (None = no expiration) |
| batch_window_ms | 5 | Window for batching concurrent requests (milliseconds) |
| max_batch_size | 100 | Maximum batch size for DataLoader pattern |
Decorator Parameters
| Parameter | Default | Description |
|---|---|---|
| maxsize (AsyncLRU) | 128 | Maximum cache size |
| time_to_live (AsyncTTL) | 60 | TTL in seconds |
| skip_args | 0 | Number of initial args to skip in cache key (for self/cls) |
API Reference
See API Reference for complete API documentation.
Best Practices
Use thundering herd protection for hot keys under heavy load
Enable batching for GraphQL resolvers or related entity loading
Warm up critical data at application startup
Monitor metrics to tune TTL and maxsize values
Use appropriate TTLs: short for volatile data, long for static data
Performance Characteristics
Get/Set: O(1) average case
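Memory: O(n), where n is the number of cached items (at most maxsize)
Thundering herd: one loader call per key, regardless of the number of concurrent requests
Batching: Automatic for requests that arrive within the configured window (batch_window_ms)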
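The O(1) get/set bound with LRU eviction is typically achieved with a hash map that also tracks recency order. The sketch below uses `collections.OrderedDict` to show the technique. `LRUStore` is a hypothetical class for illustration and says nothing about the data structure async-cache actually uses.

```python
from collections import OrderedDict
from typing import Any, Hashable, Optional


class LRUStore:
    """Hypothetical LRU store: O(1) average get/set, evicts the oldest entry."""

    def __init__(self, maxsize: Optional[int] = 128) -> None:
        self._maxsize = maxsize  # None means unlimited
        self._data: "OrderedDict[Hashable, Any]" = OrderedDict()

    def get(self, key: Hashable, default: Any = None) -> Any:
        if key not in self._data:
            return default
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def set(self, key: Hashable, value: Any) -> None:
        self._data[key] = value
        self._data.move_to_end(key)
        if self._maxsize is not None and len(self._data) > self._maxsize:
            self._data.popitem(last=False)  # drop the least recently used entry

    def __len__(self) -> int:
        return len(self._data)
```

Because eviction removes exactly one entry per overflowing insert, memory stays bounded by `maxsize` entries.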