"""
Remote cache backend abstraction for distributed (L2) caching.
Provides:
- RemoteCacheBackend: async abstract interface for remote cache stores
- RedisBackend: lightweight pure-Python Redis client (no external deps)
The remote cache is used as L2 in the two-tier caching architecture:
L1 = local in-memory cache (AsyncCache)
L2 = remote cache (Redis, Memcached, etc.)
All methods are async since remote operations involve network I/O.
"""
import asyncio
import logging
import pickle
from abc import ABC, abstractmethod
logger = logging.getLogger("async_cache.remote")
[docs]
class RemoteCacheBackend(ABC):
"""Async abstract interface for remote (L2) cache storage.
Subclass this to implement Redis, Memcached, or any other remote store.
All methods are async to support non-blocking network I/O.
"""
[docs]
@abstractmethod
async def get(self, key: str):
"""Get a value from the remote cache.
:param key: string cache key
:returns: deserialized value, or None on miss
"""
[docs]
@abstractmethod
async def set(self, key: str, value, ttl=None):
"""Set a value in the remote cache.
:param key: string cache key
:param value: value to store (will be serialized)
:param ttl: optional TTL in seconds (float or int)
"""
[docs]
@abstractmethod
async def delete(self, key: str):
"""Delete a key from the remote cache."""
[docs]
@abstractmethod
async def clear(self):
"""Remove all entries managed by this cache instance."""
[docs]
@abstractmethod
async def close(self):
"""Close the connection to the remote store."""
[docs]
class RedisBackend(RemoteCacheBackend):
"""Pure-Python async Redis client for L2 caching.
Uses raw TCP sockets speaking the RESP protocol — no external
dependencies required. Suitable for GET/SET/DEL/FLUSHDB operations.
:param host: Redis server hostname (default "localhost")
:param port: Redis server port (default 6379)
:param db: Redis database number (default 0)
:param password: Optional Redis password
:param prefix: Key prefix to namespace entries (default "ac:")
:param connect_timeout: Connection timeout in seconds (default 5)
:param socket_timeout: Read/write timeout in seconds (default 5)
"""
def __init__(
self,
host="localhost",
port=6379,
db=0,
password=None,
prefix="ac:",
connect_timeout=5,
socket_timeout=5,
):
self.host = host
self.port = port
self.db = db
self.password = password
self.prefix = prefix
self.connect_timeout = connect_timeout
self.socket_timeout = socket_timeout
self._reader = None
self._writer = None
self._lock = asyncio.Lock()
def _make_key(self, key: str) -> str:
"""Prefix the key for namespacing."""
return f"{self.prefix}{key}"
async def _ensure_connected(self):
"""Establish connection if not already connected."""
if self._writer is not None and not self._writer.is_closing():
return
self._reader, self._writer = await asyncio.wait_for(
asyncio.open_connection(self.host, self.port),
timeout=self.connect_timeout,
)
if self.password:
await self._execute("AUTH", self.password)
if self.db != 0:
await self._execute("SELECT", str(self.db))
def _encode_command(self, *args) -> bytes:
"""Encode a command into RESP protocol format."""
parts = [f"*{len(args)}\r\n".encode()]
for arg in args:
if isinstance(arg, bytes):
encoded = arg
else:
encoded = str(arg).encode()
parts.append(f"${len(encoded)}\r\n".encode())
parts.append(encoded)
parts.append(b"\r\n")
return b"".join(parts)
async def _read_response(self):
"""Read and parse a RESP response."""
line = await asyncio.wait_for(
self._reader.readline(),
timeout=self.socket_timeout,
)
if not line:
raise ConnectionError("Connection closed by server")
prefix = chr(line[0])
data = line[1:].strip()
if prefix == "+":
return data.decode()
elif prefix == "-":
raise Exception(f"Redis error: {data.decode()}")
elif prefix == ":":
return int(data)
elif prefix == "$":
length = int(data)
if length == -1:
return None
bulk = await asyncio.wait_for(
self._reader.readexactly(length + 2),
timeout=self.socket_timeout,
)
return bulk[:length]
elif prefix == "*":
count = int(data)
if count == -1:
return None
result = []
for _ in range(count):
result.append(await self._read_response())
return result
else:
raise Exception(f"Unknown RESP prefix: {prefix}")
async def _execute(self, *args):
"""Send a command and read the response."""
await self._ensure_connected()
cmd = self._encode_command(*args)
self._writer.write(cmd)
await self._writer.drain()
return await self._read_response()
[docs]
async def get(self, key: str):
"""Get a value from Redis. Returns None on miss or error."""
try:
async with self._lock:
raw = await self._execute("GET", self._make_key(key))
if raw is None:
return None
return pickle.loads(raw)
except Exception:
logger.debug("RedisBackend.get failed for key=%s", key, exc_info=True)
return None
[docs]
async def set(self, key: str, value, ttl=None):
"""Set a value in Redis with optional TTL (in seconds)."""
try:
data = pickle.dumps(value, protocol=pickle.HIGHEST_PROTOCOL)
async with self._lock:
if ttl is not None:
# PX for millisecond precision
ms = int(ttl * 1000)
if ms > 0:
await self._execute(
"SET", self._make_key(key), data, "PX", str(ms)
)
else:
# TTL <= 0: don't store
return
else:
await self._execute("SET", self._make_key(key), data)
except Exception:
logger.debug("RedisBackend.set failed for key=%s", key, exc_info=True)
[docs]
async def delete(self, key: str):
"""Delete a key from Redis."""
try:
async with self._lock:
await self._execute("DEL", self._make_key(key))
except Exception:
logger.debug("RedisBackend.delete failed for key=%s", key, exc_info=True)
[docs]
async def clear(self):
"""Remove all keys with the configured prefix using SCAN + DEL."""
try:
async with self._lock:
cursor = "0"
pattern = f"{self.prefix}*"
while True:
result = await self._execute("SCAN", cursor, "MATCH", pattern, "COUNT", "100")
cursor = result[0].decode() if isinstance(result[0], bytes) else str(result[0])
keys = result[1]
if keys:
await self._execute("DEL", *keys)
if cursor == "0":
break
except Exception:
logger.debug("RedisBackend.clear failed", exc_info=True)
[docs]
async def close(self):
"""Close the Redis connection."""
if self._writer is not None and not self._writer.is_closing():
self._writer.close()
try:
await self._writer.wait_closed()
except Exception:
pass
self._reader = None
self._writer = None