
Complete Guide to Multi-Tenant Architecture with Python

A comprehensive guide to implementing Multi-Tenant Architecture using Python, covering architecture, code examples, and production-ready patterns.

Muneer Puthiya Purayil 19 min read

Multi-tenant architecture is the foundation of every modern SaaS platform. Whether you're building a project management tool or an enterprise analytics suite, the decision of how to isolate tenant data shapes your entire system's cost structure, security posture, and operational complexity. This guide walks through implementing multi-tenant architecture in Python, from foundational patterns to production-hardened code.

Understanding Multi-Tenancy Models

There are three primary tenancy models, each with distinct trade-offs:

Database-per-tenant gives each tenant a completely separate database. This provides the strongest isolation but increases operational overhead linearly with tenant count.

Schema-per-tenant uses a shared database but separate schemas per tenant. This balances isolation with operational simplicity, particularly on PostgreSQL.

Shared-schema (row-level) stores all tenants in the same tables, discriminated by a tenant_id column. This is the most operationally efficient model but requires rigorous discipline to prevent data leakage.

In practice, most Python SaaS applications at scale use shared-schema with row-level security, supplemented by dedicated databases for enterprise tenants who require it.
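To make the trade-off concrete, it helps to record each tenant's placement explicitly. This is only an illustrative sketch — the `TenancyModel` enum and `choose_model` policy below are hypothetical names, not from any specific library:

```python
from enum import Enum

class TenancyModel(Enum):
    DATABASE_PER_TENANT = "database_per_tenant"
    SCHEMA_PER_TENANT = "schema_per_tenant"
    SHARED_SCHEMA = "shared_schema"

def choose_model(requires_dedicated_db: bool, plan: str) -> TenancyModel:
    # Illustrative placement policy: contractual isolation wins, then plan tier.
    if requires_dedicated_db:
        return TenancyModel.DATABASE_PER_TENANT
    if plan == "enterprise":
        return TenancyModel.SCHEMA_PER_TENANT
    return TenancyModel.SHARED_SCHEMA
```

Storing the chosen model on the tenant record makes later graduation to a stronger tier an explicit, auditable state change rather than ad-hoc configuration.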

Setting Up the Tenant Context

The first building block is a reliable way to propagate tenant identity through every layer of your application. Python's contextvars module (available since 3.7) is purpose-built for this:

```python
import contextvars
from dataclasses import dataclass
from typing import Optional

tenant_context: contextvars.ContextVar[Optional['TenantInfo']] = contextvars.ContextVar(
    'tenant_context', default=None
)

@dataclass(frozen=True)
class TenantInfo:
    tenant_id: str
    plan: str
    db_schema: str
    is_isolated: bool = False

def get_current_tenant() -> TenantInfo:
    tenant = tenant_context.get()
    if tenant is None:
        raise RuntimeError("No tenant context set. Did you forget the middleware?")
    return tenant

def set_current_tenant(tenant: TenantInfo) -> contextvars.Token:
    return tenant_context.set(tenant)
```

Using contextvars over thread-local storage is critical when running under async frameworks like FastAPI or Starlette, since a single thread may handle multiple concurrent requests.

FastAPI Middleware for Tenant Resolution

With the context module in place, wire it into your HTTP layer:

```python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware

app = FastAPI()

class TenantMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, tenant_repo):
        super().__init__(app)
        self.tenant_repo = tenant_repo

    async def dispatch(self, request: Request, call_next):
        tenant_header = request.headers.get("X-Tenant-ID")
        tenant_subdomain = self._extract_subdomain(request.url.hostname)

        tenant_identifier = tenant_header or tenant_subdomain
        if not tenant_identifier:
            # HTTPException raised inside BaseHTTPMiddleware bypasses FastAPI's
            # exception handlers, so return error responses directly.
            return JSONResponse({"detail": "Tenant identification required"}, status_code=400)

        tenant = await self.tenant_repo.get_tenant(tenant_identifier)
        if not tenant:
            return JSONResponse({"detail": "Tenant not found"}, status_code=404)

        if not tenant.is_active:
            return JSONResponse({"detail": "Tenant account suspended"}, status_code=403)

        token = set_current_tenant(TenantInfo(
            tenant_id=tenant.id,
            plan=tenant.plan,
            db_schema=tenant.schema_name,
            is_isolated=tenant.has_dedicated_db,
        ))

        try:
            return await call_next(request)
        finally:
            tenant_context.reset(token)

    @staticmethod
    def _extract_subdomain(hostname: str | None) -> str | None:
        if not hostname:
            return None
        parts = hostname.split(".")
        if len(parts) >= 3:
            return parts[0]
        return None
```

This middleware resolves tenants by header or subdomain, then sets the context for the duration of the request. The finally block ensures context cleanup even if the handler raises an exception.
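The `tenant_repo` interface the middleware depends on is left open above. A minimal in-memory stand-in — a hypothetical shape for local development and tests, with field names matching what the middleware reads — could look like:

```python
import asyncio
from dataclasses import dataclass

@dataclass
class TenantRecord:
    # The attributes dispatch() reads: id, plan, schema_name,
    # has_dedicated_db, is_active.
    id: str
    plan: str
    schema_name: str
    has_dedicated_db: bool
    is_active: bool

class InMemoryTenantRepo:
    def __init__(self, tenants: dict):
        self._tenants = tenants

    async def get_tenant(self, identifier: str):
        # Returns None for unknown tenants, which the middleware maps to 404.
        return self._tenants.get(identifier)

repo = InMemoryTenantRepo({
    "acme": TenantRecord("acme", "pro", "tenant_acme", False, True),
})
# Registration would then be:
# app.add_middleware(TenantMiddleware, tenant_repo=repo)
print(asyncio.run(repo.get_tenant("acme")).plan)  # pro
```

In production the repo would be backed by your tenants table, ideally with a short-lived cache in front since it sits on every request.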

Row-Level Security with SQLAlchemy

For the shared-schema approach, every query must be scoped to the current tenant. SQLAlchemy's event system lets you enforce this at the ORM level:

```python
from sqlalchemy import event, Column, String
from sqlalchemy.orm import DeclarativeBase, Session, with_loader_criteria

class Base(DeclarativeBase):
    pass

class TenantMixin:
    tenant_id = Column(String(64), nullable=False, index=True)

class Project(TenantMixin, Base):
    __tablename__ = "projects"
    id = Column(String(36), primary_key=True)
    name = Column(String(255), nullable=False)

@event.listens_for(Session, "before_flush")
def inject_tenant_id(session, flush_context, instances):
    tenant = get_current_tenant()
    for obj in session.new:
        if isinstance(obj, TenantMixin) and not obj.tenant_id:
            obj.tenant_id = tenant.tenant_id

@event.listens_for(Session, "do_orm_execute")
def filter_by_tenant(execute_state):
    if execute_state.is_select:
        tenant = tenant_context.get()
        if tenant is not None:
            # with_loader_criteria applies the predicate to every mapped class
            # that inherits TenantMixin, including joins and lazy loads --
            # unlike a bare filter_by, which only touches the primary entity.
            execute_state.statement = execute_state.statement.options(
                with_loader_criteria(
                    TenantMixin,
                    lambda cls: cls.tenant_id == tenant.tenant_id,
                    include_aliases=True,
                )
            )
```

This approach has a significant advantage: developers don't need to remember to filter by tenant_id in every query. The ORM enforces it automatically.

However, this ORM-level filtering is a safety net, not a replacement for database-level enforcement. Always pair it with PostgreSQL row-level security (RLS) policies.

PostgreSQL Row-Level Security Policies

RLS provides a database-level guarantee that queries cannot cross tenant boundaries, even if your application code has a bug:

```python
from sqlalchemy import text

def setup_rls(engine, table_name: str):
    with engine.begin() as conn:
        conn.execute(text(f"ALTER TABLE {table_name} ENABLE ROW LEVEL SECURITY"))
        # FORCE applies the policy even to the table owner.
        conn.execute(text(f"ALTER TABLE {table_name} FORCE ROW LEVEL SECURITY"))
        # current_setting(..., true) returns NULL instead of raising when the
        # setting is absent, so a connection without a tenant matches no rows.
        conn.execute(text(f"""
            CREATE POLICY tenant_isolation ON {table_name}
            USING (tenant_id = current_setting('app.current_tenant_id', true))
            WITH CHECK (tenant_id = current_setting('app.current_tenant_id', true))
        """))

def set_tenant_for_connection(conn, tenant_id: str):
    # SET does not accept bind parameters; set_config() does.
    conn.execute(
        text("SELECT set_config('app.current_tenant_id', :tid, false)"),
        {"tid": tenant_id},
    )
```

To integrate this with SQLAlchemy sessions:

```python
from sqlalchemy import event

@event.listens_for(engine, "checkout")
def set_tenant_on_checkout(dbapi_conn, connection_record, connection_proxy):
    tenant = tenant_context.get()
    if tenant:
        cursor = dbapi_conn.cursor()
        # psycopg2 interpolates %s client-side, so this is a safe way to SET.
        cursor.execute("SET app.current_tenant_id = %s", (tenant.tenant_id,))
        cursor.close()

@event.listens_for(engine, "checkin")
def reset_tenant_on_checkin(dbapi_conn, connection_record):
    # Clear the setting before the connection returns to the pool so it can
    # never leak into another tenant's request.
    cursor = dbapi_conn.cursor()
    cursor.execute("RESET app.current_tenant_id")
    cursor.close()
```

With this setup, even raw SQL queries executed against the connection are tenant-scoped at the database level.

Schema-Per-Tenant Implementation

For tenants that require stronger isolation (often enterprise customers on premium plans), schema-per-tenant is a pragmatic middle ground:

```python
from sqlalchemy import text

class SchemaManager:
    def __init__(self, engine, base_schema: str = "tenant_template"):
        self.engine = engine
        self.base_schema = base_schema

    def provision_schema(self, tenant_id: str) -> str:
        # Schema names cannot be bound parameters; normalize the tenant id
        # before interpolating it into DDL.
        schema_name = f"tenant_{tenant_id.replace('-', '_')}"
        with self.engine.begin() as conn:
            conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}"))
            tables = conn.execute(text(
                "SELECT table_name FROM information_schema.tables "
                "WHERE table_schema = :schema"
            ), {"schema": self.base_schema}).fetchall()

            # Clone every table from the template schema, including indexes,
            # defaults, and constraints.
            for (table_name,) in tables:
                conn.execute(text(
                    f"CREATE TABLE {schema_name}.{table_name} "
                    f"(LIKE {self.base_schema}.{table_name} INCLUDING ALL)"
                ))
        return schema_name

    def drop_schema(self, tenant_id: str):
        schema_name = f"tenant_{tenant_id.replace('-', '_')}"
        with self.engine.begin() as conn:
            conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE"))

def set_search_path_for_tenant(session, tenant):
    session.execute(text(f"SET search_path TO {tenant.db_schema}, public"))
```


Database Routing for Isolated Tenants

Some enterprise tenants need their own database entirely. Implement a connection router:

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from functools import lru_cache

class DatabaseRouter:
    def __init__(self, default_url: str, tenant_db_map: dict[str, str]):
        self._default_url = default_url
        self._tenant_db_map = tenant_db_map

    @lru_cache(maxsize=128)
    def _get_engine(self, db_url: str):
        # Cached per URL so each database gets exactly one engine and pool.
        return create_engine(
            db_url,
            pool_size=10,
            max_overflow=5,
            pool_pre_ping=True,
            pool_recycle=3600,
        )

    def get_session(self) -> Session:
        tenant = get_current_tenant()
        if tenant.is_isolated and tenant.tenant_id in self._tenant_db_map:
            db_url = self._tenant_db_map[tenant.tenant_id]
        else:
            db_url = self._default_url
        engine = self._get_engine(db_url)
        return sessionmaker(bind=engine)()

router = DatabaseRouter(
    default_url="postgresql://user:pass@shared-db:5432/saas",
    tenant_db_map={
        "enterprise-acme": "postgresql://user:pass@acme-db:5432/acme",
        "enterprise-globex": "postgresql://user:pass@globex-db:5432/globex",
    }
)

def get_db():
    session = router.get_session()
    try:
        yield session
    finally:
        session.close()
```

Tenant-Aware Caching

Caching in a multi-tenant system requires careful key namespacing to prevent data leakage:

```python
import json
import redis.asyncio as redis

class TenantCache:
    def __init__(self, redis_client: redis.Redis, default_ttl: int = 300):
        self.redis = redis_client
        self.default_ttl = default_ttl

    def _build_key(self, namespace: str, key: str) -> str:
        # Every key is prefixed with the tenant id, so tenants can never
        # collide on the same logical key.
        tenant = get_current_tenant()
        return f"t:{tenant.tenant_id}:{namespace}:{key}"

    async def get(self, namespace: str, key: str):
        cache_key = self._build_key(namespace, key)
        value = await self.redis.get(cache_key)
        if value:
            return json.loads(value)
        return None

    async def set(self, namespace: str, key: str, value, ttl: int | None = None):
        cache_key = self._build_key(namespace, key)
        await self.redis.setex(
            cache_key,
            ttl or self.default_ttl,
            json.dumps(value),
        )

    async def invalidate_tenant(self, tenant_id: str):
        # SCAN in batches rather than KEYS, to avoid blocking Redis.
        pattern = f"t:{tenant_id}:*"
        cursor = 0
        while True:
            cursor, keys = await self.redis.scan(cursor, match=pattern, count=100)
            if keys:
                await self.redis.delete(*keys)
            if cursor == 0:
                break
```

Since the methods are async, they use `redis.asyncio` so cache I/O never blocks the event loop.

Rate Limiting Per Tenant

Different tenant plans need different rate limits. Here is a sliding-window implementation backed by Redis sorted sets:

```python
import time
import redis

class TenantRateLimiter:
    PLAN_LIMITS = {
        "free": {"requests_per_minute": 60, "burst": 10},
        "pro": {"requests_per_minute": 600, "burst": 50},
        "enterprise": {"requests_per_minute": 6000, "burst": 200},
    }

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def check_rate_limit(self, tenant, endpoint: str) -> tuple[bool, dict]:
        limits = self.PLAN_LIMITS.get(tenant.plan, self.PLAN_LIMITS["free"])
        key = f"rl:{tenant.tenant_id}:{endpoint}"
        now = time.time()
        window = 60

        # Sliding-window log: drop entries older than the window, record this
        # request, then count what remains -- all in one round trip.
        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(key, 0, now - window)
        pipe.zadd(key, {f"{now}": now})
        pipe.zcard(key)
        pipe.expire(key, window)
        _, _, count, _ = pipe.execute()

        allowed = count <= limits["requests_per_minute"]
        headers = {
            "X-RateLimit-Limit": str(limits["requests_per_minute"]),
            "X-RateLimit-Remaining": str(max(0, limits["requests_per_minute"] - count)),
            "X-RateLimit-Reset": str(int(now + window)),
        }
        return allowed, headers
```

Alembic Migrations for Multi-Tenant Schemas

Running migrations across potentially hundreds of tenant schemas requires automation:

```python
from alembic import context
from sqlalchemy import create_engine, text

config = context.config

# Import your models' metadata for autogenerate support, e.g.:
# from myapp.models import Base
# target_metadata = Base.metadata
target_metadata = None

def get_tenant_schemas(engine):
    with engine.connect() as conn:
        result = conn.execute(text(
            "SELECT schema_name FROM information_schema.schemata "
            "WHERE schema_name LIKE 'tenant_%'"
        ))
        return [row[0] for row in result]

def run_migrations_online():
    engine = create_engine(config.get_main_option("sqlalchemy.url"))

    if context.get_x_argument(as_dictionary=True).get("tenant_mode") == "true":
        # Apply the same migration to every tenant schema, each tracking its
        # own alembic_version table inside that schema.
        for schema in get_tenant_schemas(engine):
            with engine.connect() as connection:
                connection.execute(text(f"SET search_path TO {schema}"))
                context.configure(
                    connection=connection,
                    target_metadata=target_metadata,
                    version_table_schema=schema,
                )
                with context.begin_transaction():
                    context.run_migrations()
    else:
        with engine.connect() as connection:
            context.configure(
                connection=connection,
                target_metadata=target_metadata,
            )
            with context.begin_transaction():
                context.run_migrations()
```

Run tenant migrations with: alembic -x tenant_mode=true upgrade head

Testing Multi-Tenant Isolation

Testing is where many multi-tenant systems fail. Write explicit cross-tenant isolation tests:

```python
import pytest

@pytest.fixture
def tenant_a():
    return TenantInfo(tenant_id="tenant-a", plan="pro", db_schema="public")

@pytest.fixture
def tenant_b():
    return TenantInfo(tenant_id="tenant-b", plan="pro", db_schema="public")

class TestTenantIsolation:
    def test_tenant_a_cannot_see_tenant_b_data(self, db_session, tenant_a, tenant_b):
        set_current_tenant(tenant_a)
        project_a = Project(id="proj-1", name="Tenant A Project")
        db_session.add(project_a)
        db_session.commit()

        set_current_tenant(tenant_b)
        projects = db_session.query(Project).all()
        assert len(projects) == 0

    def test_tenant_b_cannot_load_tenant_a_row_by_id(self, db_session, tenant_a, tenant_b):
        set_current_tenant(tenant_a)
        project = Project(id="proj-1", name="Original Name")
        db_session.add(project)
        db_session.commit()

        # Even a direct primary-key lookup must come back empty across tenants.
        set_current_tenant(tenant_b)
        result = db_session.query(Project).filter_by(id="proj-1").first()
        assert result is None

    def test_missing_tenant_context_raises(self, db_session):
        with pytest.raises(RuntimeError, match="No tenant context set"):
            get_current_tenant()

    def test_cache_isolation(self, tenant_cache, tenant_a, tenant_b):
        set_current_tenant(tenant_a)
        tenant_cache.set("projects", "list", [{"id": "proj-a"}])

        set_current_tenant(tenant_b)
        result = tenant_cache.get("projects", "list")
        assert result is None
```

Production Monitoring and Metrics

Instrument your multi-tenant system to detect isolation failures and performance outliers:

```python
from prometheus_client import Counter, Histogram, Gauge

# Note: per-tenant labels are fine up to a few thousand tenants; beyond that,
# label cardinality becomes a Prometheus scaling concern of its own.
tenant_request_total = Counter(
    "tenant_requests_total",
    "Total requests per tenant",
    ["tenant_id", "plan", "endpoint"]
)

tenant_request_duration = Histogram(
    "tenant_request_duration_seconds",
    "Request duration per tenant",
    ["tenant_id", "plan"],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
)

tenant_db_query_count = Counter(
    "tenant_db_queries_total",
    "Database queries per tenant",
    ["tenant_id", "operation"]
)

active_tenants_gauge = Gauge(
    "active_tenants",
    "Number of currently active tenants"
)
```

Track noisy neighbor effects by monitoring per-tenant query counts and latency. When a single tenant accounts for more than 40% of your database load, it is time to consider migrating them to an isolated database.
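That 40% rule of thumb is easy to automate against scraped counter values. A small illustrative helper (the function name and input shape are assumptions, not a real exporter API):

```python
def migration_candidates(query_counts: dict[str, int], threshold: float = 0.40) -> list[str]:
    """Flag tenants whose share of total query load exceeds `threshold`.

    `query_counts` maps tenant_id -> query count over a sampling window,
    e.g. derived from the tenant_db_queries_total counter above.
    """
    total = sum(query_counts.values())
    if total == 0:
        return []
    return [tid for tid, n in query_counts.items() if n / total > threshold]

print(migration_candidates({"acme": 900, "globex": 100}))  # ['acme']
```

Wiring this into a scheduled job that opens a ticket (rather than migrating automatically) keeps humans in the loop for what is ultimately a contractual and cost decision.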

Conclusion

Building multi-tenant architecture in Python demands layered defenses. No single mechanism — whether ORM filters, RLS policies, or schema isolation — is sufficient on its own. The most resilient systems combine application-level tenant context propagation with database-level RLS enforcement, then verify correctness through comprehensive isolation tests.

The contextvars-based approach shown here scales well with both sync and async Python frameworks. As your tenant base grows, the patterns for database routing and schema management let you graduate individual tenants to stronger isolation tiers without rearchitecting the core system. Start with shared-schema RLS for most tenants, and reserve schema-per-tenant or database-per-tenant for those with contractual isolation requirements or extreme load profiles.
