DevOps

How to Build Zero-Downtime Deployments with FastAPI

Step-by-step tutorial for building zero-downtime deployments with FastAPI, from project setup through deployment.

Muneer Puthiya Purayil · 20 min read

FastAPI's async-first architecture makes it ideal for building services that deploy without downtime. This tutorial walks through building a FastAPI application with production-grade graceful shutdown, health checks, database migration safety, and rolling deployment support — from project setup through Kubernetes deployment.

Project Setup

bash
mkdir fastapi-zero-downtime && cd fastapi-zero-downtime
python -m venv venv && source venv/bin/activate
pip install fastapi "uvicorn[standard]" sqlalchemy alembic asyncpg redis httpx

project/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── config.py
│   ├── health.py
│   ├── middleware.py
│   ├── features.py
│   ├── database.py
│   └── routes/
│       └── api.py
├── alembic/
│   ├── env.py
│   └── versions/
├── alembic.ini
├── Dockerfile
├── docker-compose.yml
├── gunicorn.conf.py
└── k8s/
    ├── deployment.yaml
    └── service.yaml

Application Core with Lifespan Management

python
# app/main.py
import asyncio
import json
from contextlib import asynccontextmanager
from dataclasses import asdict

from fastapi import FastAPI
from fastapi.responses import JSONResponse

from app.config import redis
from app.database import init_db, close_db, get_db
from app.health import HealthChecker
from app.middleware import RequestTrackerMiddleware, ShutdownMiddleware
from app.routes.api import router as api_router

health_checker = HealthChecker()

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    await init_db()
    await warm_caches()
    health_checker.set_ready(True)
    print("Application ready")

    yield

    # Shutdown
    print("Shutdown initiated")
    health_checker.set_ready(False)

    # Wait for the load balancer to deregister this instance
    await asyncio.sleep(15)

    # Wait for active requests to drain. 15s deregistration + 30s drain
    # = 45s, which fits inside the 60s terminationGracePeriodSeconds
    # set in the Kubernetes manifest below.
    tracker = app.state.request_tracker
    drained = await tracker.wait_for_drain(timeout=30)
    if not drained:
        print(f"Warning: {tracker.active_count} requests still active")

    await close_db()
    print("Shutdown complete")

app = FastAPI(lifespan=lifespan)

# Add middleware (order matters — outermost first)
app.add_middleware(ShutdownMiddleware, health_checker=health_checker)
app.add_middleware(RequestTrackerMiddleware)

# Routes
app.include_router(api_router, prefix="/api")

@app.get("/health/ready")
async def readiness():
    status = await health_checker.check()
    if status.status != "healthy":
        return JSONResponse(status_code=503, content=asdict(status))
    return asdict(status)

@app.get("/health/live")
async def liveness():
    return {"status": "alive"}

async def warm_caches():
    """Pre-populate caches before serving traffic."""
    async with get_db() as db:
        # Warm frequently accessed data
        plans = await db.fetch_all("SELECT * FROM plans WHERE active = true")
        for plan in plans:
            await redis.set(f"plan:{plan['id']}", json.dumps(dict(plan)), ex=3600)
        print(f"Warmed {len(plans)} plan caches")
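Stripped of FastAPI, the lifespan pattern above reduces to a plain async context manager: everything before `yield` runs at startup, everything after runs at shutdown, in order. A minimal standalone sketch:

```python
import asyncio
from contextlib import asynccontextmanager

events = []

@asynccontextmanager
async def lifespan():
    events.append("startup")    # stands in for init_db, warm_caches, set_ready(True)
    yield
    events.append("shutdown")   # stands in for set_ready(False), drain, close_db

async def main():
    async with lifespan():
        events.append("serving")  # requests are handled while the context is open
    return events

print(asyncio.run(main()))  # ['startup', 'serving', 'shutdown']
```

FastAPI drives exactly this context manager: it enters it before accepting traffic and exits it when the server receives a termination signal.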

Request Tracking Middleware

python
# app/middleware.py
import asyncio
import time

from prometheus_client import Counter, Gauge, Histogram
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse

REQUEST_COUNT = Counter(
    "http_requests_total", "Total requests",
    ["method", "path", "status"],
)
REQUEST_LATENCY = Histogram(
    "http_request_duration_seconds", "Request latency",
    ["method", "path"],
)
ACTIVE_REQUESTS = Gauge(
    "http_active_requests", "Active requests",
)

class RequestTracker:
    def __init__(self):
        self._count = 0
        self._lock = asyncio.Lock()

    async def increment(self):
        async with self._lock:
            self._count += 1
            ACTIVE_REQUESTS.inc()

    async def decrement(self):
        async with self._lock:
            self._count -= 1
            ACTIVE_REQUESTS.dec()

    @property
    def active_count(self) -> int:
        return self._count

    async def wait_for_drain(self, timeout: float = 30.0) -> bool:
        deadline = asyncio.get_event_loop().time() + timeout
        while asyncio.get_event_loop().time() < deadline:
            if self._count == 0:
                return True
            await asyncio.sleep(0.1)
        return self._count == 0

class RequestTrackerMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        if not hasattr(request.app.state, "request_tracker"):
            request.app.state.request_tracker = RequestTracker()

        tracker = request.app.state.request_tracker
        await tracker.increment()

        start = time.monotonic()
        status_code = 500  # assume failure unless call_next returns normally
        try:
            response = await call_next(request)
            status_code = response.status_code
            return response
        finally:
            duration = time.monotonic() - start
            await tracker.decrement()

            path = request.url.path
            REQUEST_COUNT.labels(
                method=request.method,
                path=path,
                status=status_code,
            ).inc()
            REQUEST_LATENCY.labels(
                method=request.method, path=path,
            ).observe(duration)

class ShutdownMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, health_checker):
        super().__init__(app)
        self.health_checker = health_checker

    async def dispatch(self, request: Request, call_next):
        if not self.health_checker.ready:
            if not request.url.path.startswith("/health"):
                return JSONResponse(
                    status_code=503,
                    content={"error": "Service shutting down"},
                    headers={
                        "Connection": "close",
                        "Retry-After": "5",
                    },
                )
        return await call_next(request)
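To see the drain loop in isolation, here is a standalone sketch with one simulated in-flight request. The class and helper names are illustrative, and the Prometheus metrics and lock from the real tracker are omitted for brevity:

```python
import asyncio

class MiniTracker:
    """Drain logic only; the metrics and lock from RequestTracker are omitted."""
    def __init__(self):
        self.count = 0

    async def wait_for_drain(self, timeout: float) -> bool:
        # Poll until the active count hits zero or the deadline passes
        deadline = asyncio.get_event_loop().time() + timeout
        while asyncio.get_event_loop().time() < deadline:
            if self.count == 0:
                return True
            await asyncio.sleep(0.05)
        return self.count == 0

async def main():
    tracker = MiniTracker()
    tracker.count = 1  # one request still in flight when shutdown begins

    async def request_finishes():
        await asyncio.sleep(0.2)
        tracker.count = 0

    asyncio.create_task(request_finishes())
    return await tracker.wait_for_drain(timeout=1.0)

print(asyncio.run(main()))  # True: the request completed before the deadline
```

If the timeout elapses first, `wait_for_drain` returns False and the lifespan handler above logs a warning instead of blocking shutdown forever.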

Health Checker with Dependency Verification

python
# app/health.py
import asyncio
import os
import time
from dataclasses import dataclass, field

@dataclass
class HealthStatus:
    status: str
    components: dict = field(default_factory=dict)
    version: str = ""
    uptime: float = 0

class HealthChecker:
    def __init__(self):
        self._ready = False
        self._start_time = time.monotonic()

    @property
    def ready(self) -> bool:
        return self._ready

    def set_ready(self, ready: bool):
        self._ready = ready

    async def check(self) -> HealthStatus:
        if not self._ready:
            return HealthStatus(status="not_ready")

        components = {}

        # Database health
        try:
            from app.database import get_db
            async with get_db() as db:
                await asyncio.wait_for(
                    db.fetch_val("SELECT 1"),
                    timeout=2.0,
                )
            components["database"] = "healthy"
        except Exception as e:
            components["database"] = f"unhealthy: {str(e)[:100]}"

        # Redis health
        try:
            from app.config import redis
            await asyncio.wait_for(redis.ping(), timeout=2.0)
            components["redis"] = "healthy"
        except Exception as e:
            components["redis"] = f"unhealthy: {str(e)[:100]}"

        all_healthy = all(v == "healthy" for v in components.values())
        return HealthStatus(
            status="healthy" if all_healthy else "degraded",
            components=components,
            version=os.getenv("APP_VERSION", "unknown"),
            uptime=time.monotonic() - self._start_time,
        )
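The aggregation rule in `check()` distills to one line: the service reports healthy only when every dependency probe passed, and degraded otherwise. A minimal sketch of just that rule:

```python
def overall_status(components: dict) -> str:
    # Same rule as HealthChecker.check(): any failing component
    # degrades the whole service
    all_healthy = all(v == "healthy" for v in components.values())
    return "healthy" if all_healthy else "degraded"

print(overall_status({"database": "healthy", "redis": "healthy"}))            # healthy
print(overall_status({"database": "healthy", "redis": "unhealthy: timeout"})) # degraded
```

Because the readiness endpoint returns 503 for anything other than `healthy`, a pod with one failing dependency is pulled from rotation rather than served traffic.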


Database Migrations with Alembic

python
# alembic/env.py
from alembic import context
from sqlalchemy import engine_from_config, pool

from app.config import DATABASE_URL

config = context.config
# Alembic runs migrations synchronously; strip the async driver suffix
# so engine_from_config can build a sync engine from the app's URL.
config.set_main_option("sqlalchemy.url", DATABASE_URL.replace("+asyncpg", ""))

def run_migrations_online():
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=None,
        )
        with context.begin_transaction():
            context.run_migrations()

run_migrations_online()

Safe migration example:

python
# alembic/versions/001_add_status_column.py
"""Add order status column - zero downtime safe

This migration adds a nullable column. The application code
handles both cases (column present and column missing).
"""
from alembic import op
import sqlalchemy as sa

def upgrade():
    op.add_column(
        "orders",
        sa.Column("fulfillment_status", sa.String(50), nullable=True),
    )
    # CREATE INDEX CONCURRENTLY avoids table locks, but PostgreSQL
    # refuses to run it inside a transaction, so step outside
    # Alembic's migration transaction first.
    with op.get_context().autocommit_block():
        op.execute(
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS "
            "idx_orders_fulfillment_status ON orders (fulfillment_status)"
        )

def downgrade():
    op.drop_index("idx_orders_fulfillment_status", table_name="orders")
    op.drop_column("orders", "fulfillment_status")
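The migration's docstring promises that application code handles both schema states. During a rolling deploy, old and new pods briefly share the database, so reads must treat the new column as optional. A hedged sketch of that read path (`order_status` and the default value are illustrative, not from the codebase):

```python
def order_status(row: dict) -> str:
    """Expand/contract read path: tolerate rows from either schema.

    Old-schema rows lack the key entirely; new-schema rows may hold
    NULL until a backfill runs. Both fall back to a default instead
    of raising KeyError.
    """
    status = row.get("fulfillment_status")
    return status if status is not None else "pending"

print(order_status({"id": 1}))                                   # pending (old schema)
print(order_status({"id": 2, "fulfillment_status": None}))       # pending (not backfilled)
print(order_status({"id": 3, "fulfillment_status": "shipped"}))  # shipped
```

The contract step, dropping the fallback and making the column NOT NULL, ships only after every pod runs the new code and the backfill has completed.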

Feature Flag Service

python
# app/features.py
import hashlib
import json

from app.config import redis

class FeatureFlags:
    def __init__(self):
        self._cache: dict = {}

    async def is_enabled(self, key: str, tenant_id: str) -> bool:
        flag = self._cache.get(key)
        if not flag:
            raw = await redis.get(f"flag:{key}")
            if not raw:
                return False
            flag = json.loads(raw)
            self._cache[key] = flag

        if not flag.get("enabled"):
            return False

        allowed = flag.get("allowed_tenants", [])
        if tenant_id in allowed:
            return True

        # Deterministic bucketing: a tenant always hashes to the same
        # bucket, so rollout decisions are stable across requests.
        h = hashlib.md5(f"{key}:{tenant_id}".encode()).hexdigest()
        return int(h[:8], 16) % 100 < flag.get("rollout_percent", 0)

    async def refresh(self):
        keys = await redis.keys("flag:*")
        for k in keys:
            raw = await redis.get(k)
            if raw:
                data = json.loads(raw)
                self._cache[data["key"]] = data

features = FeatureFlags()
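The percentage rollout in `is_enabled` is deterministic: the MD5 of `key:tenant_id` maps each tenant to a fixed bucket in 0-99, so a tenant never flips between enabled and disabled across requests. A standalone sketch of just that bucketing (the flag and tenant names are illustrative):

```python
import hashlib

def rollout_bucket(key: str, tenant_id: str) -> int:
    """Same bucketing as FeatureFlags.is_enabled: a stable bucket in [0, 100)."""
    h = hashlib.md5(f"{key}:{tenant_id}".encode()).hexdigest()
    return int(h[:8], 16) % 100

def in_rollout(key: str, tenant_id: str, percent: int) -> bool:
    # A flag at N percent enables every tenant whose bucket is below N
    return rollout_bucket(key, tenant_id) < percent

# The bucket is a pure function of (flag, tenant): repeated calls agree
b = rollout_bucket("new-checkout", "tenant-42")
assert b == rollout_bucket("new-checkout", "tenant-42")

print(in_rollout("new-checkout", "tenant-42", 0))    # False: 0% enables no one
print(in_rollout("new-checkout", "tenant-42", 100))  # True: 100% enables everyone
```

Raising `rollout_percent` only ever adds tenants to the enabled set; nobody who was enabled at 10% drops out at 50%, which keeps partial rollouts consistent during a deploy.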

Gunicorn Configuration

python
# gunicorn.conf.py
import multiprocessing

bind = "0.0.0.0:8080"
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
timeout = 120
graceful_timeout = 30       # on SIGTERM, workers get 30s to finish in-flight requests
keepalive = 5
max_requests = 1000         # recycle workers periodically to contain slow leaks...
max_requests_jitter = 50    # ...staggered, so workers never all restart at once
preload_app = True
accesslog = "-"
errorlog = "-"
loglevel = "info"

Docker and Kubernetes Deployment

dockerfile
FROM python:3.12-slim AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .

FROM python:3.12-slim
WORKDIR /app
COPY --from=builder /app /app
COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin

USER nobody
CMD ["gunicorn", "app.main:app", "-c", "gunicorn.conf.py"]
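The project tree lists a `docker-compose.yml` that never appears above. A hedged local-development sketch; the service names, credentials, and database name are illustrative, not from the article:

```yaml
# docker-compose.yml (sketch for local development)
services:
  api:
    build: .
    ports:
      - "8080:8080"
    environment:
      DATABASE_URL: postgresql+asyncpg://app:app@db:5432/app
      REDIS_URL: redis://redis:6379/0
    depends_on:
      - db
      - redis
  db:
    image: postgres:16
    environment:
      POSTGRES_USER: app
      POSTGRES_PASSWORD: app
      POSTGRES_DB: app
  redis:
    image: redis:7
```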
yaml
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fastapi-service
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  selector:
    matchLabels:
      app: fastapi-service
  template:
    metadata:
      labels:
        app: fastapi-service
    spec:
      terminationGracePeriodSeconds: 60
      containers:
        - name: api
          image: fastapi-service:latest
          ports:
            - containerPort: 8080
          readinessProbe:
            httpGet:
              path: /health/ready
              port: 8080
            initialDelaySeconds: 10
            periodSeconds: 5
            failureThreshold: 3
          livenessProbe:
            httpGet:
              path: /health/live
              port: 8080
            initialDelaySeconds: 15
            periodSeconds: 10
          lifecycle:
            preStop:
              exec:
                command: ["/bin/sh", "-c", "sleep 15"]
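The tree also references `k8s/service.yaml`, which the article doesn't show. A hedged sketch; the selector must match whatever pod labels the Deployment applies:

```yaml
# k8s/service.yaml (sketch)
apiVersion: v1
kind: Service
metadata:
  name: fastapi-service
spec:
  selector:
    app: fastapi-service   # must match the Deployment's pod labels
  ports:
    - port: 80
      targetPort: 8080
```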

CI/CD Pipeline

yaml
# .github/workflows/deploy.yml
name: Deploy
on:
  push:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - run: pip install -r requirements.txt
      - run: pytest tests/ -v

  migrate:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: pip install alembic sqlalchemy asyncpg
      - run: alembic upgrade head
        env:
          DATABASE_URL: ${{ secrets.DATABASE_URL }}

  deploy:
    needs: migrate
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Build and push image
        run: |
          docker build -t $ECR_REPO:${{ github.sha }} .
          docker push $ECR_REPO:${{ github.sha }}
      - name: Deploy to Kubernetes
        run: |
          kubectl set image deployment/fastapi-service \
            api=$ECR_REPO:${{ github.sha }}
          kubectl rollout status deployment/fastapi-service --timeout=300s
      - name: Verify health
        run: |
          for i in {1..30}; do
            if curl -sf https://api.example.com/health/ready; then
              echo "Deployment healthy"
              exit 0
            fi
            sleep 5
          done
          echo "Health check failed"
          kubectl rollout undo deployment/fastapi-service
          exit 1
