Back to Journal
System Design

Complete Guide to Distributed Caching with Python

A comprehensive guide to implementing Distributed Caching using Python, covering architecture, code examples, and production-ready patterns.

Muneer Puthiya Purayil 16 min read

Python's async ecosystem and redis-py library provide a straightforward path to implementing distributed caching. With asyncio for non-blocking Redis operations and dataclasses for typed cache entries, Python delivers a clean caching layer that integrates naturally with FastAPI, Django, and Flask applications. This guide covers production-ready caching patterns in Python.

Async Cache Client

python
1import json
2import time
3import asyncio
4from dataclasses import dataclass
5from typing import Any, Callable, Optional, TypeVar
6import redis.asyncio as aioredis
7 
8T = TypeVar("T")
9 
class CacheClient:
    """Async Redis-backed cache with JSON serialization and hit/miss tracking.

    Keys are namespaced with an optional ``prefix``; values are stored as
    JSON strings. Objects that are not JSON-serializable fall back to
    ``str()`` via ``default=str`` — note this silently stringifies them.
    """

    def __init__(self, redis_url: str, prefix: str = "", default_ttl: int = 300):
        # decode_responses=True so GET returns str, ready for json.loads.
        self.redis = aioredis.from_url(redis_url, decode_responses=True)
        self.prefix = prefix
        self.default_ttl = default_ttl
        self._hits = 0
        self._misses = 0

    def _key(self, key: str) -> str:
        """Apply the namespace prefix, if one was configured."""
        return f"{self.prefix}:{key}" if self.prefix else key

    async def get(self, key: str) -> Any | None:
        """Return the deserialized value, or None on a miss.

        Note: a stored JSON ``null`` is indistinguishable from a miss and
        is counted as one.
        """
        raw = await self.redis.get(self._key(key))
        if raw is None:
            self._misses += 1
            return None
        self._hits += 1
        return json.loads(raw)

    async def set(self, key: str, value: Any, ttl: int | None = None) -> None:
        """Store *value* as JSON with a TTL (falls back to ``default_ttl``)."""
        await self.redis.setex(
            self._key(key), ttl or self.default_ttl, json.dumps(value, default=str)
        )

    async def delete(self, *keys: str) -> None:
        """Delete the given keys; a no-op when called with no keys."""
        if keys:
            await self.redis.delete(*[self._key(k) for k in keys])

    async def get_or_load(
        self, key: str, loader: Callable[[], Any], ttl: int | None = None
    ) -> Any:
        """Read-through: return the cached value or invoke *loader* and cache it.

        *loader* may be sync or async. Concurrent callers missing on the
        same key will each invoke the loader (cache stampede); wrap with a
        stampede-protected layer if that matters.
        """
        cached = await self.get(key)
        if cached is not None:
            return cached
        value = await loader() if asyncio.iscoroutinefunction(loader) else loader()
        await self.set(key, value, ttl)
        return value

    async def get_many(self, keys: list[str]) -> dict[str, Any]:
        """Batch-get; returns only the keys that were present.

        Guards the empty case explicitly: Redis MGET requires at least one
        key and errors on zero arguments (the original would raise here).
        """
        if not keys:
            return {}
        full_keys = [self._key(k) for k in keys]
        values = await self.redis.mget(*full_keys)
        result = {}
        for key, raw in zip(keys, values):
            if raw is not None:
                result[key] = json.loads(raw)
        return result

    async def set_many(self, entries: dict[str, Any], ttl: int | None = None) -> None:
        """Batch-set via a pipeline so all writes go out in one round trip."""
        pipe = self.redis.pipeline()
        for key, value in entries.items():
            pipe.setex(self._key(key), ttl or self.default_ttl, json.dumps(value, default=str))
        await pipe.execute()

    @property
    def hit_rate(self) -> float:
        """Fraction of gets served from cache (0.0 before any gets)."""
        total = self._hits + self._misses
        return self._hits / total if total > 0 else 0.0

    async def close(self) -> None:
        """Release the underlying Redis connection pool.

        NOTE(review): redis-py 5.x deprecates ``close()`` in favor of
        ``aclose()`` — confirm the installed version before switching.
        """
        await self.redis.close()
70 

Cache Decorator

python
import asyncio
import functools
import hashlib
3 
def cached(cache: "CacheClient", ttl: int = 300, key_prefix: str = ""):
    """Decorator that caches a function's result in *cache*.

    The cache key is the function name plus an MD5 digest (used for key
    compactness, not security) of the stringified positional and sorted
    keyword arguments. The wrapped function may be async or sync; the
    returned wrapper is always a coroutine and must be awaited.
    """
    def decorator(func):
        # Decide once at decoration time rather than on every call.
        is_coroutine = asyncio.iscoroutinefunction(func)

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Build a deterministic key from the call signature.
            key_parts = [key_prefix or func.__name__]
            key_parts.extend(str(a) for a in args)
            key_parts.extend(f"{k}={v}" for k, v in sorted(kwargs.items()))
            cache_key = hashlib.md5(":".join(key_parts).encode()).hexdigest()
            full_key = f"{func.__name__}:{cache_key}"

            cached_value = await cache.get(full_key)
            if cached_value is not None:
                return cached_value

            # The original always awaited func, breaking for sync functions;
            # support both, consistent with CacheClient.get_or_load.
            result = await func(*args, **kwargs) if is_coroutine else func(*args, **kwargs)
            await cache.set(full_key, result, ttl)
            return result
        return wrapper
    return decorator
24 
# Usage: one shared client; every key gets the "myapp" namespace prefix.
cache = CacheClient("redis://localhost:6379", prefix="myapp")

