Back to Journal
System Design

Complete Guide to Database Sharding with Python

A comprehensive guide to implementing Database Sharding using Python, covering architecture, code examples, and production-ready patterns.

Muneer Puthiya Purayil 19 min read

Python's ecosystem offers robust database tooling through SQLAlchemy, asyncpg, and psycopg for building sharded database architectures. While Python's GIL limits true parallelism for CPU-bound operations, async I/O with asyncio provides excellent performance for I/O-bound database sharding operations where the bottleneck is network latency, not computation. This guide covers implementing production-ready database sharding in Python.

Shard Router

python
1import hashlib
2from bisect import bisect_right
3from typing import Dict, List, Optional
4 
class ConsistentHashRouter:
    """Route shard keys to shard IDs with a consistent-hash ring.

    Every shard is placed on the ring ``virtual_nodes`` times, under the keys
    ``"<shard_id>:<i>"``. A shard key is routed to the shard owning the first
    ring position strictly greater than the key's hash, wrapping around to the
    first position when the hash is past the end of the ring.
    """

    def __init__(self, virtual_nodes: int = 150):
        """Create an empty ring.

        Args:
            virtual_nodes: Number of ring positions created per shard.

        Raises:
            ValueError: If ``virtual_nodes`` is less than 1. Such a router
                could never hold a shard, so every ``route`` call would fail
                with a misleading "No shards configured" error.
        """
        if virtual_nodes < 1:
            raise ValueError("virtual_nodes must be at least 1")
        self.virtual_nodes = virtual_nodes
        self._ring: Dict[int, str] = {}
        self._sorted_hashes: List[int] = []

    def add_shard(self, shard_id: str) -> None:
        """Place ``shard_id`` on the ring and rebuild the sorted position list."""
        for hash_val in self._virtual_node_hashes(shard_id):
            self._ring[hash_val] = shard_id
        self._sorted_hashes = sorted(self._ring)

    def remove_shard(self, shard_id: str) -> None:
        """Remove the ring positions owned by ``shard_id``; unknown IDs are a no-op."""
        for hash_val in self._virtual_node_hashes(shard_id):
            # Only drop positions this shard still owns, so a position that
            # was overwritten by another shard is left intact.
            if self._ring.get(hash_val) == shard_id:
                del self._ring[hash_val]
        self._sorted_hashes = sorted(self._ring)

    def route(self, shard_key: str) -> str:
        """Return the shard ID responsible for ``shard_key``.

        Raises:
            ValueError: If no shard has been added.
        """
        if not self._sorted_hashes:
            raise ValueError("No shards configured")
        idx = bisect_right(self._sorted_hashes, self._hash(shard_key))
        if idx >= len(self._sorted_hashes):
            idx = 0  # past the last position: wrap to the start of the ring
        return self._ring[self._sorted_hashes[idx]]

    def _virtual_node_hashes(self, shard_id: str) -> List[int]:
        """Return the ring positions derived from ``shard_id``."""
        return [self._hash(f"{shard_id}:{i}") for i in range(self.virtual_nodes)]

    @staticmethod
    def _hash(key: str) -> int:
        """Return the MD5 digest of ``key`` as an integer.

        MD5 is used only to spread keys over the ring, so it is flagged as
        non-security use; this keeps the call working on builds that
        restrict MD5 and does not change the digest.
        """
        digest = hashlib.md5(key.encode(), usedforsecurity=False).hexdigest()
        return int(digest, 16)
37 

Async Connection Pool Manager

python
1import asyncpg
2from dataclasses import dataclass
3from typing import Dict, Optional
4 
@dataclass
class ShardConfig:
    """Connection and pool-sizing settings for a single shard."""

    shard_id: str
    host: str
    port: int
    database: str
    user: str
    password: str
    min_connections: int = 5
    max_connections: int = 20

    def __repr__(self) -> str:
        """Return a debug representation with the password masked.

        The dataclass-generated ``__repr__`` would print the password in
        clear text wherever a config is logged or shown in a traceback.
        """
        return (
            f"{type(self).__name__}("
            f"shard_id={self.shard_id!r}, host={self.host!r}, port={self.port!r}, "
            f"database={self.database!r}, user={self.user!r}, password='***', "
            f"min_connections={self.min_connections!r}, "
            f"max_connections={self.max_connections!r})"
        )
