Source code for cache.remote

"""
Remote cache backend abstraction for distributed (L2) caching.

Provides:
- RemoteCacheBackend: async abstract interface for remote cache stores
- RedisBackend: lightweight pure-Python Redis client (no external deps)

The remote cache is used as L2 in the two-tier caching architecture:
  L1 = local in-memory cache (AsyncCache)
  L2 = remote cache (Redis, Memcached, etc.)

All methods are async since remote operations involve network I/O.
"""

import asyncio
import logging
import pickle
from abc import ABC, abstractmethod

logger = logging.getLogger("async_cache.remote")


[docs] class RemoteCacheBackend(ABC): """Async abstract interface for remote (L2) cache storage. Subclass this to implement Redis, Memcached, or any other remote store. All methods are async to support non-blocking network I/O. """
[docs] @abstractmethod async def get(self, key: str): """Get a value from the remote cache. :param key: string cache key :returns: deserialized value, or None on miss """
[docs] @abstractmethod async def set(self, key: str, value, ttl=None): """Set a value in the remote cache. :param key: string cache key :param value: value to store (will be serialized) :param ttl: optional TTL in seconds (float or int) """
[docs] @abstractmethod async def delete(self, key: str): """Delete a key from the remote cache."""
[docs] @abstractmethod async def clear(self): """Remove all entries managed by this cache instance."""
[docs] @abstractmethod async def close(self): """Close the connection to the remote store."""
[docs] class RedisBackend(RemoteCacheBackend): """Pure-Python async Redis client for L2 caching. Uses raw TCP sockets speaking the RESP protocol — no external dependencies required. Suitable for GET/SET/DEL/FLUSHDB operations. :param host: Redis server hostname (default "localhost") :param port: Redis server port (default 6379) :param db: Redis database number (default 0) :param password: Optional Redis password :param prefix: Key prefix to namespace entries (default "ac:") :param connect_timeout: Connection timeout in seconds (default 5) :param socket_timeout: Read/write timeout in seconds (default 5) """ def __init__( self, host="localhost", port=6379, db=0, password=None, prefix="ac:", connect_timeout=5, socket_timeout=5, ): self.host = host self.port = port self.db = db self.password = password self.prefix = prefix self.connect_timeout = connect_timeout self.socket_timeout = socket_timeout self._reader = None self._writer = None self._lock = asyncio.Lock() def _make_key(self, key: str) -> str: """Prefix the key for namespacing.""" return f"{self.prefix}{key}" async def _ensure_connected(self): """Establish connection if not already connected.""" if self._writer is not None and not self._writer.is_closing(): return self._reader, self._writer = await asyncio.wait_for( asyncio.open_connection(self.host, self.port), timeout=self.connect_timeout, ) if self.password: await self._execute("AUTH", self.password) if self.db != 0: await self._execute("SELECT", str(self.db)) def _encode_command(self, *args) -> bytes: """Encode a command into RESP protocol format.""" parts = [f"*{len(args)}\r\n".encode()] for arg in args: if isinstance(arg, bytes): encoded = arg else: encoded = str(arg).encode() parts.append(f"${len(encoded)}\r\n".encode()) parts.append(encoded) parts.append(b"\r\n") return b"".join(parts) async def _read_response(self): """Read and parse a RESP response.""" line = await asyncio.wait_for( self._reader.readline(), timeout=self.socket_timeout, ) if not line: raise ConnectionError("Connection closed by server") prefix = chr(line[0]) data = line[1:].strip() if prefix == "+": return data.decode() elif prefix == "-": raise Exception(f"Redis error: {data.decode()}") elif prefix == ":": return int(data) elif prefix == "$": length = int(data) if length == -1: return None bulk = await asyncio.wait_for( self._reader.readexactly(length + 2), timeout=self.socket_timeout, ) return bulk[:length] elif prefix == "*": count = int(data) if count == -1: return None result = [] for _ in range(count): result.append(await self._read_response()) return result else: raise Exception(f"Unknown RESP prefix: {prefix}") async def _execute(self, *args): """Send a command and read the response.""" await self._ensure_connected() cmd = self._encode_command(*args) self._writer.write(cmd) await self._writer.drain() return await self._read_response()
[docs] async def get(self, key: str): """Get a value from Redis. Returns None on miss or error.""" try: async with self._lock: raw = await self._execute("GET", self._make_key(key)) if raw is None: return None return pickle.loads(raw) except Exception: logger.debug("RedisBackend.get failed for key=%s", key, exc_info=True) return None
[docs] async def set(self, key: str, value, ttl=None): """Set a value in Redis with optional TTL (in seconds).""" try: data = pickle.dumps(value, protocol=pickle.HIGHEST_PROTOCOL) async with self._lock: if ttl is not None: # PX for millisecond precision ms = int(ttl * 1000) if ms > 0: await self._execute( "SET", self._make_key(key), data, "PX", str(ms) ) else: # TTL <= 0: don't store return else: await self._execute("SET", self._make_key(key), data) except Exception: logger.debug("RedisBackend.set failed for key=%s", key, exc_info=True)
[docs] async def delete(self, key: str): """Delete a key from Redis.""" try: async with self._lock: await self._execute("DEL", self._make_key(key)) except Exception: logger.debug("RedisBackend.delete failed for key=%s", key, exc_info=True)
[docs] async def clear(self): """Remove all keys with the configured prefix using SCAN + DEL.""" try: async with self._lock: cursor = "0" pattern = f"{self.prefix}*" while True: result = await self._execute("SCAN", cursor, "MATCH", pattern, "COUNT", "100") cursor = result[0].decode() if isinstance(result[0], bytes) else str(result[0]) keys = result[1] if keys: await self._execute("DEL", *keys) if cursor == "0": break except Exception: logger.debug("RedisBackend.clear failed", exc_info=True)
[docs] async def close(self): """Close the Redis connection.""" if self._writer is not None and not self._writer.is_closing(): self._writer.close() try: await self._writer.wait_closed() except Exception: pass self._reader = None self._writer = None