# Results are cached for 10 minutes, keyed by the stringified product_id.
@cached(cache, ttl=600)
async def get_product(product_id: str) -> dict:
    # NOTE(review): `db` is assumed to be an async database handle defined
    # elsewhere — it is not shown in this snippet.
    return await db.products.find_one({"_id": product_id})
31 

Stampede Protection

python
1import asyncio
2from collections import defaultdict
3 
class StampedeProtectedCache:
    """Wraps a cache so concurrent misses on one key trigger a single load.

    A per-key asyncio.Lock serializes loaders within this process only;
    coordinating across processes would require a distributed lock.
    """

    def __init__(self, cache: "CacheClient"):
        self.cache = cache
        # Per-key locks, created on first use and pruned after a load
        # completes. The original kept every lock — plus an unused
        # `_inflight` dict — alive forever, growing without bound.
        self._locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    async def get_or_load(self, key: str, loader, ttl: int = 300) -> Any:
        """Return the cached value, or load it exactly once per miss burst."""
        cached = await self.cache.get(key)
        if cached is not None:
            return cached

        lock = self._locks[key]
        async with lock:
            # Double-check after acquiring the lock: another coroutine may
            # have populated the cache while we were waiting.
            cached = await self.cache.get(key)
            if cached is None:
                cached = await loader() if asyncio.iscoroutinefunction(loader) else loader()
                await self.cache.set(key, cached, ttl)

        # Prune the lock entry so _locks stays bounded. Waiters already
        # queued on `lock` still hold a reference and hit the double-check;
        # in the rare race a brand-new caller creates a fresh lock and may
        # load once more — an acceptable trade for bounded memory.
        self._locks.pop(key, None)
        return cached
24 

Need a second opinion on your system design architecture?

I run free 30-minute strategy calls for engineering teams tackling this exact problem.

Book a Free Call

FastAPI Integration

python
import os
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends

# Process-wide singleton, created and torn down by the lifespan handler.
cache: CacheClient | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the cache client at startup; close it at shutdown."""
    global cache
    cache = CacheClient(
        # Fix: the original used os.environ without importing os.
        redis_url=os.environ["REDIS_URL"],
        prefix="myapp",
        default_ttl=300,
    )
    yield
    await cache.close()

app = FastAPI(lifespan=lifespan)

def get_cache() -> CacheClient:
    """FastAPI dependency returning the process-wide cache client."""
    return cache

@app.get("/products/{product_id}")
async def get_product(product_id: str, cache: CacheClient = Depends(get_cache)):
    # Read-through: on a miss the lambda loads the product from the database.
    # NOTE(review): `db` is assumed to be defined elsewhere in the app.
    return await cache.get_or_load(
        f"product:{product_id}",
        lambda: db.products.find_one(product_id),
        ttl=600,
    )

@app.put("/products/{product_id}")
async def update_product(product_id: str, data: ProductUpdate, cache: CacheClient = Depends(get_cache)):
    # Write-invalidate: drop the cached copy so the next read reloads it.
    product = await db.products.update(product_id, data.dict())
    await cache.delete(f"product:{product_id}")
    return product

@app.get("/cache/stats")
async def cache_stats(cache: CacheClient = Depends(get_cache)):
    # NOTE(review): reaches into the private _hits/_misses counters; consider
    # exposing public read-only counters on CacheClient instead.
    return {"hit_rate": f"{cache.hit_rate:.1%}", "hits": cache._hits, "misses": cache._misses}
39 

Multi-Level Cache

python
1class MultiLevelCache:
2 def __init__(self, l1_max_size: int = 1000, l1_ttl: int = 60, l2: CacheClient = None):
3 self.l1: dict[str, tuple[Any, float]] = {}
4 self.l1_max_size = l1_max_size
5 self.l1_ttl = l1_ttl
6 self.l2 = l2
7 
8 async def get(self, key: str) -> Any | None:
9 # L1
10 if key in self.l1:
11 value, expires = self.l1[key]
12 if time.time() < expires:
13 return value
14 del self.l1[key]
15 
16 # L2
17 if self.l2:
18 value = await self.l2.get(key)
19 if value is not None:
20 self._l1_set(key, value)
21 return value
22 
23 return None
24 
25 async def set(self, key: str, value: Any, ttl: int = 300) -> None:
26 self._l1_set(key, value)
27 if self.l2:
28 await self.l2.set(key, value, ttl)
29 
30 def _l1_set(self, key: str, value: Any) -> None:
31 if len(self.l1) >= self.l1_max_size:
32 oldest = min(self.l1, key=lambda k: self.l1[k][1])
33 del self.l1[oldest]
34 self.l1[key] = (value, time.time() + self.l1_ttl)
35 

Conclusion

Python's async Redis client and decorator pattern provide a clean, Pythonic caching layer that integrates naturally with modern async frameworks. The get_or_load pattern with stampede protection covers the most common caching use case, while the decorator approach minimizes boilerplate for function-level caching. For Python applications where development velocity is prioritized, this caching layer adds meaningful performance improvements with minimal code complexity.

FAQ

Need expert help?

Building with system design?

I help teams ship production-grade systems. From architecture review to hands-on builds.

Muneer Puthiya Purayil

SaaS Architect & AI Systems Engineer. 10+ years shipping production infrastructure across fintech, automotive, e-commerce, and healthcare.

Engage

Start a
Conversation.

For teams building at scale: SaaS platforms, agentic AI systems, and enterprise mobile infrastructure. Scope and fit are evaluated before any engagement begins.

Limited availability · Q3 / Q4 2026