Back to Journal
AI Architecture

Complete Guide to Vector Database Architecture with Python

A comprehensive guide to implementing vector database architecture in Python, covering system design, code examples, and production-ready patterns.

Muneer Puthiya Purayil 18 min read

Python dominates the AI/ML ecosystem, and vector database integration is no exception. Whether you're building RAG pipelines, recommendation engines, or semantic search, Python offers the richest client libraries, embedding model access, and data processing tools. This guide covers building production vector search systems in Python — from embedding generation through query serving — with patterns that scale beyond prototyping.

Choosing Your Vector Database Client

Python has first-class clients for every major vector database:

python
# The snippets below are alternatives, one per database. The Qdrant, Weaviate
# and ChromaDB snippets all bind the name `client`, so running this listing
# top to bottom leaves only the last assignment in effect.

# Qdrant — best for filtered search and self-hosted deployments
from qdrant_client import QdrantClient
client = QdrantClient(url="http://localhost:6333")

# Pinecone — best managed experience, least operational overhead
from pinecone import Pinecone
pc = Pinecone(api_key="your-key")
index = pc.Index("my-index")

# Weaviate — best for hybrid search (vector + BM25)
import weaviate
client = weaviate.connect_to_local()

# ChromaDB — best for local development and prototyping
import chromadb
client = chromadb.PersistentClient(path="./chroma_data")

# pgvector — best when you already run PostgreSQL
# (imports only here; no connection is opened in this snippet)
import psycopg2
from pgvector.psycopg2 import register_vector
21 

For production systems, the decision tree is straightforward:

  1. Already running PostgreSQL and <5M vectors? pgvector
  2. Need managed with minimal ops? Pinecone
  3. Need hybrid search (BM25 + vector)? Weaviate
  4. Need fine-grained filtering and self-hosting? Qdrant
  5. Prototyping or small-scale? ChromaDB

Embedding Pipeline Architecture

Production Embedding Service

python
1from dataclasses import dataclass, field
2from typing import Protocol
3import asyncio
4import hashlib
5import time
6import numpy as np
7from openai import AsyncOpenAI
8 
class EmbeddingModel(Protocol):
    """Structural interface for objects that can embed a batch of texts."""

    # Takes a list of strings and returns a list of float vectors.
    # NOTE(review): callers in this listing zip the result against the input,
    # so one vector per text, in input order, is presumably expected — confirm.
    async def embed(self, texts: list[str]) -> list[list[float]]: ...
11 
@dataclass
class OpenAIEmbedder:
    """Embed texts through the OpenAI embeddings endpoint.

    Attributes:
        model: Embedding model name sent with every request.
        dimensions: Vector size requested from the API.
    """

    model: str = "text-embedding-3-small"
    dimensions: int = 1536
    _client: AsyncOpenAI = field(default_factory=AsyncOpenAI)

    async def embed(self, texts: list[str]) -> list[list[float]]:
        """Return one embedding per input text, in input order.

        Args:
            texts: Strings to embed in a single request.

        Returns:
            A list of vectors aligned with ``texts``; ``[]`` for empty input.
        """
        if not texts:
            # Nothing to embed: skip the network round trip entirely.
            return []

        response = await self._client.embeddings.create(
            input=texts,
            model=self.model,
            dimensions=self.dimensions,
        )
        # Each returned item carries the index of the input it belongs to;
        # order on that instead of trusting the order of the response list.
        ordered = sorted(response.data, key=lambda item: item.index)
        return [item.embedding for item in ordered]