15 
class AsyncPoolManager:
    """Hold one asyncpg connection pool per shard, keyed by shard ID."""

    def __init__(self):
        self._pools: Dict[str, "asyncpg.Pool"] = {}

    async def initialize(self, configs: "list[ShardConfig]") -> None:
        """Create a pool for every config and register it under its shard ID.

        Pools are registered only once all of them were created. If creating
        any pool fails, the pools already created by this call are closed
        before the error is re-raised, so a failed startup does not leave
        open connections behind.
        """
        created: Dict[str, "asyncpg.Pool"] = {}
        try:
            for config in configs:
                created[config.shard_id] = await asyncpg.create_pool(
                    host=config.host,
                    port=config.port,
                    database=config.database,
                    user=config.user,
                    password=config.password,
                    min_size=config.min_connections,
                    max_size=config.max_connections,
                    command_timeout=10,
                )
        except BaseException:
            for pool in created.values():
                try:
                    await pool.close()
                except Exception:
                    # Best-effort cleanup: the creation error raised below is
                    # the one the caller needs to see.
                    pass
            raise
        self._pools.update(created)

    def get_pool(self, shard_id: str) -> "asyncpg.Pool":
        """Return the pool registered for ``shard_id``.

        Raises:
            ValueError: If no pool is registered under ``shard_id``.
        """
        pool = self._pools.get(shard_id)
        if pool is None:
            raise ValueError(f"Unknown shard: {shard_id}")
        return pool

    @property
    def all_shard_ids(self) -> "list[str]":
        """IDs of all shards that currently have a registered pool."""
        return list(self._pools)

    async def close(self) -> None:
        """Close every registered pool and forget them.

        All pools are attempted even if one of them fails to close; the first
        failure is re-raised after the remaining pools were processed.
        """
        pools, self._pools = self._pools, {}
        first_error = None
        for pool in pools.values():
            try:
                await pool.close()
            except Exception as exc:
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error

    async def health_check(self) -> "dict[str, dict]":
        """Run ``SELECT 1`` on every shard and report the outcome per shard ID.

        Healthy shards map to ``{"healthy": True, "pool_size": ..., "pool_free": ...}``;
        a shard whose check raised maps to ``{"healthy": False, "error": str(exc)}``.
        """
        results = {}
        for shard_id, pool in self._pools.items():
            try:
                async with pool.acquire() as conn:
                    await conn.fetchval("SELECT 1")
                results[shard_id] = {
                    "healthy": True,
                    "pool_size": pool.get_size(),
                    "pool_free": pool.get_idle_size(),
                }
            except Exception as e:
                # Deliberately broad: any failure is reported as "unhealthy"
                # instead of aborting the check for the remaining shards.
                results[shard_id] = {"healthy": False, "error": str(e)}
        return results
62 

Sharded Database Client

