How to Build Subscription Billing Systems Using FastAPI

Step-by-step tutorial for building subscription billing systems with FastAPI, from project setup through deployment.

Muneer Puthiya Purayil 21 min read

This tutorial builds a subscription billing system with FastAPI and Stripe, from project setup through database models, a billing service, API endpoints, webhook processing, and usage metering. FastAPI's async support and Pydantic models create a clean, type-safe billing backend.

Project Setup

bash
mkdir billing-api && cd billing-api
python -m venv venv && source venv/bin/activate
pip install fastapi uvicorn sqlalchemy asyncpg alembic stripe pydantic-settings python-dotenv

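python
# app/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # Values are read from the environment or from .env. The three fields
    # without defaults are required: DATABASE_URL, STRIPE_SECRET_KEY,
    # STRIPE_WEBHOOK_SECRET.
    model_config = SettingsConfigDict(env_file=".env")

    # Must use the asyncpg driver, e.g. postgresql+asyncpg://user:pass@host/db
    database_url: str
    stripe_secret_key: str
    stripe_webhook_secret: str
    app_url: str = "http://localhost:8000"


settings = Settings()
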

Step 1: Database Models

python
# app/models.py
import enum

from sqlalchemy import (
    Column, String, Integer, BigInteger, Boolean, DateTime, JSON,
    Enum as SAEnum, ForeignKey,
)
from sqlalchemy.orm import declarative_base, relationship
from sqlalchemy.sql import func

Base = declarative_base()


class SubscriptionStatus(str, enum.Enum):
    active = "active"
    trialing = "trialing"
    past_due = "past_due"
    cancelled = "cancelled"


class Plan(Base):
    __tablename__ = "plans"
    id = Column(String, primary_key=True)
    name = Column(String, nullable=False)
    slug = Column(String, unique=True, nullable=False)
    active = Column(Boolean, default=True)
    billing_interval = Column(String, nullable=False)
    base_price_cents = Column(BigInteger, nullable=False)
    price_per_seat_cents = Column(BigInteger, default=0)
    included_seats = Column(Integer, default=1)
    features = Column(JSON, default=list)
    limits = Column(JSON, default=dict)
    stripe_price_id = Column(String, nullable=False)
    created_at = Column(DateTime, server_default=func.now())


class Subscription(Base):
    __tablename__ = "subscriptions"
    id = Column(String, primary_key=True)
    customer_id = Column(String, nullable=False, index=True)
    plan_id = Column(String, ForeignKey("plans.id"), nullable=False)
    stripe_subscription_id = Column(String, unique=True, nullable=False)
    stripe_customer_id = Column(String, nullable=False)
    status = Column(SAEnum(SubscriptionStatus), default=SubscriptionStatus.active)
    quantity = Column(Integer, default=1)
    current_period_start = Column(DateTime, nullable=False)
    current_period_end = Column(DateTime, nullable=False)
    cancel_at_period_end = Column(Boolean, default=False)
    trial_ends_at = Column(DateTime, nullable=True)
    cancelled_at = Column(DateTime, nullable=True)
    created_at = Column(DateTime, server_default=func.now())
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())
    plan = relationship("Plan")


class UsageEvent(Base):
    __tablename__ = "usage_events"
    id = Column(String, primary_key=True)
    idempotency_key = Column(String, unique=True, nullable=False)
    subscription_id = Column(String, ForeignKey("subscriptions.id"), nullable=False)
    metric_id = Column(String, nullable=False)
    quantity = Column(BigInteger, nullable=False)
    timestamp = Column(DateTime, nullable=False)
    # "metadata" is reserved on declarative classes, so the Python attribute
    # gets a different name while the database column is still "metadata".
    event_metadata = Column("metadata", JSON, default=dict)
    created_at = Column(DateTime, server_default=func.now())


class WebhookEvent(Base):
    __tablename__ = "webhook_events"
    stripe_event_id = Column(String, primary_key=True)
    event_type = Column(String, nullable=False)
    processed_at = Column(DateTime, server_default=func.now())

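The Plan columns base_price_cents, price_per_seat_cents, and included_seats suggest a seat-based price: a flat base plus a charge for each seat beyond the included ones. The sketch below assumes that reading. The `PlanPricing` dataclass and `period_total_cents` function are hypothetical names used only for illustration, and the amount Stripe actually invoices is determined by the Stripe price behind stripe_price_id, not by this formula.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PlanPricing:
    """Hypothetical stand-in for the pricing columns on the Plan model."""
    base_price_cents: int
    price_per_seat_cents: int = 0
    included_seats: int = 1


def period_total_cents(plan: PlanPricing, seats: int) -> int:
    """Assumed formula: base price plus a per-seat charge for extra seats."""
    if seats < 1:
        raise ValueError("seats must be at least 1")
    extra_seats = max(0, seats - plan.included_seats)
    return plan.base_price_cents + extra_seats * plan.price_per_seat_cents


team_plan = PlanPricing(base_price_cents=4900, price_per_seat_cents=900, included_seats=5)
print(period_total_cents(team_plan, seats=8))  # 3 extra seats on top of the base price
```

Keeping amounts as integer cents, as the models do, avoids floating-point rounding in price arithmetic.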

Step 2: Database Setup

python
# app/database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from app.config import settings

engine = create_async_engine(settings.database_url, echo=False)
async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)


async def get_db():
    # One session per request, closed when the request finishes
    async with async_session() as session:
        yield session


Step 3: Billing Service

python
# app/services/billing.py
import asyncio
from datetime import datetime, timezone
from uuid import uuid4

import stripe
from sqlalchemy import select, func as sa_func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from app.models import Plan, Subscription, UsageEvent, SubscriptionStatus
from app.config import settings

stripe.api_key = settings.stripe_secret_key


def _utcnow() -> datetime:
    # Naive UTC, matching the timezone-unaware DateTime columns in app/models.py
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _from_unix(ts: int) -> datetime:
    # Stripe sends Unix timestamps; convert in UTC, not server-local time
    return datetime.fromtimestamp(ts, tz=timezone.utc).replace(tzinfo=None)


class BillingService:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def _get_subscription(self, subscription_id: str) -> Subscription:
        sub = await self.db.get(Subscription, subscription_id)
        if sub is None:
            raise LookupError(f"Subscription {subscription_id} not found")
        return sub

    async def create_subscription(
        self, customer_id: str, plan_slug: str,
        quantity: int, stripe_customer_id: str,
    ) -> Subscription:
        result = await self.db.execute(
            select(Plan).where(Plan.slug == plan_slug, Plan.active.is_(True))
        )
        plan = result.scalar_one()

        # The Stripe client used here is synchronous, so run it in a worker
        # thread to avoid blocking the event loop.
        stripe_sub = await asyncio.to_thread(
            stripe.Subscription.create,
            customer=stripe_customer_id,
            items=[{"price": plan.stripe_price_id, "quantity": quantity}],
        )

        # Depending on your Stripe API version, the period fields may be
        # exposed on the subscription items instead of the subscription.
        sub = Subscription(
            id=str(uuid4()),
            customer_id=customer_id,
            plan_id=plan.id,
            stripe_subscription_id=stripe_sub.id,
            stripe_customer_id=stripe_customer_id,
            quantity=quantity,
            current_period_start=_from_unix(stripe_sub.current_period_start),
            current_period_end=_from_unix(stripe_sub.current_period_end),
        )
        self.db.add(sub)
        await self.db.commit()
        await self.db.refresh(sub)
        return sub

    async def change_plan(self, subscription_id: str, new_plan_slug: str):
        sub = await self._get_subscription(subscription_id)
        result = await self.db.execute(
            select(Plan).where(Plan.slug == new_plan_slug, Plan.active.is_(True))
        )
        new_plan = result.scalar_one()

        items = await asyncio.to_thread(
            stripe.SubscriptionItem.list, subscription=sub.stripe_subscription_id
        )

        # Swap the price on the existing item and let Stripe prorate
        await asyncio.to_thread(
            stripe.Subscription.modify,
            sub.stripe_subscription_id,
            items=[{
                "id": items.data[0].id,
                "price": new_plan.stripe_price_id,
            }],
            proration_behavior="create_prorations",
        )

        sub.plan_id = new_plan.id
        await self.db.commit()
        return sub

    async def update_seats(self, subscription_id: str, new_quantity: int):
        sub = await self._get_subscription(subscription_id)

        items = await asyncio.to_thread(
            stripe.SubscriptionItem.list, subscription=sub.stripe_subscription_id
        )

        await asyncio.to_thread(
            stripe.Subscription.modify,
            sub.stripe_subscription_id,
            items=[{
                "id": items.data[0].id,
                "quantity": new_quantity,
            }],
            proration_behavior="create_prorations",
        )

        sub.quantity = new_quantity
        await self.db.commit()
        return sub

    async def cancel_subscription(self, subscription_id: str, immediately: bool = False):
        sub = await self._get_subscription(subscription_id)

        if immediately:
            await asyncio.to_thread(stripe.Subscription.cancel, sub.stripe_subscription_id)
            sub.status = SubscriptionStatus.cancelled
            sub.cancelled_at = _utcnow()
        else:
            # Keep access until the paid period ends
            await asyncio.to_thread(
                stripe.Subscription.modify,
                sub.stripe_subscription_id,
                cancel_at_period_end=True,
            )
            sub.cancel_at_period_end = True

        await self.db.commit()
        return sub

    async def record_usage(
        self, subscription_id: str, metric_id: str,
        quantity: int, idempotency_key: str,
    ):
        # Skip events that were already recorded under this key
        existing = await self.db.execute(
            select(UsageEvent).where(UsageEvent.idempotency_key == idempotency_key)
        )
        if existing.scalar_one_or_none():
            return

        event = UsageEvent(
            id=str(uuid4()),
            idempotency_key=idempotency_key,
            subscription_id=subscription_id,
            metric_id=metric_id,
            quantity=quantity,
            timestamp=_utcnow(),
        )
        self.db.add(event)
        await self.db.commit()

    async def get_usage(
        self, subscription_id: str, metric_id: str,
        period_start: datetime, period_end: datetime,
    ) -> int:
        # Sum usage in the half-open interval [period_start, period_end)
        result = await self.db.execute(
            select(sa_func.coalesce(sa_func.sum(UsageEvent.quantity), 0))
            .where(
                UsageEvent.subscription_id == subscription_id,
                UsageEvent.metric_id == metric_id,
                UsageEvent.timestamp >= period_start,
                UsageEvent.timestamp < period_end,
            )
        )
        return int(result.scalar())

    async def check_feature(self, customer_id: str, feature: str) -> bool:
        # Load the plan eagerly: lazy loading sub.plan fails on an async session.
        # Trialing subscriptions count as entitled, matching GET /subscription.
        result = await self.db.execute(
            select(Subscription)
            .options(selectinload(Subscription.plan))
            .where(
                Subscription.customer_id == customer_id,
                Subscription.status.in_(
                    [SubscriptionStatus.active, SubscriptionStatus.trialing]
                ),
            )
        )
        sub = result.scalar_one_or_none()
        if not sub:
            return False
        return feature in (sub.plan.features or [])

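The record_usage method checks for an existing idempotency key and then inserts, which leaves a small window where two concurrent requests with the same key can both pass the check. One alternative is to let the unique constraint on idempotency_key decide. The sketch below illustrates that idea with the standard-library sqlite3 module instead of the tutorial's async SQLAlchemy and PostgreSQL stack, so the table definition, function names, and integer timestamps are simplifications made for this example.

```python
import sqlite3


def create_schema(conn: sqlite3.Connection) -> None:
    # Simplified version of the usage_events table from Step 1
    conn.execute(
        """
        CREATE TABLE usage_events (
            id INTEGER PRIMARY KEY,
            idempotency_key TEXT NOT NULL UNIQUE,
            subscription_id TEXT NOT NULL,
            metric_id TEXT NOT NULL,
            quantity INTEGER NOT NULL,
            timestamp INTEGER NOT NULL
        )
        """
    )


def record_usage(conn, subscription_id, metric_id, quantity, idempotency_key, timestamp) -> bool:
    """Insert one usage event; return False if the key was already recorded."""
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            "INSERT OR IGNORE INTO usage_events "
            "(idempotency_key, subscription_id, metric_id, quantity, timestamp) "
            "VALUES (?, ?, ?, ?, ?)",
            (idempotency_key, subscription_id, metric_id, quantity, timestamp),
        )
    # rowcount is 0 when the unique constraint caused the insert to be ignored
    return cur.rowcount == 1


def get_usage(conn, subscription_id, metric_id, period_start, period_end) -> int:
    """Sum usage in the half-open interval [period_start, period_end)."""
    row = conn.execute(
        "SELECT COALESCE(SUM(quantity), 0) FROM usage_events "
        "WHERE subscription_id = ? AND metric_id = ? "
        "AND timestamp >= ? AND timestamp < ?",
        (subscription_id, metric_id, period_start, period_end),
    ).fetchone()
    return int(row[0])


conn = sqlite3.connect(":memory:")
create_schema(conn)
record_usage(conn, "sub_1", "api_calls", 5, "req-1", 100)
record_usage(conn, "sub_1", "api_calls", 5, "req-1", 100)  # retry, ignored
print(get_usage(conn, "sub_1", "api_calls", 0, 1000))
```

On PostgreSQL the equivalent statement would be an INSERT with an ON CONFLICT DO NOTHING clause on the idempotency key.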

Step 4: API Endpoints

python
# app/routes/billing.py
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

from app.database import get_db
from app.services.billing import BillingService
from app.models import Subscription, SubscriptionStatus
from app.auth import get_current_user  # your own auth dependency; returns the user ID

router = APIRouter(prefix="/api/billing", tags=["billing"])


class CreateSubscriptionReq(BaseModel):
    plan_slug: str
    quantity: int = Field(1, ge=1)


class ChangePlanReq(BaseModel):
    plan_slug: str


class UpdateSeatsReq(BaseModel):
    quantity: int = Field(ge=1)


async def get_active_sub(user_id: str, db: AsyncSession) -> Subscription:
    # Shared lookup: the caller's active or trialing subscription, or a 404
    result = await db.execute(
        select(Subscription)
        .where(
            Subscription.customer_id == user_id,
            Subscription.status.in_([SubscriptionStatus.active, SubscriptionStatus.trialing]),
        )
    )
    sub = result.scalar_one_or_none()
    if not sub:
        raise HTTPException(404, "No active subscription")
    return sub


@router.get("/subscription")
async def get_subscription(
    user_id: str = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    sub = await get_active_sub(user_id, db)
    return {
        "id": sub.id,
        "plan_id": sub.plan_id,
        "status": sub.status.value,
        "seats": sub.quantity,
        "current_period_end": sub.current_period_end.isoformat(),
        "cancel_at_period_end": sub.cancel_at_period_end,
    }


@router.post("/subscription", status_code=201)
async def create_subscription(
    req: CreateSubscriptionReq,
    user_id: str = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    service = BillingService(db)
    # get_or_create_stripe_customer is not defined in this tutorial: supply a
    # helper that maps your user ID to a Stripe customer ID.
    stripe_customer_id = await get_or_create_stripe_customer(user_id, db)
    sub = await service.create_subscription(
        user_id, req.plan_slug, req.quantity, stripe_customer_id,
    )
    return {"subscription_id": sub.id, "status": sub.status.value}


@router.put("/subscription/plan")
async def change_plan(
    req: ChangePlanReq,
    user_id: str = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    service = BillingService(db)
    sub = await get_active_sub(user_id, db)
    updated = await service.change_plan(sub.id, req.plan_slug)
    return {"plan_id": updated.plan_id}


@router.put("/subscription/seats")
async def update_seats(
    req: UpdateSeatsReq,
    user_id: str = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    service = BillingService(db)
    sub = await get_active_sub(user_id, db)
    updated = await service.update_seats(sub.id, req.quantity)
    return {"seats": updated.quantity}


@router.post("/subscription/cancel")
async def cancel(
    user_id: str = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
    immediately: bool = False,  # query parameter: ?immediately=true
):
    service = BillingService(db)
    sub = await get_active_sub(user_id, db)
    updated = await service.cancel_subscription(sub.id, immediately)
    return {"status": updated.status.value}

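The plan and seat endpoints rely on proration_behavior="create_prorations", which asks Stripe to credit the unused part of the old price and charge the remaining part of the new one. The sketch below is a simplified approximation of that arithmetic, useful for previewing a change in a UI. The function name and the time-based fraction are assumptions made for illustration, and the invoice Stripe produces is the source of truth.

```python
def estimate_proration_cents(
    old_price_cents: int,
    new_price_cents: int,
    period_start: int,
    period_end: int,
    change_at: int,
) -> int:
    """Approximate net charge (positive) or credit (negative) for a mid-period change.

    All times are Unix timestamps in seconds.
    """
    total = period_end - period_start
    remaining = period_end - change_at
    if total <= 0 or not 0 <= remaining <= total:
        raise ValueError("change_at must fall inside the billing period")

    fraction_left = remaining / total
    credit_for_old = round(old_price_cents * fraction_left)
    charge_for_new = round(new_price_cents * fraction_left)
    return charge_for_new - credit_for_old


DAY = 86_400
# Upgrade from a 1000-cent plan to a 3000-cent plan halfway through a 30-day period
print(estimate_proration_cents(1000, 3000, 0, 30 * DAY, 15 * DAY))
```

For a seat change, the same estimate applies with each price multiplied by the old and new quantities.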

Step 5: Webhook Endpoint

python
# app/routes/webhooks.py
from datetime import datetime, timezone

from fastapi import APIRouter, Request, HTTPException
from sqlalchemy import select
import stripe

from app.database import async_session
from app.models import Subscription, WebhookEvent, SubscriptionStatus
from app.config import settings

router = APIRouter()


def _utcnow() -> datetime:
    # Naive UTC, matching the timezone-unaware DateTime columns
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _from_unix(ts: int) -> datetime:
    return datetime.fromtimestamp(ts, tz=timezone.utc).replace(tzinfo=None)


@router.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
    # Signature verification needs the raw body, not parsed JSON
    payload = await request.body()
    sig = request.headers.get("Stripe-Signature")
    if sig is None:
        raise HTTPException(400, "Missing Stripe-Signature header")

    try:
        event = stripe.Webhook.construct_event(
            payload, sig, settings.stripe_webhook_secret
        )
    except ValueError:
        raise HTTPException(400, "Invalid payload")
    except stripe.error.SignatureVerificationError:
        raise HTTPException(401, "Invalid signature")

    async with async_session() as db:
        # Idempotency: skip events that were already processed
        existing = await db.execute(
            select(WebhookEvent).where(WebhookEvent.stripe_event_id == event.id)
        )
        if existing.scalar_one_or_none():
            return {"status": "already_processed"}

        handlers = {
            "customer.subscription.updated": _handle_sub_updated,
            "customer.subscription.deleted": _handle_sub_deleted,
            "invoice.payment_failed": _handle_payment_failed,
            "invoice.paid": _handle_invoice_paid,
        }

        handler = handlers.get(event.type)
        if handler:
            await handler(db, event.data.object)

        # The event ID is committed together with the handler's changes
        db.add(WebhookEvent(stripe_event_id=event.id, event_type=event.type))
        await db.commit()

    return {"status": "processed"}


async def _handle_sub_updated(db, data):
    result = await db.execute(
        select(Subscription).where(Subscription.stripe_subscription_id == data["id"])
    )
    sub = result.scalar_one_or_none()
    if not sub:
        return

    status_map = {
        "active": SubscriptionStatus.active,
        "trialing": SubscriptionStatus.trialing,
        "past_due": SubscriptionStatus.past_due,
        "canceled": SubscriptionStatus.cancelled,  # Stripe uses the US spelling
    }
    # Leave the stored status alone for Stripe statuses this model does not
    # track, rather than treating them as active.
    new_status = status_map.get(data["status"])
    if new_status is not None:
        sub.status = new_status
    sub.quantity = data["items"]["data"][0]["quantity"]
    sub.current_period_start = _from_unix(data["current_period_start"])
    sub.current_period_end = _from_unix(data["current_period_end"])
    sub.cancel_at_period_end = data.get("cancel_at_period_end", False)


async def _handle_sub_deleted(db, data):
    result = await db.execute(
        select(Subscription).where(Subscription.stripe_subscription_id == data["id"])
    )
    sub = result.scalar_one_or_none()
    if sub:
        sub.status = SubscriptionStatus.cancelled
        sub.cancelled_at = _utcnow()


async def _handle_payment_failed(db, data):
    sub_id = data.get("subscription")
    if not sub_id:
        return
    result = await db.execute(
        select(Subscription).where(Subscription.stripe_subscription_id == sub_id)
    )
    sub = result.scalar_one_or_none()
    if sub:
        sub.status = SubscriptionStatus.past_due


async def _handle_invoice_paid(db, data):
    sub_id = data.get("subscription")
    if not sub_id:
        return
    result = await db.execute(
        select(Subscription).where(Subscription.stripe_subscription_id == sub_id)
    )
    sub = result.scalar_one_or_none()
    # A paid invoice only restores subscriptions that were past due
    if sub and sub.status == SubscriptionStatus.past_due:
        sub.status = SubscriptionStatus.active

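To make the signature check less of a black box, here is a standard-library sketch of the scheme Stripe documents for the Stripe-Signature header: the header carries a timestamp (t) and one or more v1 signatures, and each signature is an HMAC-SHA256 of "timestamp.payload" keyed with the endpoint secret. The function names and the 300-second tolerance default are choices made for this illustration. In the application itself, keep using stripe.Webhook.construct_event.

```python
import hashlib
import hmac
import time
from typing import Optional


def sign_payload(payload: bytes, secret: str, timestamp: int) -> str:
    """Compute the v1 signature for a payload: HMAC-SHA256 of 'timestamp.payload'."""
    signed_payload = f"{timestamp}.".encode() + payload
    return hmac.new(secret.encode(), signed_payload, hashlib.sha256).hexdigest()


def verify_signature(
    payload: bytes,
    header: str,
    secret: str,
    tolerance: int = 300,
    now: Optional[float] = None,
) -> bool:
    """Return True if the header holds a fresh, matching v1 signature."""
    timestamp = None
    candidates = []
    for part in header.split(","):
        key, _, value = part.strip().partition("=")
        if key == "t":
            timestamp = value
        elif key == "v1":
            candidates.append(value)

    if timestamp is None or not timestamp.isdigit() or not candidates:
        return False

    # Reject old timestamps to limit replay of captured requests
    current = time.time() if now is None else now
    if current - int(timestamp) > tolerance:
        return False

    expected = sign_payload(payload, secret, int(timestamp)).encode()
    # compare_digest runs in constant time to avoid leaking match position
    return any(hmac.compare_digest(expected, c.encode()) for c in candidates)


body = b'{"id": "evt_example"}'
ts = int(time.time())
header = f"t={ts},v1={sign_payload(body, 'whsec_example', ts)}"
print(verify_signature(body, header, "whsec_example"))
```

The signature covers the exact bytes Stripe sent, which is why the endpoint reads request.body() rather than a parsed JSON model.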

Step 6: Feature Gating Dependency

python
# app/dependencies/feature_gate.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

from app.services.billing import BillingService
from app.database import get_db
from app.auth import get_current_user


def require_feature(feature: str):
    # Builds a dependency that rejects users whose plan lacks the feature
    async def check(
        user_id: str = Depends(get_current_user),
        db: AsyncSession = Depends(get_db),
    ):
        service = BillingService(db)
        if not await service.check_feature(user_id, feature):
            raise HTTPException(
                403,
                detail={
                    "error": f"Feature '{feature}' not available on your plan",
                    "upgrade_url": "/billing/upgrade",
                },
            )
    return Depends(check)


# Usage: in practice, attach this to a router in one of your route modules
router = APIRouter()


@router.post("/api/projects", dependencies=[require_feature("projects")])
async def create_project():
    ...


Step 7: Main Application

python
# app/main.py
from fastapi import FastAPI
from app.routes import billing, webhooks

app = FastAPI(title="Billing API")

app.include_router(billing.router)
app.include_router(webhooks.router)


@app.get("/health")
async def health():
    return {"status": "ok"}


Run with:

bash
uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload

Conclusion

FastAPI's async support makes it efficient for billing workloads where webhook processing and usage metering involve I/O-bound operations. Pydantic models validate every request, SQLAlchemy's async session handles database operations without blocking, and the dependency injection system makes feature gating reusable across endpoints.

The idempotent webhook handler is the most critical component. It checks the WebhookEvent table before processing and records the event ID in the same transaction as the handler's changes, so duplicate deliveries and Stripe retries are skipped, and a failure partway through rolls back cleanly and can be retried. Combined with Stripe's signature verification, this creates a reliable bridge between Stripe's event system and your application state.

For production, configure Alembic for database migrations (the package is installed in Project Setup but not set up here), add pytest-asyncio for async test coverage, and use a background task runner (Celery or ARQ) for operations that should not block the webhook response, such as dunning emails, usage aggregation reports, and invoice PDF generation.

Muneer Puthiya Purayil

SaaS Architect & AI Systems Engineer. 10+ years shipping production infrastructure across fintech, automotive, e-commerce, and healthcare.