25 
@dataclass
class EmbeddingPipeline:
    """Embed documents in concurrent batches with an in-memory cache.

    Attributes:
        embedder: Object providing ``async embed(texts)``.
        batch_size: Maximum number of texts sent per ``embed`` call.
        max_concurrent: Maximum number of ``embed`` calls in flight at once.
        cache: Maps a text's cache key to its embedding. It is never evicted,
            so it grows with the number of distinct texts seen.
    """

    # Quoted so the class does not depend on where the protocol is defined.
    embedder: "EmbeddingModel"
    batch_size: int = 100
    max_concurrent: int = 5
    cache: dict[str, list[float]] = field(default_factory=dict)

    def _cache_key(self, text: str) -> str:
        """Return the first 16 hex characters of the SHA-256 of *text*."""
        return hashlib.sha256(text.encode()).hexdigest()[:16]

    async def embed_documents(
        self,
        documents: list[dict],
        text_field: str = "content",
    ) -> list[dict]:
        """Return a copy of each document with an ``"embedding"`` key added.

        Cached texts are served from ``self.cache``. Every other distinct text
        is embedded exactly once per call, even when several documents share
        it, and the result is written back to the cache.

        Args:
            documents: Dicts that each contain ``text_field``.
            text_field: Key holding the text to embed.

        Returns:
            New dicts in the same order as ``documents``.

        Raises:
            KeyError: If a document lacks ``text_field``.
            ValueError: If the embedder returns a different number of vectors
                than texts it was given.
        """
        results: list[dict | None] = [None] * len(documents)
        # cache key -> positions in `documents` waiting for that embedding
        pending: dict[str, list[int]] = {}
        pending_texts: list[str] = []

        for position, doc in enumerate(documents):
            text = doc[text_field]
            key = self._cache_key(text)
            if key in self.cache:
                results[position] = {**doc, "embedding": self.cache[key]}
            elif key in pending:
                # Same text seen earlier in this call: share its embedding.
                pending[key].append(position)
            else:
                pending[key] = [position]
                pending_texts.append(text)

        if not pending_texts:
            return results

        semaphore = asyncio.Semaphore(self.max_concurrent)

        async def embed_batch(texts: list[str]) -> list[list[float]]:
            async with semaphore:
                return await self.embedder.embed(texts)

        batches = [
            pending_texts[start : start + self.batch_size]
            for start in range(0, len(pending_texts), self.batch_size)
        ]
        # gather preserves task order, so flattening keeps text order.
        batch_results = await asyncio.gather(*(embed_batch(b) for b in batches))
        embeddings = [vector for batch in batch_results for vector in batch]

        if len(embeddings) != len(pending_texts):
            # zip() below would otherwise truncate silently and leave None
            # entries in the result.
            raise ValueError(
                f"embedder returned {len(embeddings)} vectors "
                f"for {len(pending_texts)} texts"
            )

        # `pending` preserves insertion order, matching `pending_texts`.
        for (key, positions), embedding in zip(pending.items(), embeddings):
            self.cache[key] = embedding
            for position in positions:
                results[position] = {**documents[position], "embedding": embedding}

        return results
76 

Text Chunking for RAG

python
1from dataclasses import dataclass
2import re
3 
@dataclass
class Chunk:
    """One slice of a source document, ready to be embedded."""

    text: str
    metadata: dict
    index: int
    doc_id: str


def _split_oversized(sentence: str, max_tokens: int) -> list[str]:
    """Break a sentence longer than *max_tokens* words into word windows."""
    words = sentence.split()
    if max_tokens <= 0 or len(words) <= max_tokens:
        return [sentence]
    return [
        " ".join(words[start : start + max_tokens])
        for start in range(0, len(words), max_tokens)
    ]


def chunk_document(
    doc_id: str,
    text: str,
    metadata: dict,
    max_tokens: int = 500,
    overlap_tokens: int = 50,
) -> list[Chunk]:
    """Split text into overlapping chunks at sentence boundaries.

    Token counts are estimated as whitespace-separated words. Sentences are
    packed greedily into a chunk; when the next sentence would push the chunk
    past ``max_tokens`` the chunk is emitted and its trailing sentences (up to
    ``overlap_tokens`` words) are carried into the next one.

    Args:
        doc_id: Identifier copied onto every chunk.
        text: Source text. Empty or whitespace-only text yields no chunks.
        metadata: Copied into each chunk's metadata along with ``chunk_index``.
        max_tokens: Upper bound on words per chunk. A single sentence longer
            than this is split at word boundaries so the bound holds.
        overlap_tokens: Maximum number of words repeated from the previous
            chunk.

    Returns:
        Chunks in document order, with ``index`` counting from 0.
    """
    pieces = [
        piece
        for sentence in re.split(r'(?<=[.!?])\s+', text)
        if sentence.strip()  # drop blank fragments (empty text, trailing space)
        for piece in _split_oversized(sentence, max_tokens)
    ]

    chunks: list[Chunk] = []

    def emit(sentences: list[str]) -> None:
        position = len(chunks)
        chunks.append(Chunk(
            text=" ".join(sentences),
            metadata={**metadata, "chunk_index": position},
            index=position,
            doc_id=doc_id,
        ))

    current: list[str] = []
    current_length = 0

    for piece in pieces:
        piece_tokens = len(piece.split())

        if current and current_length + piece_tokens > max_tokens:
            emit(current)

            # Walk backwards collecting whole sentences for the overlap.
            carried: list[str] = []
            carried_length = 0
            for previous in reversed(current):
                previous_tokens = len(previous.split())
                if carried_length + previous_tokens > overlap_tokens:
                    break
                carried.append(previous)
                carried_length += previous_tokens
            carried.reverse()

            # Shed the oldest overlap sentences if keeping them would push
            # the next chunk over the limit.
            while carried and carried_length + piece_tokens > max_tokens:
                carried_length -= len(carried.pop(0).split())

            current, current_length = carried, carried_length

        current.append(piece)
        current_length += piece_tokens

    if current:
        emit(current)

    return chunks