python
1import asyncio
2from typing import Any, List, Optional, TypeVar, Callable
3 
4T = TypeVar("T")
5 
class ShardedDB:
    """Run SQL on the shard that owns a key, or fan a query out to all shards.

    Single-shard methods resolve the shard with ``router.route(shard_key)``,
    take that shard's pool from the pool manager, and hold one connection for
    the duration of the statement.
    """

    def __init__(self, router: "ConsistentHashRouter", pool_manager: "AsyncPoolManager"):
        self.router = router
        self.pools = pool_manager

    def _pool_for(self, shard_key: str) -> "asyncpg.Pool":
        """Return the pool of the shard that ``shard_key`` routes to."""
        return self.pools.get_pool(self.router.route(shard_key))

    async def query_shard(
        self, shard_key: str, sql: str, *args: Any
    ) -> "list[asyncpg.Record]":
        """Fetch all rows of ``sql`` from the shard owning ``shard_key``."""
        async with self._pool_for(shard_key).acquire() as conn:
            return await conn.fetch(sql, *args)

    async def execute_shard(
        self, shard_key: str, sql: str, *args: Any
    ) -> str:
        """Execute ``sql`` on the shard owning ``shard_key`` and return ``conn.execute``'s result."""
        async with self._pool_for(shard_key).acquire() as conn:
            return await conn.execute(sql, *args)

    async def query_one_shard(
        self, shard_key: str, sql: str, *args: Any
    ) -> "Optional[asyncpg.Record]":
        """Fetch a single row of ``sql`` from the shard owning ``shard_key``."""
        async with self._pool_for(shard_key).acquire() as conn:
            return await conn.fetchrow(sql, *args)

    async def scatter_gather(
        self, sql: str, *args: Any, timeout: float = 10.0
    ) -> "dict[str, list[asyncpg.Record]]":
        """Run ``sql`` on every shard concurrently and collect rows per shard ID.

        Each shard gets its own ``timeout`` (seconds). Shards that time out or
        raise are logged and left out of the returned mapping, so callers see
        partial results; compare the mapping's keys against the known shard
        IDs when completeness matters.

        Returns:
            Mapping of shard ID to the rows that shard returned.
        """
        # Imported locally so this block does not depend on a module-level import.
        import logging

        logger = logging.getLogger(__name__)

        async def fetch_rows(shard_id: str) -> "list[asyncpg.Record]":
            pool = self.pools.get_pool(shard_id)
            async with pool.acquire() as conn:
                return await conn.fetch(sql, *args)

        shard_ids = list(self.pools.all_shard_ids)
        # gather() keeps outcomes in input order, which lets each failure be
        # attributed to the shard it came from.
        outcomes = await asyncio.gather(
            *(asyncio.wait_for(fetch_rows(sid), timeout=timeout) for sid in shard_ids),
            return_exceptions=True,
        )

        results = {}
        for shard_id, outcome in zip(shard_ids, outcomes):
            if isinstance(outcome, asyncio.TimeoutError):
                logger.warning(
                    "Scatter-gather query timed out on shard %s (timeout=%ss)",
                    shard_id, timeout,
                )
            elif isinstance(outcome, Exception):
                logger.error(
                    "Scatter-gather query failed on shard %s: %r",
                    shard_id, outcome, exc_info=outcome,
                )
            elif isinstance(outcome, BaseException):
                # e.g. cancellation: not a per-shard failure, propagate it.
                raise outcome
            else:
                results[shard_id] = outcome
        return results

    async def scatter_aggregate(
        self,
        sql: str,
        *args: Any,
        aggregator: "Callable[[list[asyncpg.Record]], Any]",
    ) -> Any:
        """Run ``sql`` on every shard and reduce the combined rows with ``aggregator``.

        Rows from all shards that answered are flattened into one list before
        ``aggregator`` is called; shards omitted by ``scatter_gather`` do not
        contribute.
        """
        all_results = await self.scatter_gather(sql, *args)
        all_rows = [row for rows in all_results.values() for row in rows]
        return aggregator(all_rows)
70 

Need a second opinion on your system design architecture?

I run free 30-minute strategy calls for engineering teams tackling this exact problem.

Book a Free Call

Repository Pattern

python
1from dataclasses import dataclass
2from datetime import datetime
3from typing import Optional
4from uuid import uuid4
5 
@dataclass
class Order:
    """One order record, as stored in the ``orders`` table."""

    # Identity: tenant_id is what the repository passes as the shard key.
    tenant_id: str
    order_id: str
    customer_id: str

    # State and amount (amount is an integer number of cents).
    status: str
    total_cents: int
    currency: str

    # Creation timestamp.
    created_at: datetime
15 
class OrderRepository:
    """Persist and load ``Order`` rows through a ``ShardedDB``.

    Per-tenant operations pass ``tenant_id`` as the shard key.
    """

    def __init__(self, db: ShardedDB):
        self.db = db

    async def create(self, order: Order) -> None:
        """Insert ``order`` into the shard selected by its tenant ID."""
        insert_sql = """INSERT INTO orders (tenant_id, order_id, customer_id, status, total_cents, currency, created_at)
               VALUES ($1, $2, $3, $4, $5, $6, $7)"""
        params = (
            order.tenant_id,
            order.order_id,
            order.customer_id,
            order.status,
            order.total_cents,
            order.currency,
            order.created_at,
        )
        await self.db.execute_shard(order.tenant_id, insert_sql, *params)

    async def get_by_id(self, tenant_id: str, order_id: str) -> Optional[Order]:
        """Return the tenant's order with ``order_id``, or ``None`` if no row came back."""
        select_sql = """SELECT tenant_id, order_id, customer_id, status, total_cents, currency, created_at
               FROM orders WHERE tenant_id = $1 AND order_id = $2"""
        record = await self.db.query_one_shard(tenant_id, select_sql, tenant_id, order_id)
        if not record:
            return None
        return self._to_order(record)

    async def list_by_tenant(
        self, tenant_id: str, limit: int = 20, offset: int = 0
    ) -> list[Order]:
        """Return one page of the tenant's orders, newest first."""
        page_sql = """SELECT tenant_id, order_id, customer_id, status, total_cents, currency, created_at
               FROM orders WHERE tenant_id = $1
               ORDER BY created_at DESC LIMIT $2 OFFSET $3"""
        records = await self.db.query_shard(tenant_id, page_sql, tenant_id, limit, offset)
        return list(map(self._to_order, records))

    async def get_global_stats(self) -> dict:
        """Sum the 30-day order count and revenue over all shards that answered.

        Only shards present in the ``scatter_gather`` result contribute to
        the totals.
        """
        per_shard = await self.db.scatter_gather(
            """SELECT COUNT(*) as count, COALESCE(SUM(total_cents), 0) as revenue
               FROM orders WHERE created_at > NOW() - INTERVAL '30 days'"""
        )

        # Each shard returns a single aggregate row; skip empty result lists.
        summaries = [rows[0] for rows in per_shard.values() if rows]
        order_count = sum(summary["count"] for summary in summaries)
        revenue = sum(summary["revenue"] for summary in summaries)

        return {"total_orders": order_count, "total_revenue": revenue}

    @staticmethod
    def _to_order(row: asyncpg.Record) -> Order:
        """Build an ``Order`` from a row that carries all seven order columns."""
        columns = (
            "tenant_id",
            "order_id",
            "customer_id",
            "status",
            "total_cents",
            "currency",
            "created_at",
        )
        return Order(**{column: row[column] for column in columns})
