
Complete Guide to RAG Pipeline Design with Python

A comprehensive guide to implementing a RAG pipeline in Python, covering architecture, code examples, and production-ready patterns.

Muneer Puthiya Purayil · 12 min read

RAG (Retrieval-Augmented Generation) pipelines connect your proprietary data to large language models, enabling them to answer questions with up-to-date, domain-specific knowledge. This guide covers building a production-ready RAG pipeline in Python, from document ingestion through retrieval optimization and response generation.

Architecture Overview

A production RAG pipeline consists of five stages:

  1. Document Ingestion: Parse source documents into raw text
  2. Chunking: Split text into semantically meaningful segments
  3. Embedding: Convert chunks into vector representations
  4. Retrieval: Find relevant chunks for a given query
  5. Generation: Produce an answer using retrieved context
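Before diving into the real components, the five stages above can be sketched end-to-end with toy stand-ins: a bag-of-words "embedding", cosine similarity for retrieval, and a stubbed generator. Every name here is illustrative only, not part of the production pipeline that follows.

```python
from collections import Counter
from math import sqrt

def embed(text: str) -> Counter:
    # Stage 3 (toy): a bag-of-words count vector stands in for a real embedding
    return Counter(text.lower().split())

def cosine(a: Counter, b: Counter) -> float:
    # Normalized dot product between two sparse count vectors
    dot = sum(a[w] * b[w] for w in a)
    na = sqrt(sum(v * v for v in a.values()))
    nb = sqrt(sum(v * v for v in b.values()))
    return dot / (na * nb) if na and nb else 0.0

# Stages 1-2 (toy): documents arrive pre-parsed and pre-chunked
chunks = [
    "RAG pipelines retrieve relevant context before generation.",
    "Embeddings map text into vectors for similarity search.",
]
index = [(c, embed(c)) for c in chunks]

# Stage 4: retrieve the best-matching chunk for a query
query = "how does retrieval work in RAG"
best = max(index, key=lambda item: cosine(embed(query), item[1]))

# Stage 5 (toy): a real system would pass this context to an LLM
answer = f"Based on: {best[0]}"
print(answer)
```

The rest of the article replaces each toy stage with a production-grade component while keeping this same data flow.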
```python
from dataclasses import dataclass

@dataclass
class RAGConfig:
    embedding_model: str = "text-embedding-3-small"
    embedding_dimensions: int = 1536
    chunk_size: int = 512
    chunk_overlap: int = 50
    retrieval_top_k: int = 5
    generation_model: str = "claude-sonnet-4-5-20250514"
    max_generation_tokens: int = 1024
```

Document Parsing

Handle multiple document formats with a unified parser:

```python
import os
from pathlib import Path
from abc import ABC, abstractmethod

class DocumentParser(ABC):
    @abstractmethod
    def parse(self, file_path: str) -> str:
        pass

class MarkdownParser(DocumentParser):
    def parse(self, file_path: str) -> str:
        return Path(file_path).read_text(encoding="utf-8")

class PDFParser(DocumentParser):
    def parse(self, file_path: str) -> str:
        import fitz  # PyMuPDF
        doc = fitz.open(file_path)
        text = ""
        for page in doc:
            text += page.get_text() + "\n"
        doc.close()
        return text

class HTMLParser(DocumentParser):
    def parse(self, file_path: str) -> str:
        from bs4 import BeautifulSoup
        html = Path(file_path).read_text(encoding="utf-8")
        soup = BeautifulSoup(html, "html.parser")
        for tag in soup(["script", "style", "nav", "footer"]):
            tag.decompose()
        return soup.get_text(separator="\n", strip=True)

class ParserFactory:
    _parsers = {
        ".md": MarkdownParser,
        ".pdf": PDFParser,
        ".html": HTMLParser,
        ".txt": MarkdownParser,
    }

    @classmethod
    def get_parser(cls, file_path: str) -> DocumentParser:
        ext = os.path.splitext(file_path)[1].lower()
        parser_class = cls._parsers.get(ext)
        if not parser_class:
            raise ValueError(f"Unsupported file format: {ext}")
        return parser_class()
```
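The factory keys on file extension, and `os.path.splitext` returns the extension with its leading dot, which is why the lookup table's keys start with `.`. A quick standalone check of that behavior:

```python
import os

# splitext keeps the dot; lowercasing makes the lookup case-insensitive
ext = os.path.splitext("/docs/guide/intro.MD")[1].lower()
print(ext)  # ".md"
```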

Chunking Strategies

Recursive Character Splitting

The most reliable general-purpose chunking strategy:

```python
import re
from dataclasses import dataclass, field
import uuid

@dataclass
class Chunk:
    id: str
    text: str
    metadata: dict = field(default_factory=dict)

class RecursiveChunker:
    def __init__(self, max_tokens: int = 512, overlap: int = 50):
        self.max_tokens = max_tokens
        self.overlap = overlap
        self.separators = ["\n\n", "\n", ". ", " "]

    def chunk(self, text: str, metadata: dict | None = None) -> list[Chunk]:
        metadata = metadata or {}
        chunks = self._split(text, self.separators)

        return [
            Chunk(
                id=str(uuid.uuid4()),
                text=chunk.strip(),
                metadata={**metadata, "chunk_index": i},
            )
            for i, chunk in enumerate(chunks)
            if chunk.strip()
        ]

    def _split(self, text: str, separators: list[str]) -> list[str]:
        if not separators:
            return [text]

        sep = separators[0]
        remaining_seps = separators[1:]
        parts = text.split(sep)

        chunks = []
        current = ""

        for part in parts:
            candidate = current + sep + part if current else part

            if self._token_count(candidate) > self.max_tokens:
                if current:
                    chunks.append(current)
                if self._token_count(part) > self.max_tokens:
                    sub_chunks = self._split(part, remaining_seps)
                    chunks.extend(sub_chunks)
                    current = ""
                else:
                    current = part
            else:
                current = candidate

        if current:
            chunks.append(current)

        # Apply overlap
        if self.overlap > 0 and len(chunks) > 1:
            chunks = self._apply_overlap(chunks)

        return chunks

    def _apply_overlap(self, chunks: list[str]) -> list[str]:
        result = [chunks[0]]
        for chunk in chunks[1:]:
            prev_words = result[-1].split()
            overlap_text = " ".join(prev_words[-self.overlap:])
            result.append(overlap_text + " " + chunk)
        return result

    def _token_count(self, text: str) -> int:
        # Rough heuristic: roughly 4 tokens per 3 whitespace-separated words
        return len(text.split()) * 4 // 3
```
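To make the overlap step concrete, here is the same word-level logic as `_apply_overlap`, restated as a standalone function (assuming whitespace tokenization, as the chunker does):

```python
def apply_overlap(chunks: list[str], overlap: int) -> list[str]:
    # Prefix each chunk after the first with the last `overlap` words
    # of the previous result chunk.
    result = [chunks[0]]
    for chunk in chunks[1:]:
        prev_words = result[-1].split()
        overlap_text = " ".join(prev_words[-overlap:])
        result.append(overlap_text + " " + chunk)
    return result

print(apply_overlap(["alpha beta gamma delta", "epsilon zeta", "eta theta"], overlap=2))
```

Note that the overlap is drawn from the previous *result* chunk, which has already been prefixed, so with short chunks the carried-over words can originate two chunks back; with realistic chunk sizes this rarely matters, but it is worth knowing.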

Section-Aware Chunking

For structured documents with headers:

```python
class SectionAwareChunker:
    def __init__(self, max_tokens: int = 512):
        self.max_tokens = max_tokens
        self.recursive = RecursiveChunker(max_tokens=max_tokens)

    def chunk(self, text: str, metadata: dict | None = None) -> list[Chunk]:
        sections = self._extract_sections(text)
        chunks = []

        for section in sections:
            section_meta = {**(metadata or {}), "section": section["header"]}
            context_prefix = f"Section: {section['header']}\n\n"

            if self._token_count(section["content"]) <= self.max_tokens:
                chunks.append(Chunk(
                    id=str(uuid.uuid4()),
                    text=context_prefix + section["content"],
                    metadata=section_meta,
                ))
            else:
                sub_chunks = self.recursive.chunk(section["content"], section_meta)
                for sub in sub_chunks:
                    sub.text = context_prefix + sub.text
                chunks.extend(sub_chunks)

        return chunks

    def _extract_sections(self, text: str) -> list[dict]:
        pattern = r"^(#{1,3})\s+(.+)$"
        sections = []
        current = {"header": "Introduction", "content": ""}

        for line in text.split("\n"):
            match = re.match(pattern, line)
            if match:
                if current["content"].strip():
                    sections.append(current)
                current = {"header": match.group(2), "content": ""}
            else:
                current["content"] += line + "\n"

        if current["content"].strip():
            sections.append(current)

        return sections

    def _token_count(self, text: str) -> int:
        return len(text.split()) * 4 // 3
```
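The header-splitting behavior is easy to verify in isolation. This standalone restatement of the `_extract_sections` logic shows how ATX headers (levels 1 through 3) open new sections:

```python
import re

def extract_sections(text: str) -> list[dict]:
    # Same matching rule as _extract_sections: a "#", "##", or "###"
    # header line starts a new section; everything else accumulates.
    pattern = r"^(#{1,3})\s+(.+)$"
    sections = []
    current = {"header": "Introduction", "content": ""}
    for line in text.split("\n"):
        match = re.match(pattern, line)
        if match:
            if current["content"].strip():
                sections.append(current)
            current = {"header": match.group(2), "content": ""}
        else:
            current["content"] += line + "\n"
    if current["content"].strip():
        sections.append(current)
    return sections

doc = "# Setup\nInstall deps.\n## Usage\nRun the CLI."
print([s["header"] for s in extract_sections(doc)])
```

Notice that the default "Introduction" section is dropped when the document opens with a header, because its content is empty at that point.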

Embedding Pipeline

```python
from openai import OpenAI

class EmbeddingService:
    def __init__(self, model: str = "text-embedding-3-small"):
        self.client = OpenAI()
        self.model = model

    def embed(self, texts: list[str], batch_size: int = 64) -> list[list[float]]:
        all_embeddings = []

        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            response = self.client.embeddings.create(
                model=self.model,
                input=batch,
            )
            batch_embeddings = [item.embedding for item in response.data]
            all_embeddings.extend(batch_embeddings)

        return all_embeddings

    def embed_query(self, query: str) -> list[float]:
        return self.embed([query])[0]
```
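Under the cosine metric used by the vector stores in the next section, the relevance score between two embeddings reduces to a normalized dot product. A minimal pure-Python version, for intuition only (production systems let the vector database compute this):

```python
from math import sqrt

def cosine_similarity(a: list[float], b: list[float]) -> float:
    # score = (a . b) / (||a|| * ||b||); 1.0 means identical direction
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = sqrt(sum(x * x for x in a))
    norm_b = sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0

print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # identical direction: 1.0
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # orthogonal: 0.0
```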


Vector Store Integration

Using Qdrant

```python
from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance, VectorParams, PointStruct,
    Filter, FieldCondition, MatchValue,
)

class QdrantVectorStore:
    def __init__(self, url: str, api_key: str, collection: str, dimensions: int):
        self.client = QdrantClient(url=url, api_key=api_key)
        self.collection = collection
        self.dimensions = dimensions
        self._ensure_collection()

    def _ensure_collection(self):
        collections = [c.name for c in self.client.get_collections().collections]
        if self.collection not in collections:
            self.client.create_collection(
                collection_name=self.collection,
                vectors_config=VectorParams(
                    size=self.dimensions,
                    distance=Distance.COSINE,
                ),
            )

    def upsert(self, chunks: list[Chunk], embeddings: list[list[float]]):
        points = [
            PointStruct(
                id=chunk.id,
                vector=embedding,
                payload={"text": chunk.text, **chunk.metadata},
            )
            for chunk, embedding in zip(chunks, embeddings)
        ]
        self.client.upsert(collection_name=self.collection, points=points)

    def search(self, query_embedding: list[float], top_k: int = 5,
               filters: dict | None = None) -> list[dict]:
        query_filter = None
        if filters:
            conditions = [
                FieldCondition(key=k, match=MatchValue(value=v))
                for k, v in filters.items()
            ]
            query_filter = Filter(must=conditions)

        results = self.client.search(
            collection_name=self.collection,
            query_vector=query_embedding,
            limit=top_k,
            query_filter=query_filter,
        )

        return [
            {
                "id": r.id,
                "text": r.payload["text"],
                "score": r.score,
                "metadata": {k: v for k, v in r.payload.items() if k != "text"},
            }
            for r in results
        ]
```

Using pgvector (PostgreSQL)

```python
import json

import psycopg2
from pgvector.psycopg2 import register_vector

class PgVectorStore:
    def __init__(self, connection_string: str, table: str = "document_chunks"):
        self.conn = psycopg2.connect(connection_string)
        register_vector(self.conn)
        self.table = table
        self._ensure_table()

    def _ensure_table(self):
        with self.conn.cursor() as cur:
            cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
            cur.execute(f"""
                CREATE TABLE IF NOT EXISTS {self.table} (
                    id TEXT PRIMARY KEY,
                    embedding vector(1536),
                    content TEXT NOT NULL,
                    metadata JSONB DEFAULT '{{}}'
                )
            """)
            cur.execute(f"""
                CREATE INDEX IF NOT EXISTS idx_{self.table}_embedding
                ON {self.table} USING ivfflat (embedding vector_cosine_ops)
                WITH (lists = 100)
            """)
            self.conn.commit()

    def upsert(self, chunks: list[Chunk], embeddings: list[list[float]]):
        with self.conn.cursor() as cur:
            for chunk, embedding in zip(chunks, embeddings):
                cur.execute(
                    f"""INSERT INTO {self.table} (id, embedding, content, metadata)
                        VALUES (%s, %s, %s, %s)
                        ON CONFLICT (id) DO UPDATE SET
                            embedding = EXCLUDED.embedding,
                            content = EXCLUDED.content,
                            metadata = EXCLUDED.metadata""",
                    (chunk.id, embedding, chunk.text, json.dumps(chunk.metadata)),
                )
            self.conn.commit()

    def search(self, query_embedding: list[float], top_k: int = 5,
               filters: dict | None = None) -> list[dict]:
        # An empty JSONB filter ({}) matches every row, so one query covers
        # both the filtered and unfiltered paths. Accepting `filters` keeps
        # the signature compatible with how the pipeline calls search().
        with self.conn.cursor() as cur:
            cur.execute(
                f"""SELECT id, content, metadata,
                           1 - (embedding <=> %s::vector) AS score
                    FROM {self.table}
                    WHERE metadata @> %s::jsonb
                    ORDER BY embedding <=> %s::vector
                    LIMIT %s""",
                (query_embedding, json.dumps(filters or {}), query_embedding, top_k),
            )
            return [
                {"id": row[0], "text": row[1], "metadata": row[2], "score": row[3]}
                for row in cur.fetchall()
            ]
```

RAG Query Pipeline

```python
from anthropic import Anthropic

class RAGPipeline:
    def __init__(self, config: RAGConfig, embedding_service: EmbeddingService,
                 vector_store, llm_client: Anthropic | None = None):
        self.config = config
        self.embedder = embedding_service
        self.vector_store = vector_store
        self.llm = llm_client or Anthropic()

    def query(self, question: str, filters: dict | None = None) -> dict:
        # Step 1: Embed the query
        query_embedding = self.embedder.embed_query(question)

        # Step 2: Retrieve relevant chunks
        results = self.vector_store.search(
            query_embedding,
            top_k=self.config.retrieval_top_k,
            filters=filters,
        )

        if not results:
            # Return the same shape as the success path so response
            # models downstream always validate
            return {
                "answer": "I could not find relevant information to answer this question.",
                "sources": [],
                "model": self.config.generation_model,
                "tokens": 0,
            }

        # Step 3: Build context
        context = self._build_context(results)

        # Step 4: Generate response
        response = self.llm.messages.create(
            model=self.config.generation_model,
            max_tokens=self.config.max_generation_tokens,
            system=(
                "You are a helpful assistant. Answer the user's question using only "
                "the provided context. If the context does not contain enough information "
                "to fully answer the question, say so explicitly. Always cite your sources "
                "by referencing the document title or section."
            ),
            messages=[
                {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
            ],
        )

        return {
            "answer": response.content[0].text,
            "sources": [
                {"text": r["text"][:200], "score": r["score"], **r.get("metadata", {})}
                for r in results
            ],
            "model": self.config.generation_model,
            "tokens": response.usage.input_tokens + response.usage.output_tokens,
        }

    def _build_context(self, results: list[dict]) -> str:
        parts = []
        for i, result in enumerate(results, 1):
            source = result.get("metadata", {}).get("source", "Unknown")
            section = result.get("metadata", {}).get("section", "")
            header = f"[Source {i}: {source}"
            if section:
                header += f" - {section}"
            header += "]"
            parts.append(f"{header}\n{result['text']}")
        return "\n\n---\n\n".join(parts)
```
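The context formatting can be previewed without any API calls. This standalone restatement of the `_build_context` logic shows exactly what the model receives (the sample `results` data is made up for illustration):

```python
def build_context(results: list[dict]) -> str:
    # Number each retrieved chunk and tag it with its source document
    # and (when present) section, separated by horizontal rules.
    parts = []
    for i, result in enumerate(results, 1):
        source = result.get("metadata", {}).get("source", "Unknown")
        section = result.get("metadata", {}).get("section", "")
        header = f"[Source {i}: {source}"
        if section:
            header += f" - {section}"
        header += "]"
        parts.append(f"{header}\n{result['text']}")
    return "\n\n---\n\n".join(parts)

results = [
    {"text": "Chunking splits documents.", "metadata": {"source": "guide.md", "section": "Chunking"}},
    {"text": "Embeddings are vectors.", "metadata": {"source": "guide.md"}},
]
print(build_context(results))
```

Numbered, source-tagged blocks like these make the system prompt's "cite your sources" instruction actionable for the model.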

Ingestion Orchestrator

```python
class IngestionOrchestrator:
    def __init__(self, chunker, embedding_service, vector_store):
        self.chunker = chunker
        self.embedder = embedding_service
        self.vector_store = vector_store

    def ingest_directory(self, directory: str) -> dict:
        stats = {"files": 0, "chunks": 0, "errors": []}

        for root, _, files in os.walk(directory):
            for filename in files:
                file_path = os.path.join(root, filename)
                try:
                    parser = ParserFactory.get_parser(file_path)
                    text = parser.parse(file_path)

                    chunks = self.chunker.chunk(text, metadata={
                        "source": filename,
                        "path": file_path,
                    })

                    embeddings = self.embedder.embed([c.text for c in chunks])
                    self.vector_store.upsert(chunks, embeddings)

                    stats["files"] += 1
                    stats["chunks"] += len(chunks)
                except Exception as e:
                    stats["errors"].append({"file": file_path, "error": str(e)})

        return stats
```

FastAPI Integration

```python
import os

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class QueryRequest(BaseModel):
    question: str
    filters: dict | None = None

class QueryResponse(BaseModel):
    answer: str
    sources: list[dict]
    tokens: int

rag = RAGPipeline(
    config=RAGConfig(),
    embedding_service=EmbeddingService(),
    vector_store=QdrantVectorStore(
        url=os.environ["QDRANT_URL"],
        api_key=os.environ["QDRANT_API_KEY"],
        collection="knowledge_base",
        dimensions=1536,
    ),
)

@app.post("/api/query", response_model=QueryResponse)
async def query_endpoint(request: QueryRequest):
    try:
        result = rag.query(request.question, filters=request.filters)
    except Exception as e:
        # Surface pipeline failures as a clean HTTP error
        raise HTTPException(status_code=500, detail=str(e))
    return QueryResponse(**result)
```

Conclusion

A production RAG pipeline in Python is a composition of well-defined stages: parsing, chunking, embedding, retrieval, and generation. Each stage has clear inputs, outputs, and failure modes. The key engineering decisions — chunking strategy, embedding model, vector database, and generation model — should be driven by your specific corpus characteristics and quality requirements.

Start with the simplest configuration (fixed-size chunking, text-embedding-3-small, Qdrant, and a small, inexpensive generation model) and iterate based on retrieval quality metrics. The architecture shown here supports incremental upgrades: swapping the chunker, embedding model, or vector store means changing a single component without affecting the rest of the pipeline.
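One way to keep those components swappable is a shared structural interface. The sketch below uses `typing.Protocol`; the `VectorStore` protocol and `InMemoryStore` class are illustrative names, not part of the article's pipeline, though the method signatures mirror the stores shown earlier:

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class VectorStore(Protocol):
    # Structural interface: any class with these methods qualifies,
    # so the pipeline never needs to know which backend it holds.
    def upsert(self, chunks, embeddings) -> None: ...
    def search(self, query_embedding, top_k: int = 5) -> list[dict]: ...

class InMemoryStore:
    # A trivial stand-in, handy for unit tests
    def __init__(self):
        self.rows = []

    def upsert(self, chunks, embeddings) -> None:
        self.rows.extend(zip(chunks, embeddings))

    def search(self, query_embedding, top_k: int = 5) -> list[dict]:
        return [{"text": c, "score": 0.0} for c, _ in self.rows[:top_k]]

store = InMemoryStore()
print(isinstance(store, VectorStore))  # True: structural check passes
```

`runtime_checkable` only verifies that the methods exist (not their signatures), so it is a lightweight guard rather than a full contract, but it documents the seam where backends can be swapped.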