64 

Vector Search with Qdrant

Collection Setup and Indexing

python
1from qdrant_client import QdrantClient, models
2from uuid import uuid4
3 
4client = QdrantClient(url="http://localhost:6333")
5 
def create_collection(
    name: str,
    dimensions: int = 1536,
    distance: models.Distance = models.Distance.COSINE,
    quantization: bool = True,
):
    """Create a Qdrant collection and its payload indexes.

    Args:
        name: Collection name.
        dimensions: Vector size.
        distance: Distance metric for the vector field.
        quantization: When true, configure INT8 scalar quantization.
    """
    if quantization:
        quantization_config = models.ScalarQuantization(
            scalar=models.ScalarQuantizationConfig(
                type=models.ScalarType.INT8,
                quantile=0.99,
                always_ram=True,  # Quantized vectors in RAM
            ),
        )
    else:
        quantization_config = None

    client.create_collection(
        collection_name=name,
        vectors_config=models.VectorParams(
            size=dimensions,
            distance=distance,
            on_disk=True,  # Keep full vectors on SSD
        ),
        quantization_config=quantization_config,
        optimizers_config=models.OptimizersConfigDiff(indexing_threshold=20000),
        hnsw_config=models.HnswConfigDiff(m=16, ef_construct=128),
    )

    # Payload fields that get an index for filtered search, in creation order.
    payload_indexes = (
        ("tenant_id", models.PayloadSchemaType.KEYWORD),
        ("doc_type", models.PayloadSchemaType.KEYWORD),
        ("created_at", models.PayloadSchemaType.DATETIME),
    )
    for field_name, field_schema in payload_indexes:
        client.create_payload_index(
            collection_name=name,
            field_name=field_name,
            field_schema=field_schema,
        )
58 
python
async def upsert_chunks(
    collection: str,
    chunks: list[Chunk],
    embeddings: list[list[float]],
    batch_size: int = 100,
):
    """Upsert document chunks with their embeddings.

    Point ids are derived from ``doc_id`` and the chunk index, so indexing the
    same document again overwrites its points instead of adding duplicates
    under fresh random ids.

    Args:
        collection: Target collection name.
        chunks: Chunks to store.
        embeddings: One vector per chunk, in the same order.
        batch_size: Number of points sent per upsert request.

    Raises:
        ValueError: If ``chunks`` and ``embeddings`` differ in length.
    """
    import asyncio
    from uuid import NAMESPACE_URL, uuid5

    if len(chunks) != len(embeddings):
        # zip() would silently drop the unmatched tail.
        raise ValueError(
            f"got {len(chunks)} chunks but {len(embeddings)} embeddings"
        )

    points = [
        models.PointStruct(
            id=str(uuid5(NAMESPACE_URL, f"{chunk.doc_id}#{chunk.index}")),
            vector=embedding,
            payload={
                "text": chunk.text,
                "doc_id": chunk.doc_id,
                "chunk_index": chunk.index,
                **chunk.metadata,
            },
        )
        for chunk, embedding in zip(chunks, embeddings)
    ]

    # The client call is synchronous; run it in a worker thread so the event
    # loop keeps serving other tasks while each batch is written.
    for start in range(0, len(points), batch_size):
        await asyncio.to_thread(
            client.upsert,
            collection_name=collection,
            points=points[start : start + batch_size],
        )
