DevOps

Complete Guide to Zero-Downtime Deployments with Python

A comprehensive guide to implementing Zero-Downtime Deployments using Python, covering architecture, code examples, and production-ready patterns.

Muneer Puthiya Purayil · 17 min read

Python's asynchronous capabilities and lightweight deployment options make zero-downtime deployments achievable without heavy infrastructure. Whether you're running FastAPI, Django, or Flask behind Gunicorn or Uvicorn, the patterns are the same: graceful shutdown, health checks, connection draining, and safe database migrations. This guide covers production-ready patterns for Python web services.

Graceful Shutdown with Uvicorn

Uvicorn handles SIGTERM gracefully by default, but you need application-level coordination:

```python
# main.py
import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI, Request, Response

shutdown_event = asyncio.Event()
is_ready = False


@asynccontextmanager
async def lifespan(app: FastAPI):
    global is_ready
    # Startup: warm caches, verify connections
    await warm_caches()
    is_ready = True
    yield
    # Shutdown: stop advertising readiness, then drain
    is_ready = False
    await asyncio.sleep(15)  # Wait for LB deregistration
    await drain_connections()


app = FastAPI(lifespan=lifespan)


@app.get("/health/ready")
async def readiness():
    if not is_ready:
        return Response(status_code=503, content="not ready")
    return {"status": "healthy"}


@app.get("/health/live")
async def liveness():
    return {"status": "alive"}


@app.middleware("http")
async def shutdown_middleware(request: Request, call_next):
    if not is_ready and not request.url.path.startswith("/health"):
        return Response(
            status_code=503,
            headers={"Connection": "close", "Retry-After": "5"},
            content="Service shutting down",
        )
    return await call_next(request)
```
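The ordering in that teardown matters: readiness flips first so the load balancer stops routing, then in-flight work is allowed to finish. Here is a framework-free sketch of the same sequence in plain asyncio; the `App` class, its timings, and the integer status codes are illustrative stand-ins, not the FastAPI machinery above:

```python
import asyncio


class App:
    """Minimal stand-in for the lifespan teardown: flip readiness,
    wait out the load-balancer window, then drain in-flight work."""

    def __init__(self):
        self.ready = True
        self.in_flight = 0

    async def handle(self, duration: float) -> int:
        if not self.ready:
            return 503  # mirrors the shutdown middleware
        self.in_flight += 1
        try:
            await asyncio.sleep(duration)  # simulated request work
            return 200
        finally:
            self.in_flight -= 1

    async def shutdown(self, lb_wait: float = 0.01):
        self.ready = False            # readiness probe now fails
        await asyncio.sleep(lb_wait)  # deregistration window
        while self.in_flight:         # drain remaining requests
            await asyncio.sleep(0.005)


async def demo():
    app = App()
    req = asyncio.create_task(app.handle(0.05))  # long in-flight request
    await asyncio.sleep(0.001)                   # let it start
    await app.shutdown()
    late = await app.handle(0.01)  # arrives after shutdown began
    return await req, late


result = asyncio.run(demo())  # in-flight request completes; late one is rejected
```

The in-flight request started before shutdown finishes with 200; the request arriving afterwards is refused with 503, which is exactly what the middleware above enforces.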

Gunicorn with Pre-Fork Workers

For production deployments, Gunicorn manages worker processes:

```python
# gunicorn.conf.py
import multiprocessing

bind = "0.0.0.0:8080"
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
timeout = 120
graceful_timeout = 30
keepalive = 5

# Worker recycling: gracefully restart workers after N requests
# (prevents slow memory leaks from accumulating)
max_requests = 1000
max_requests_jitter = 50  # Add randomness to prevent thundering herd

# Preload app to share memory across workers (copy-on-write)
preload_app = True


def on_starting(server):
    """Called just before the master process is initialized."""
    pass


def pre_fork(server, worker):
    """Called just before a worker is forked."""
    pass


def post_fork(server, worker):
    """Called just after a worker has been forked."""
    pass


def worker_exit(server, worker):
    """Called when a worker exits."""
    pass
```
Zero-downtime reload with Gunicorn:

```bash
# Send HUP for a graceful restart: Gunicorn starts new workers,
# then gracefully stops the old ones
kill -HUP $(cat /var/run/gunicorn.pid)
```

One caveat: with `preload_app = True`, a HUP reload restarts workers but does not re-import application code, because the app lives in the master process. For actual code deploys, use Gunicorn's USR2 binary-upgrade sequence or replace the instance behind the load balancer.
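Deploy scripts that trigger a reload typically poll the readiness endpoint before moving on to the next host. A hedged stdlib sketch of that polling step; the `check` callable is a stand-in for an HTTP GET against `/health/ready`, and the stub below simply passes on its third attempt:

```python
import time


def wait_until_ready(check, timeout: float = 30.0, interval: float = 0.01) -> bool:
    """Poll a readiness check until it passes or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if check():
            return True
        time.sleep(interval)
    return False


# Stub standing in for an HTTP probe; real code would request /health/ready
attempts = {"n": 0}


def probe() -> bool:
    attempts["n"] += 1
    return attempts["n"] >= 3


ok = wait_until_ready(probe, timeout=1.0)  # True once the probe passes
```

In a real pipeline `interval` would be a second or more; the short value here just keeps the sketch fast.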

Health Check Implementation

```python
# health.py
import asyncio
import os
import time
from dataclasses import dataclass, field

import aioredis
import asyncpg


@dataclass
class HealthStatus:
    status: str
    components: dict[str, str] = field(default_factory=dict)
    version: str = ""
    uptime_seconds: float = 0


class HealthChecker:
    def __init__(self, db_pool: asyncpg.Pool, redis: aioredis.Redis):
        self.db_pool = db_pool
        self.redis = redis
        self.start_time = time.monotonic()
        self._ready = False

    @property
    def ready(self) -> bool:
        return self._ready

    def set_ready(self, ready: bool):
        self._ready = ready

    async def check(self) -> HealthStatus:
        components = {}

        # Database check
        try:
            async with self.db_pool.acquire() as conn:
                await asyncio.wait_for(conn.fetchval("SELECT 1"), timeout=2.0)
            components["database"] = "healthy"
        except Exception as e:
            components["database"] = f"unhealthy: {e}"

        # Redis check
        try:
            await asyncio.wait_for(self.redis.ping(), timeout=2.0)
            components["redis"] = "healthy"
        except Exception as e:
            components["redis"] = f"unhealthy: {e}"

        all_healthy = all(v == "healthy" for v in components.values())

        return HealthStatus(
            status="healthy" if all_healthy else "degraded",
            components=components,
            version=os.getenv("APP_VERSION", "unknown"),
            uptime_seconds=time.monotonic() - self.start_time,
        )
```
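Not every dependency should fail readiness the same way: a cache outage may merit "degraded" while a database outage merits "unhealthy". A small pure-function sketch of that aggregation; the notion of a `critical` set is an assumption layered on top of the checker above, not something it encodes:

```python
def aggregate_status(components: dict[str, str], critical: set[str]) -> str:
    """Roll component results into one overall status: any failed critical
    dependency means 'unhealthy'; failed optional ones mean 'degraded'."""
    failed = {name for name, state in components.items() if state != "healthy"}
    if failed & critical:
        return "unhealthy"
    if failed:
        return "degraded"
    return "healthy"


# Redis down but not critical: stay in rotation, report degraded
status = aggregate_status(
    {"database": "healthy", "redis": "unhealthy: timeout"},
    critical={"database"},
)
```

Wiring this in lets the readiness endpoint return 503 only for "unhealthy", so a flapping cache does not pull every pod out of rotation at once.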

Database Migrations with Alembic

Safe migration patterns for zero-downtime:

```python
# alembic/env.py — Configure for online migrations
from alembic import context
from sqlalchemy import engine_from_config

from app.models import Base  # hypothetical: point at your own metadata module

config = context.config
target_metadata = Base.metadata


def run_migrations_online():
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
    )
    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            # Important: don't auto-generate destructive operations
            compare_type=True,
            render_as_batch=True,
        )
        with context.begin_transaction():
            context.run_migrations()
```
```python
# alembic/versions/001_add_email_verified.py
"""Add email_verified column

Safe migration: adds nullable column, no long table locks.
"""

from alembic import op
import sqlalchemy as sa


def upgrade():
    # Safe: a nullable column addition doesn't rewrite the table
    op.add_column(
        "users",
        sa.Column("email_verified", sa.Boolean(), nullable=True),
    )
    # CREATE INDEX CONCURRENTLY cannot run inside a transaction,
    # so step outside Alembic's migration transaction (PostgreSQL)
    with op.get_context().autocommit_block():
        op.execute(
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS "
            "idx_users_email_verified ON users (email_verified)"
        )


def downgrade():
    op.drop_index("idx_users_email_verified", table_name="users")
    op.drop_column("users", "email_verified")
```


Connection Draining

```python
# drain.py
import asyncio
from dataclasses import dataclass, field

from prometheus_client import Gauge

active_requests = Gauge(
    "http_active_requests",
    "Number of currently active HTTP requests",
)


@dataclass
class ConnectionDrainer:
    _count: int = 0
    _draining: bool = False
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    async def track_request(self):
        async with self._lock:
            self._count += 1
            active_requests.inc()

    async def release_request(self):
        async with self._lock:
            self._count -= 1
            active_requests.dec()

    def start_draining(self):
        self._draining = True

    @property
    def is_draining(self) -> bool:
        return self._draining

    @property
    def active_count(self) -> int:
        return self._count

    async def wait_for_drain(self, timeout: float = 30.0) -> bool:
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while loop.time() < deadline:
            if self._count == 0:
                return True
            await asyncio.sleep(0.1)
        return self._count == 0
```
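To see the drain loop behave end to end, here is a stripped-down, dependency-free version of the same counter (no Prometheus gauge) driven by two short fake requests; the timings are illustrative only:

```python
import asyncio


class Drainer:
    """Minimal in-flight counter with the same wait_for_drain loop."""

    def __init__(self):
        self.count = 0

    async def request(self, duration: float):
        self.count += 1
        try:
            await asyncio.sleep(duration)  # simulated request work
        finally:
            self.count -= 1

    async def wait_for_drain(self, timeout: float = 1.0) -> bool:
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while loop.time() < deadline:
            if self.count == 0:
                return True
            await asyncio.sleep(0.005)
        return self.count == 0


async def demo():
    d = Drainer()
    tasks = [
        asyncio.create_task(d.request(0.02)),
        asyncio.create_task(d.request(0.04)),
    ]
    await asyncio.sleep(0.001)  # let both requests start
    drained = await d.wait_for_drain(timeout=1.0)
    await asyncio.gather(*tasks)
    return drained


drained = asyncio.run(demo())  # True: both requests finished inside the budget
```

If the timeout elapses with requests still in flight, `wait_for_drain` returns False and the caller must decide whether to force-close or extend the grace period.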

Feature Flags

```python
# features.py
import hashlib
import json
from dataclasses import dataclass

import aioredis


@dataclass
class FeatureFlag:
    key: str
    enabled: bool
    rollout_percent: int = 0
    allowed_tenants: list[str] | None = None


class FeatureService:
    def __init__(self, redis: aioredis.Redis):
        self.redis = redis
        self._cache: dict[str, FeatureFlag] = {}

    async def is_enabled(self, key: str, tenant_id: str) -> bool:
        flag = self._cache.get(key)
        if not flag:
            flag = await self._load_flag(key)
        if not flag or not flag.enabled:
            return False

        if flag.allowed_tenants and tenant_id in flag.allowed_tenants:
            return True

        # Consistent hashing for percentage rollout
        hash_input = f"{key}:{tenant_id}".encode()
        hash_val = int(hashlib.md5(hash_input).hexdigest()[:8], 16)
        return (hash_val % 100) < flag.rollout_percent

    async def _load_flag(self, key: str) -> FeatureFlag | None:
        raw = await self.redis.get(f"flag:{key}")
        if not raw:
            return None
        data = json.loads(raw)
        flag = FeatureFlag(**data)
        self._cache[key] = flag
        return flag

    async def refresh_cache(self):
        # Note: KEYS blocks Redis on large keyspaces; prefer SCAN in production
        keys = await self.redis.keys("flag:*")
        for key in keys:
            raw = await self.redis.get(key)
            if raw:
                data = json.loads(raw)
                self._cache[data["key"]] = FeatureFlag(**data)
```
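The value of the hash-based bucket is that it is sticky: a tenant inside a 20% rollout stays inside it as the percentage grows, so no one flaps between old and new behavior. A dependency-free check of that property using the same hashing scheme as `is_enabled` above (the flag key and tenant ids are made up for illustration):

```python
import hashlib


def bucket(key: str, tenant_id: str) -> int:
    """Same consistent-hash bucket as FeatureService: a stable value in 0-99."""
    digest = hashlib.md5(f"{key}:{tenant_id}".encode()).hexdigest()
    return int(digest[:8], 16) % 100


def enabled(key: str, tenant_id: str, rollout_percent: int) -> bool:
    return bucket(key, tenant_id) < rollout_percent


tenants = [f"tenant-{i}" for i in range(1000)]
at_20 = {t for t in tenants if enabled("new-billing", t, 20)}
at_50 = {t for t in tenants if enabled("new-billing", t, 50)}

# Raising the percentage only ever adds tenants, never removes them
monotonic = at_20 <= at_50
```

Because the bucket is a pure function of the flag key and tenant id, every replica of the service makes the same decision without coordination.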

Docker Configuration

```dockerfile
FROM python:3.12-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Run migrations separately, not in the startup command:
# migrations should be a dedicated CI/CD step

# python:slim ships without curl, so probe with the stdlib instead
HEALTHCHECK --interval=10s --timeout=5s --retries=3 \
  CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8080/health/ready')" || exit 1

CMD ["gunicorn", "main:app", "-c", "gunicorn.conf.py"]
```
```yaml
# docker-compose.yml
services:
  api:
    image: api:latest
    deploy:
      replicas: 3
      update_config:
        parallelism: 1
        delay: 30s
        order: start-first
      rollback_config:
        parallelism: 0
        order: stop-first
    healthcheck:
      # The slim base image has no curl; use the stdlib probe
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8080/health/ready')"]
      interval: 10s
      timeout: 5s
      retries: 3
    ports:
      - "8080:8080"
```

Kubernetes Deployment

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: api-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: api-server
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  template:
    metadata:
      labels:
        app: api-server
    spec:
      terminationGracePeriodSeconds: 45
      containers:
        - name: api
          image: api-server:latest
          ports:
            - containerPort: 8080
          readinessProbe:
            httpGet:
              path: /health/ready
              port: 8080
            initialDelaySeconds: 10
            periodSeconds: 5
          livenessProbe:
            httpGet:
              path: /health/live
              port: 8080
            initialDelaySeconds: 15
            periodSeconds: 10
          lifecycle:
            preStop:
              exec:
                command: ["/bin/sh", "-c", "sleep 15"]
```
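The numbers in this manifest have to fit together: `terminationGracePeriodSeconds` must cover the preStop sleep plus however long request draining takes, or the kubelet sends SIGKILL mid-drain. A small arithmetic sketch of that budget; the 15s and 45s mirror the manifest above, while the 25s drain allowance is an assumption standing in for Gunicorn's `graceful_timeout` and any slow in-flight requests:

```python
def grace_budget(pre_stop_sleep: float, drain_allowance: float,
                 termination_grace: float) -> float:
    """Seconds of headroom before the kubelet would SIGKILL the pod.
    Negative headroom means requests can be cut off mid-flight."""
    return termination_grace - (pre_stop_sleep + drain_allowance)


# Manifest values: preStop sleep 15s, grace period 45s;
# assumed drain allowance of 25s leaves 5s of headroom
headroom = grace_budget(pre_stop_sleep=15, drain_allowance=25,
                        termination_grace=45)
```

A deploy pipeline could assert this stays positive whenever someone edits the probe or timeout values.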

Cache Warming

```python
# warmup.py
import asyncio
import json

from app.clients import db, redis  # assumed module-level pool and client


async def warm_caches():
    """Pre-populate frequently accessed caches before serving traffic."""
    tasks = [
        warm_plan_cache(),
        warm_config_cache(),  # defined similarly to the warmers below
        warm_tenant_cache(),
    ]
    # return_exceptions=True: one failed warm-up shouldn't block startup
    await asyncio.gather(*tasks, return_exceptions=True)


async def warm_plan_cache():
    plans = await db.fetch("SELECT * FROM plans WHERE active = true")
    for plan in plans:
        await redis.set(f"plan:{plan['id']}", json.dumps(dict(plan)), ex=3600)


async def warm_tenant_cache():
    tenants = await db.fetch(
        "SELECT id, settings FROM tenants ORDER BY request_count DESC LIMIT 200"
    )
    pipe = redis.pipeline()
    for tenant in tenants:
        pipe.set(
            f"tenant:{tenant['id']}:settings",
            json.dumps(dict(tenant)),
            ex=3600,
        )
    await pipe.execute()
```
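`return_exceptions=True` is what keeps one failed warm-up from aborting startup: failures come back as values in the results list instead of propagating. A minimal stdlib demonstration of that behavior, with a deliberately failing task standing in for a cache backend that is down:

```python
import asyncio


async def ok():
    return "warmed"


async def boom():
    raise RuntimeError("cache backend down")


async def warm_all():
    # Exceptions are returned in the results list, not raised
    return await asyncio.gather(ok(), boom(), ok(), return_exceptions=True)


results = asyncio.run(warm_all())
succeeded = [r for r in results if not isinstance(r, Exception)]
```

The service still starts with two of three caches warm; logging the exception objects from `results` makes the partial failure visible without blocking readiness.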