76 

FastAPI Integration

python
1from contextlib import asynccontextmanager
2from fastapi import FastAPI, Depends, HTTPException
3from pydantic import BaseModel
4 
# Module-wide ShardedDB handle: None until lifespan() assigns it at startup;
# get_db() returns it to request handlers.
sharded_db: Optional[ShardedDB] = None
6 
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Build the sharding layer at startup and release it at shutdown.

    Loads the shard configs, registers every shard with the router, opens the
    connection pools, and publishes the resulting ``ShardedDB`` through the
    module-level ``sharded_db``. The pools are closed in a ``finally`` block so
    they are released even when pool initialization or the application body
    raises.
    """
    global sharded_db
    configs = load_shard_configs()
    router = ConsistentHashRouter()
    pool_manager = AsyncPoolManager()

    for config in configs:
        router.add_shard(config.shard_id)

    try:
        await pool_manager.initialize(configs)
        sharded_db = ShardedDB(router, pool_manager)
        yield
    finally:
        # Drop the handle first so nothing picks up a client whose pools
        # are being closed.
        sharded_db = None
        await pool_manager.close()
21 
# Application instance; lifespan() runs around the app's startup and shutdown.
app = FastAPI(lifespan=lifespan)
23 
def get_db() -> ShardedDB:
    """Return the ``ShardedDB`` created by ``lifespan``.

    Raises:
        RuntimeError: If ``sharded_db`` is still ``None``. Failing here gives a
            clear error instead of handing ``None`` to code that expects a
            ``ShardedDB``.
    """
    if sharded_db is None:
        raise RuntimeError("ShardedDB is not initialized; application startup has not completed")
    return sharded_db
26 
def get_order_repo(db: ShardedDB = Depends(get_db)) -> OrderRepository:
    # Dependency provider: wraps the injected ShardedDB in a new repository.
    repository = OrderRepository(db)
    return repository
29 
# Request body for POST /orders. order_id, status and created_at are not part
# of the payload; create_order() fills them in.
class CreateOrderRequest(BaseModel):
    tenant_id: str
    customer_id: str
    total_cents: int  # amount as an integer number of cents
    currency: str
35 
@app.post("/orders", status_code=201)
async def create_order(
    request: CreateOrderRequest,
    repo: OrderRepository = Depends(get_order_repo),
):
    # Server-assigned fields: a fresh UUID, the initial "placed" status and
    # the creation time.
    # NOTE(review): datetime.utcnow() returns a naive datetime and is
    # deprecated since Python 3.12 — confirm the created_at column type
    # before switching to a timezone-aware value.
    new_order = Order(
        order_id=str(uuid4()),
        status="placed",
        created_at=datetime.utcnow(),
        tenant_id=request.tenant_id,
        customer_id=request.customer_id,
        total_cents=request.total_cents,
        currency=request.currency,
    )
    await repo.create(new_order)
    response = {"order_id": new_order.order_id}
    return response
52 
@app.get("/orders/{tenant_id}/{order_id}")
async def get_order(
    tenant_id: str,
    order_id: str,
    repo: OrderRepository = Depends(get_order_repo),
):
    # Look the order up on the tenant's shard; answer 404 when nothing is found.
    found = await repo.get_by_id(tenant_id, order_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Order not found")
63 
@app.get("/stats")
async def get_stats(repo: OrderRepository = Depends(get_order_repo)):
    # Cross-shard totals as computed by OrderRepository.get_global_stats().
    stats = await repo.get_global_stats()
    return stats
67 
@app.get("/health/shards")
async def shard_health(db: ShardedDB = Depends(get_db)):
    # Per-shard health report produced by the pool manager's health_check().
    report = await db.pools.health_check()
    return report
71 

Testing

python
1import pytest
2import asyncio
3from unittest.mock import AsyncMock, MagicMock
4 
@pytest.fixture
def router():
    """Provide a router with 150 virtual nodes and shards shard-0 through shard-2."""
    ring = ConsistentHashRouter(virtual_nodes=150)
    for shard_id in ("shard-0", "shard-1", "shard-2"):
        ring.add_shard(shard_id)
    return ring
12 
def test_consistent_routing(router):
    """Routing the same key twice yields the same shard."""
    first_lookup, second_lookup = (router.route("tenant-abc") for _ in range(2))
    assert first_lookup == second_lookup
17 
def test_distribution(router):
    """Every shard receives a reasonable share of 10,000 sequential tenant keys."""
    total_keys = 10000
    counts = {}
    for i in range(total_keys):
        shard = router.route(f"tenant-{i}")
        counts[shard] = counts.get(shard, 0) + 1

    # A shard that receives no keys never appears in ``counts``; without this
    # check an even split over only two shards would satisfy the bounds below.
    expected_shards = {"shard-0", "shard-1", "shard-2"}
    assert set(counts) == expected_shards, (
        f"Shards without any keys: {sorted(expected_shards - set(counts))}"
    )

    for shard, count in counts.items():
        ratio = count / total_keys
        assert 0.15 < ratio < 0.55, f"Shard {shard} has skewed distribution: {ratio}"
27 
def test_routing_stability_on_add(router):
    """Adding a fourth shard re-routes fewer than half of 1,000 sample keys."""
    sample_keys = [f"key-{i}" for i in range(1000)]
    owners_before = {key: router.route(key) for key in sample_keys}

    router.add_shard("shard-3")

    moved = 0
    for key, previous_owner in owners_before.items():
        if router.route(key) != previous_owner:
            moved += 1
    assert moved / 1000 < 0.5  # Less than 50% should move
33 
@pytest.mark.asyncio
async def test_order_repository():
    """create() and get_by_id() both address the shard by tenant ID."""
    mock_db = AsyncMock(spec=ShardedDB)
    mock_db.execute_shard = AsyncMock()
    mock_db.query_one_shard = AsyncMock(return_value=None)

    repo = OrderRepository(mock_db)

    order = Order(
        tenant_id="tenant-1", order_id="order-1", customer_id="cust-1",
        status="placed", total_cents=5000, currency="USD",
        # Fixed timestamp keeps the test input deterministic.
        created_at=datetime(2024, 1, 1, 12, 0, 0),
    )

    await repo.create(order)
    mock_db.execute_shard.assert_called_once()
    assert mock_db.execute_shard.call_args[0][0] == "tenant-1"

    # The lookup mock returns None, so get_by_id() must report "not found".
    assert await repo.get_by_id("tenant-1", "missing-order") is None
    mock_db.query_one_shard.assert_called_once()
    assert mock_db.query_one_shard.call_args[0][0] == "tenant-1"
51 

Conclusion

Python's async capabilities through asyncio and asyncpg provide an efficient foundation for database sharding. The consistent hash router spreads keys roughly evenly across shards, the async pool manager handles per-shard connection lifecycle, and the scatter-gather pattern enables cross-shard queries with a per-shard timeout (the fan-out itself is not concurrency-limited: every shard is queried at once). FastAPI's dependency injection integrates cleanly with the sharding layer, making shard-aware endpoints straightforward to implement and test.

For Python applications where database I/O is the bottleneck (which it almost always is in sharded architectures), the async approach performs comparably to Go and Java implementations while maintaining Python's development velocity advantage.

FAQ

Need expert help?

Building with system design?

I help teams ship production-grade systems. From architecture review to hands-on builds.

Muneer Puthiya Purayil

SaaS Architect & AI Systems Engineer. 10+ years shipping production infrastructure across fintech, automotive, e-commerce, and healthcare.

Engage

Start a
Conversation.

For teams building at scale: SaaS platforms, agentic AI systems, and enterprise mobile infrastructure. Scope and fit are evaluated before any engagement begins.

Limited availability · Q3 / Q4 2026