28 
def search(
    collection: str,
    query_embedding: list[float],
    tenant_id: str,
    top_k: int = 10,
    score_threshold: float = 0.7,
    doc_type: str | None = None,
) -> list[dict]:
    """Run a tenant-scoped vector search and return plain dict hits.

    Args:
        collection: Collection to query.
        query_embedding: Query vector.
        tenant_id: Required filter value for the ``tenant_id`` payload field.
        top_k: Maximum number of hits.
        score_threshold: Minimum score passed to the client.
        doc_type: Optional filter on the ``doc_type`` payload field.

    Returns:
        One dict per hit with ``id``, ``score``, ``text``, ``doc_id`` and
        ``chunk_index``.
    """
    # The tenant filter is always applied; doc_type only when it is truthy.
    required = {"tenant_id": tenant_id}
    if doc_type:
        required["doc_type"] = doc_type

    conditions = [
        models.FieldCondition(key=key, match=models.MatchValue(value=value))
        for key, value in required.items()
    ]

    rescoring = models.SearchParams(
        quantization=models.QuantizationSearchParams(
            rescore=True,
            oversampling=2.0,
        ),
    )

    hits = client.search(
        collection_name=collection,
        query_vector=query_embedding,
        query_filter=models.Filter(must=conditions),
        limit=top_k,
        score_threshold=score_threshold,
        search_params=rescoring,
    )

    formatted = []
    for hit in hits:
        payload = hit.payload
        formatted.append({
            "id": str(hit.id),
            "score": hit.score,
            "text": payload["text"],
            "doc_id": payload["doc_id"],
            "chunk_index": payload["chunk_index"],
        })
    return formatted
77 

Need a second opinion on your AI systems architecture?

I run free 30-minute strategy calls for engineering teams tackling this exact problem.

Book a Free Call

Vector Search with pgvector

For teams already running PostgreSQL:

python
1import psycopg2
2from pgvector.psycopg2 import register_vector
3import numpy as np
4 
def get_connection():
    """Open a PostgreSQL connection with the pgvector adapter registered.

    The ``vector`` extension is created first because ``register_vector``
    looks up the ``vector`` type and fails on a database where the extension
    has not been installed yet.

    Returns:
        An open psycopg2 connection.
    """
    # NOTE(review): host and credentials are hard-coded as in the original
    # example; load them from configuration before using this for real.
    conn = psycopg2.connect(
        host="localhost",
        database="vectordb",
        user="app",
        password="secret",
    )
    try:
        with conn.cursor() as cur:
            cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
        conn.commit()
        register_vector(conn)
    except Exception:
        # Do not leak the connection when setup fails.
        conn.close()
        raise
    return conn
14 
def setup_schema(conn):
    """Create the extension, the documents table and its indexes if missing.

    Args:
        conn: Open psycopg2 connection; the work is committed on it.
    """
    # Executed in order: extension, table, vector index, tenant index.
    ddl_statements = (
        "CREATE EXTENSION IF NOT EXISTS vector",
        """
            CREATE TABLE IF NOT EXISTS documents (
                id TEXT PRIMARY KEY,
                tenant_id TEXT NOT NULL,
                doc_type TEXT,
                content TEXT NOT NULL,
                embedding vector(1536),
                metadata JSONB DEFAULT '{}',
                created_at TIMESTAMPTZ DEFAULT NOW()
            )
        """,
        """
            CREATE INDEX IF NOT EXISTS documents_embedding_idx
            ON documents USING hnsw (embedding vector_cosine_ops)
            WITH (m = 16, ef_construction = 64)
        """,
        """
            CREATE INDEX IF NOT EXISTS documents_tenant_idx
            ON documents (tenant_id)
        """,
    )
    with conn.cursor() as cur:
        for statement in ddl_statements:
            cur.execute(statement)
    conn.commit()
39 
def upsert_document(
    conn,
    doc_id: str,
    tenant_id: str,
    content: str,
    embedding: list[float],
    doc_type: str = "document",
    metadata: dict | None = None,
):
    """Insert a document row, or update it when the id already exists.

    Args:
        conn: Open psycopg2 connection with the vector adapter registered.
        doc_id: Primary key of the row.
        tenant_id: Owning tenant.
        content: Document text.
        embedding: Vector stored in the ``embedding`` column.
        doc_type: Stored in ``doc_type``; also refreshed on update.
        metadata: JSON-serialisable dict; ``None`` is stored as ``{}``.

    Raises:
        ValueError: If ``doc_id`` already exists under a different tenant.
    """
    # json is not imported next to this function in the original listing.
    import json

    vec = np.array(embedding, dtype=np.float32)
    encoded_metadata = json.dumps(metadata or {})

    # `with conn` commits on success and rolls back if anything raises, so a
    # failed statement does not leave the connection in an aborted transaction.
    with conn:
        with conn.cursor() as cur:
            cur.execute("""
                INSERT INTO documents (id, tenant_id, doc_type, content, embedding, metadata)
                VALUES (%s, %s, %s, %s, %s, %s)
                ON CONFLICT (id) DO UPDATE SET
                    doc_type = EXCLUDED.doc_type,
                    content = EXCLUDED.content,
                    embedding = EXCLUDED.embedding,
                    metadata = EXCLUDED.metadata
                WHERE documents.tenant_id = EXCLUDED.tenant_id
            """, (doc_id, tenant_id, doc_type, content, vec, encoded_metadata))
            if cur.rowcount == 0:
                # The id exists but belongs to another tenant: the WHERE
                # clause blocked the update, so nothing was written.
                raise ValueError(
                    f"document id {doc_id!r} already belongs to another tenant"
                )
60 
def search_documents(
    conn,
    query_embedding: list[float],
    tenant_id: str,
    top_k: int = 10,
    threshold: float = 0.7,
) -> list[dict]:
    """Return a tenant's documents closest to the query vector.

    Similarity is computed in SQL as ``1 - (embedding <=> query)``. Rows at or
    below ``threshold`` are excluded and at most ``top_k`` rows are returned,
    nearest first.

    Args:
        conn: Open psycopg2 connection with the vector adapter registered.
        query_embedding: Query vector.
        tenant_id: Only rows with this ``tenant_id`` are considered.
        top_k: Row limit.
        threshold: Exclusive lower bound on similarity.

    Returns:
        Dicts with ``id``, ``content``, ``doc_type``, ``metadata`` and
        ``similarity`` (as a Python float).
    """
    vec = np.array(query_embedding, dtype=np.float32)
    # The query references the vector three times: SELECT, WHERE and ORDER BY.
    params = (vec, tenant_id, vec, threshold, vec, top_k)

    with conn.cursor() as cur:
        cur.execute("""
            SELECT
                id, content, doc_type, metadata,
                1 - (embedding <=> %s) AS similarity
            FROM documents
            WHERE tenant_id = %s
                AND 1 - (embedding <=> %s) > %s
            ORDER BY embedding <=> %s
            LIMIT %s
        """, params)
        rows = cur.fetchall()

    return [
        {
            "id": row_id,
            "content": content,
            "doc_type": doc_type,
            "metadata": metadata,
            "similarity": float(similarity),
        }
        for row_id, content, doc_type, metadata, similarity in rows
    ]
91 

RAG Pipeline with Streaming

Build a complete RAG pipeline with streaming responses:

python
1from openai import AsyncOpenAI
2from dataclasses import dataclass
3from typing import AsyncIterator
4 
5@dataclass
6class RAGPipeline:
7 embedder: EmbeddingModel
8 search_fn: callable
9 llm_client: AsyncOpenAI
10 model: str = "gpt-4o-mini"
11 top_k: int = 5
12 
13 async def query(
14 self,
15 question: str,
16 tenant_id: str,
17 system_prompt: str | None = None,
18 ) -> dict:
19 """Non-streaming RAG query."""
20 context_chunks = await self._retrieve(question, tenant_id)
21 context = self._format_context(context_chunks)
22 
23 completion = await self.llm_client.chat.completions.create(
24 model=self.model,
25 messages=self._build_messages(question, context, system_prompt),
26 temperature=0.1,
27 )
28 
29 return {
30 "answer": completion.choices[0].message.content,
31 "sources": context_chunks,
32 "model": self.model,
33 "usage": {
34 "prompt_tokens": completion.usage.prompt_tokens,
35 "completion_tokens": completion.usage.completion_tokens,
36 },
37 }
38 
39 async def query_stream(
40 self,
41 question: str,
42 tenant_id: str,
43 system_prompt: str | None = None,
44 ) -> AsyncIterator[str]:
45 """Streaming RAG query."""
46 context_chunks = await self._retrieve(question, tenant_id)
47 context = self._format_context(context_chunks)
48 
49 stream = await self.llm_client.chat.completions.create(
50 model=self.model,
51 messages=self._build_messages(question, context, system_prompt),
52 temperature=0.1,
53 stream=True,
54 )
55 
56 async for chunk in stream:
57 if chunk.choices[0].delta.content:
58 yield chunk.choices[0].delta.content
59 
60 async def _retrieve(
61 self, question: str, tenant_id: str
62 ) -> list[dict]:
63 embeddings = await self.embedder.embed([question])
64 return self.search_fn(
65 query_embedding=embeddings[0],
66 tenant_id=tenant_id,
67 top_k=self.top_k,
68 )
69 
70 def _format_context(self, chunks: list[dict]) -> str:
71 parts = []
72 for i, chunk in enumerate(chunks):
73 parts.append(f"[Source {i + 1}]\n{chunk['text']}")
74 return "\n\n---\n\n".join(parts)
75 
76 def _build_messages(
77 self,
78 question: str,
79 context: str,
80 system_prompt: str | None,
81 ) -> list[dict]:
82 default_system = (
83 "Answer the question based on the provided context. "
84 "Cite sources using [Source N] notation. "
85 "If the context doesn't contain relevant information, say so."
86 )
87 return [
88 {"role": "system", "content": system_prompt or default_system},
89 {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"},
90 ]
91 

FastAPI Search Service

Serve your vector search through a production API:

python
1from fastapi import FastAPI, HTTPException, Depends
2from pydantic import BaseModel
3from fastapi.responses import StreamingResponse
4import json
5 
6app = FastAPI(title="Vector Search API")
7 
class SearchRequest(BaseModel):
    """Request body for the POST /search endpoint."""

    query: str  # free-text query; embedded before the search runs
    # Forwarded to search() as the tenant filter.
    # NOTE(review): taken from the request body as-is — presumably the caller
    # is authenticated and authorised for this tenant upstream; confirm.
    tenant_id: str
    top_k: int = 10  # forwarded to search() as top_k
    doc_type: str | None = None  # optional doc_type filter
    threshold: float = 0.7  # forwarded to search() as score_threshold
14 
class RAGRequest(BaseModel):
    """Request body for the POST /rag endpoint."""

    question: str
    tenant_id: str  # forwarded to the pipeline for retrieval
    system_prompt: str | None = None  # optional override passed to the pipeline
    stream: bool = False  # True selects the text/event-stream response
20 
@app.post("/search")
async def search_endpoint(req: SearchRequest):
    """Embed the query and return matching chunks for the tenant.

    Returns:
        ``{"results": [...], "count": n}``.

    Raises:
        HTTPException: 400 when the query is blank or ``top_k`` is below 1.
    """
    import asyncio

    # Reject input that could only fail further down as a server error.
    if not req.query.strip():
        raise HTTPException(status_code=400, detail="query must not be empty")
    if req.top_k < 1:
        raise HTTPException(status_code=400, detail="top_k must be at least 1")

    embeddings = await embedder.embed([req.query])

    # search() is synchronous; run it off the event loop so one slow query
    # does not stall every other request handled by this worker.
    results = await asyncio.to_thread(
        search,
        collection="documents",
        query_embedding=embeddings[0],
        tenant_id=req.tenant_id,
        top_k=req.top_k,
        score_threshold=req.threshold,
        doc_type=req.doc_type,
    )

    return {"results": results, "count": len(results)}
35 
# Shared pipeline, built on first use (see _get_rag_pipeline).
_rag_pipeline = None


def _get_rag_pipeline():
    """Return the process-wide RAGPipeline, creating it on first call."""
    global _rag_pipeline
    if _rag_pipeline is None:
        _rag_pipeline = RAGPipeline(
            embedder=embedder,
            search_fn=lambda **kwargs: search(
                collection="documents", **kwargs
            ),
            llm_client=AsyncOpenAI(),
        )
    return _rag_pipeline


@app.post("/rag")
async def rag_endpoint(req: RAGRequest):
    """Answer a question with retrieval, as JSON or as a server-sent stream.

    With ``req.stream`` set, the response is ``text/event-stream``: one
    ``data: {"token": ...}`` event per fragment, then ``data: [DONE]``.
    Otherwise the pipeline's result dict is returned.
    """
    # Reuse one pipeline (and one LLM client) across requests rather than
    # constructing a new client, which is never closed, on every call.
    pipeline = _get_rag_pipeline()

    if req.stream:
        async def generate():
            async for token in pipeline.query_stream(
                req.question,
                req.tenant_id,
                req.system_prompt,
            ):
                yield f"data: {json.dumps({'token': token})}\n\n"
            yield "data: [DONE]\n\n"

        return StreamingResponse(
            generate(),
            media_type="text/event-stream",
        )

    return await pipeline.query(
        req.question,
        req.tenant_id,
        req.system_prompt,
    )
67 

Evaluation and Testing

Measure your vector search quality systematically:

python
1from dataclasses import dataclass
2import numpy as np
3 
@dataclass
class EvalResult:
    """Aggregate retrieval metrics over an evaluation run."""

    recall_at_k: float
    mrr: float
    avg_similarity: float
    queries_evaluated: int


def evaluate_search(
    search_fn: callable,
    test_set: list[dict],
    k: int = 10,
    *,
    embedding_model=None,
) -> EvalResult:
    """Evaluate search quality against a ground truth test set.

    test_set format:
        [
            {
                "query": "What is...",
                "expected_doc_ids": ["doc_1", "doc_3"],
                "tenant_id": "test",
            },
        ]

    Args:
        search_fn: Called with ``query_embedding``, ``tenant_id`` and ``top_k``
            keyword arguments; returns hit dicts.
        test_set: Ground-truth cases in the format above. Cases with no
            expected ids cannot be scored and are skipped.
        k: Number of results requested per query.
        embedding_model: Object providing ``async embed(texts)``. Defaults to
            the module-level ``embedder``.

    Returns:
        Mean recall@k, mean reciprocal rank, mean result similarity and the
        number of cases actually scored. All metrics are 0.0 when nothing
        could be scored.
    """
    import asyncio

    scorable = [case for case in test_set if case["expected_doc_ids"]]
    if not scorable:
        return EvalResult(0.0, 0.0, 0.0, 0)

    model = embedder if embedding_model is None else embedding_model

    async def embed_queries() -> list[list[float]]:
        # The embedder only exposes an async API. Embed every query inside a
        # single event loop, in batches, rather than one loop per query.
        vectors: list[list[float]] = []
        for start in range(0, len(scorable), 100):
            batch = [case["query"] for case in scorable[start : start + 100]]
            vectors.extend(await model.embed(batch))
        return vectors

    query_vectors = asyncio.run(embed_queries())

    recalls = []
    reciprocal_ranks = []
    similarities = []

    for test_case, embedding in zip(scorable, query_vectors):
        results = search_fn(
            query_embedding=embedding,
            tenant_id=test_case["tenant_id"],
            top_k=k,
        )

        # Accept both hit shapes produced in this guide: Qdrant-style
        # ("doc_id", "score") and pgvector-style ("id", "similarity").
        result_ids = [r["doc_id"] if "doc_id" in r else r["id"] for r in results]
        expected = set(test_case["expected_doc_ids"])

        # Recall@K
        recalls.append(len(expected.intersection(result_ids)) / len(expected))

        # Reciprocal rank of the first relevant hit (0.0 when there is none)
        first_hit = next(
            (rank for rank, doc_id in enumerate(result_ids, 1) if doc_id in expected),
            None,
        )
        reciprocal_ranks.append(1.0 / first_hit if first_hit else 0.0)

        # Average similarity score
        if results:
            similarities.append(np.mean([
                r["score"] if "score" in r else r["similarity"] for r in results
            ]))

    return EvalResult(
        recall_at_k=float(np.mean(recalls)),
        mrr=float(np.mean(reciprocal_ranks)),
        avg_similarity=float(np.mean(similarities)) if similarities else 0.0,
        queries_evaluated=len(scorable),
    )
65 

FAQ

Need expert help?

Building with agentic AI?

I help teams ship production-grade systems. From architecture review to hands-on builds.

Muneer Puthiya Purayil

SaaS Architect & AI Systems Engineer. 10+ years shipping production infrastructure across fintech, automotive, e-commerce, and healthcare.

Engage

Start a
Conversation.

For teams building at scale: SaaS platforms, agentic AI systems, and enterprise mobile infrastructure. Scope and fit are evaluated before any engagement begins.

Limited availability · Q3 / Q4 2026