AI Architecture

How to Build a RAG Pipeline Using FastAPI

A step-by-step tutorial for building a RAG pipeline with FastAPI, from project setup through deployment.

Muneer Puthiya Purayil · 25 min read

FastAPI is the natural choice for building RAG pipeline APIs in Python — its async support handles concurrent embedding and LLM requests efficiently, and its automatic OpenAPI documentation makes the API self-describing. This tutorial builds a complete RAG API from project setup through deployment.

Project Setup

bash
mkdir rag-api && cd rag-api
python -m venv .venv && source .venv/bin/activate
pip install fastapi uvicorn pydantic-settings openai anthropic qdrant-client python-multipart pymupdf beautifulsoup4

rag-api/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── config.py
│   ├── models.py
│   ├── services/
│   │   ├── __init__.py
│   │   ├── parser.py
│   │   ├── chunker.py
│   │   ├── embedder.py
│   │   ├── retriever.py
│   │   └── generator.py
│   ├── routes/
│   │   ├── __init__.py
│   │   ├── query.py
│   │   └── ingest.py
│   └── dependencies.py
├── tests/
├── requirements.txt
└── Dockerfile

Configuration

python
# app/config.py
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    openai_api_key: str
    anthropic_api_key: str
    qdrant_url: str
    qdrant_api_key: str
    collection_name: str = "knowledge_base"
    embedding_model: str = "text-embedding-3-small"
    embedding_dimensions: int = 1536
    generation_model: str = "claude-sonnet-4-5-20250929"
    chunk_size: int = 512
    chunk_overlap: int = 50
    retrieval_top_k: int = 5
    max_generation_tokens: int = 1024
    max_upload_size: int = 10_485_760  # 10 MB

    class Config:
        env_file = ".env"

settings = Settings()
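With `env_file = ".env"`, the settings above load from a local `.env` file. A sample with placeholder values (field names match case-insensitively):

```
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...
QDRANT_URL=https://your-cluster.qdrant.io
QDRANT_API_KEY=...
COLLECTION_NAME=knowledge_base
```

Keep `.env` out of version control; the keys here are live credentials.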

Data Models

python
# app/models.py
from pydantic import BaseModel, Field

class QueryRequest(BaseModel):
    question: str = Field(..., min_length=3, max_length=1000)
    filters: dict[str, str] | None = None
    top_k: int = Field(default=5, ge=1, le=20)

class Source(BaseModel):
    text: str
    score: float
    source: str
    section: str | None = None

class QueryResponse(BaseModel):
    answer: str
    sources: list[Source]
    model: str
    tokens: int
    retrieval_time_ms: float
    generation_time_ms: float

class IngestRequest(BaseModel):
    source_name: str
    content: str

class IngestResponse(BaseModel):
    document_id: str
    chunks_created: int
    source: str

class IngestFileResponse(BaseModel):
    document_id: str
    chunks_created: int
    filename: str
    file_size: int
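Pydantic enforces these `Field` constraints before a route ever runs, so invalid payloads are rejected with a 422. A standalone check, re-declaring a trimmed `QueryRequest` so the snippet runs on its own:

```python
from pydantic import BaseModel, Field, ValidationError

class QueryRequest(BaseModel):
    question: str = Field(..., min_length=3, max_length=1000)
    top_k: int = Field(default=5, ge=1, le=20)

print(QueryRequest(question="How do I deploy?").top_k)  # 5

try:
    QueryRequest(question="hi")  # violates min_length=3
except ValidationError as exc:
    print("rejected:", exc.error_count(), "error(s)")
```

In FastAPI, this same validation runs automatically against the request body, so route handlers never see a question shorter than three characters.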

Parser Service

python
# app/services/parser.py
import os
from abc import ABC, abstractmethod

class DocumentParser(ABC):
    @abstractmethod
    async def parse(self, content: bytes, filename: str) -> str:
        pass

class TextParser(DocumentParser):
    async def parse(self, content: bytes, filename: str) -> str:
        return content.decode("utf-8")

class PDFParser(DocumentParser):
    async def parse(self, content: bytes, filename: str) -> str:
        import fitz  # PyMuPDF
        doc = fitz.open(stream=content, filetype="pdf")
        text = ""
        for page in doc:
            text += page.get_text() + "\n"
        doc.close()
        return text

class HTMLParser(DocumentParser):
    async def parse(self, content: bytes, filename: str) -> str:
        from bs4 import BeautifulSoup
        soup = BeautifulSoup(content, "html.parser")
        for tag in soup(["script", "style", "nav", "footer", "header"]):
            tag.decompose()
        return soup.get_text(separator="\n", strip=True)

class ParserFactory:
    _parsers: dict[str, type[DocumentParser]] = {
        ".txt": TextParser,
        ".md": TextParser,
        ".pdf": PDFParser,
        ".html": HTMLParser,
        ".htm": HTMLParser,
    }

    @classmethod
    def get_parser(cls, filename: str) -> DocumentParser:
        ext = os.path.splitext(filename)[1].lower()
        parser_cls = cls._parsers.get(ext)
        if not parser_cls:
            raise ValueError(f"Unsupported file format: {ext}")
        return parser_cls()
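The dispatch hinges on `os.path.splitext`, which keeps the leading dot and preserves case, hence the `.lower()` call. A minimal stand-in for the factory's lookup (the parser names here are just strings for illustration):

```python
import os

# Illustrative mapping; the real factory maps extensions to parser classes.
PARSERS = {".txt": "TextParser", ".md": "TextParser", ".pdf": "PDFParser"}

def pick_parser(filename: str) -> str:
    # splitext("report.PDF") -> ("report", ".PDF"); lowercase before the lookup
    ext = os.path.splitext(filename)[1].lower()
    if ext not in PARSERS:
        raise ValueError(f"Unsupported file format: {ext}")
    return PARSERS[ext]

print(pick_parser("report.PDF"))  # PDFParser
print(pick_parser("notes.md"))    # TextParser
```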

Chunker Service

python
# app/services/chunker.py
import uuid
import re
from dataclasses import dataclass, field

@dataclass
class Chunk:
    id: str
    text: str
    metadata: dict = field(default_factory=dict)

class ChunkerService:
    def __init__(self, max_tokens: int = 512, overlap: int = 50):
        self.max_tokens = max_tokens
        # Note: overlap is accepted for a future sliding-window strategy;
        # the section-based splitting below does not apply it.
        self.overlap = overlap

    def chunk(self, text: str, document_id: str, source: str) -> list[Chunk]:
        sections = self._extract_sections(text)
        chunks: list[Chunk] = []

        for section in sections:
            section_text = f"Section: {section['header']}\n\n{section['content']}"

            if self._token_count(section_text) <= self.max_tokens:
                chunks.append(Chunk(
                    id=str(uuid.uuid4()),
                    text=section_text,
                    metadata={
                        "document_id": document_id,
                        "source": source,
                        "section": section["header"],
                        "chunk_index": len(chunks),
                    },
                ))
            else:
                paragraphs = section["content"].split("\n\n")
                current = f"Section: {section['header']}\n\n"

                for para in paragraphs:
                    if self._token_count(current + para) > self.max_tokens:
                        if current.strip():
                            chunks.append(Chunk(
                                id=str(uuid.uuid4()),
                                text=current.strip(),
                                metadata={
                                    "document_id": document_id,
                                    "source": source,
                                    "section": section["header"],
                                    "chunk_index": len(chunks),
                                },
                            ))
                        current = f"Section: {section['header']}\n\n{para}\n\n"
                    else:
                        current += para + "\n\n"

                if current.strip() and self._token_count(current) > 20:
                    chunks.append(Chunk(
                        id=str(uuid.uuid4()),
                        text=current.strip(),
                        metadata={
                            "document_id": document_id,
                            "source": source,
                            "section": section["header"],
                            "chunk_index": len(chunks),
                        },
                    ))

        return chunks

    def _extract_sections(self, text: str) -> list[dict]:
        sections = []
        current_header = "Introduction"
        current_content = ""

        for line in text.split("\n"):
            match = re.match(r"^#{1,3}\s+(.+)$", line)
            if match:
                if current_content.strip():
                    sections.append({"header": current_header, "content": current_content.strip()})
                current_header = match.group(1)
                current_content = ""
            else:
                current_content += line + "\n"

        if current_content.strip():
            sections.append({"header": current_header, "content": current_content.strip()})

        return sections

    def _token_count(self, text: str) -> int:
        # Rough heuristic: English text averages about 4/3 tokens per word.
        return len(text.split()) * 4 // 3
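The header regex is the heart of the sectioning strategy: it splits on markdown headers of levels 1 through 3 and collects the body under each. The same logic, extracted as a standalone function run against a made-up document:

```python
import re

def extract_sections(text: str) -> list[dict]:
    # Same approach as ChunkerService._extract_sections: split on #/##/###
    # headers, grouping the following lines under that header.
    sections, header, content = [], "Introduction", ""
    for line in text.split("\n"):
        match = re.match(r"^#{1,3}\s+(.+)$", line)
        if match:
            if content.strip():
                sections.append({"header": header, "content": content.strip()})
            header, content = match.group(1), ""
        else:
            content += line + "\n"
    if content.strip():
        sections.append({"header": header, "content": content.strip()})
    return sections

doc = "Intro text.\n\n# Setup\nInstall things.\n\n## Config\nSet keys."
for s in extract_sections(doc):
    print(s["header"], "->", s["content"])
# Introduction -> Intro text.
# Setup -> Install things.
# Config -> Set keys.
```

Text before the first header lands under a synthetic "Introduction" section, so nothing is dropped.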

Embedding Service

python
# app/services/embedder.py
from openai import AsyncOpenAI
from app.config import settings

class EmbeddingService:
    def __init__(self):
        self.client = AsyncOpenAI(api_key=settings.openai_api_key)
        self.model = settings.embedding_model

    async def embed(self, texts: list[str], batch_size: int = 64) -> list[list[float]]:
        embeddings = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            response = await self.client.embeddings.create(
                model=self.model,
                input=batch,
            )
            embeddings.extend([item.embedding for item in response.data])
        return embeddings

    async def embed_query(self, query: str) -> list[float]:
        result = await self.embed([query])
        return result[0]
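The `embed` loop batches with plain slice arithmetic so no single request exceeds the embedding API's input limits. The slicing pattern in isolation (batch size 3 chosen only for readability):

```python
def batched(texts: list[str], batch_size: int) -> list[list[str]]:
    # Mirrors the embed() loop: step through the list batch_size at a time;
    # the final slice may be shorter than batch_size.
    return [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)]

print(batched(["a", "b", "c", "d", "e"], 3))  # [['a', 'b', 'c'], ['d', 'e']]
```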

Retriever Service

python
# app/services/retriever.py
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue
from app.config import settings

class RetrieverService:
    def __init__(self):
        self.client = AsyncQdrantClient(
            url=settings.qdrant_url,
            api_key=settings.qdrant_api_key,
        )
        self.collection = settings.collection_name
        self.dimensions = settings.embedding_dimensions

    async def ensure_collection(self):
        collections = await self.client.get_collections()
        if self.collection not in [c.name for c in collections.collections]:
            await self.client.create_collection(
                collection_name=self.collection,
                vectors_config=VectorParams(
                    size=self.dimensions,
                    distance=Distance.COSINE,
                ),
            )

    async def upsert(self, chunks: list, embeddings: list[list[float]]):
        points = [
            PointStruct(
                id=chunk.id,
                vector=embedding,
                payload={"text": chunk.text, **chunk.metadata},
            )
            for chunk, embedding in zip(chunks, embeddings)
        ]
        await self.client.upsert(collection_name=self.collection, points=points)

    async def search(self, query_embedding: list[float], top_k: int = 5,
                     filters: dict | None = None) -> list[dict]:
        query_filter = None
        if filters:
            conditions = [
                FieldCondition(key=k, match=MatchValue(value=v))
                for k, v in filters.items()
            ]
            query_filter = Filter(must=conditions)

        results = await self.client.search(
            collection_name=self.collection,
            query_vector=query_embedding,
            limit=top_k,
            query_filter=query_filter,
        )

        return [
            {
                "id": str(r.id),
                "text": r.payload.get("text", ""),
                "score": r.score,
                "source": r.payload.get("source", "Unknown"),
                "section": r.payload.get("section"),
            }
            for r in results
        ]

    async def delete_by_document(self, document_id: str):
        await self.client.delete(
            collection_name=self.collection,
            points_selector=Filter(
                must=[FieldCondition(key="document_id", match=MatchValue(value=document_id))]
            ),
        )
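With `Distance.COSINE`, the `score` Qdrant returns is cosine similarity, where higher means more similar. As a reference, the quantity being ranked is the dot product of the two vectors divided by the product of their norms, which a few lines of pure Python make concrete:

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    # dot(a, b) / (|a| * |b|); 1.0 = identical direction, 0.0 = orthogonal
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

print(round(cosine([1.0, 0.0], [1.0, 0.0]), 3))  # 1.0
print(round(cosine([1.0, 0.0], [0.0, 1.0]), 3))  # 0.0
```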


Generator Service

python
# app/services/generator.py
from anthropic import AsyncAnthropic
from app.config import settings

class GeneratorService:
    def __init__(self):
        self.client = AsyncAnthropic(api_key=settings.anthropic_api_key)
        self.model = settings.generation_model

    async def generate(self, question: str, context: str,
                       max_tokens: int | None = None) -> dict:
        response = await self.client.messages.create(
            model=self.model,
            max_tokens=max_tokens or settings.max_generation_tokens,
            system=(
                "You are a helpful assistant. Answer the user's question using only "
                "the provided context. If the context does not contain enough information, "
                "say so. Always cite your sources by referencing the document title or section."
            ),
            messages=[
                {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
            ],
        )

        return {
            "text": response.content[0].text,
            "model": self.model,
            "input_tokens": response.usage.input_tokens,
            "output_tokens": response.usage.output_tokens,
        }

    async def generate_stream(self, question: str, context: str):
        async with self.client.messages.stream(
            model=self.model,
            max_tokens=settings.max_generation_tokens,
            system="Answer using only the provided context. Cite your sources.",
            messages=[
                {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
            ],
        ) as stream:
            async for text in stream.text_stream:
                yield text

API Routes

python
# app/routes/query.py
import time
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from app.models import QueryRequest, QueryResponse, Source
from app.dependencies import get_embedder, get_retriever, get_generator

router = APIRouter(prefix="/api", tags=["query"])

@router.post("/query", response_model=QueryResponse)
async def query(
    request: QueryRequest,
    embedder=Depends(get_embedder),
    retriever=Depends(get_retriever),
    generator=Depends(get_generator),
):
    # Retrieve
    retrieval_start = time.perf_counter()
    query_embedding = await embedder.embed_query(request.question)
    results = await retriever.search(query_embedding, request.top_k, request.filters)
    retrieval_ms = (time.perf_counter() - retrieval_start) * 1000

    if not results:
        return QueryResponse(
            answer="I could not find relevant information to answer this question.",
            sources=[],
            model="",
            tokens=0,
            retrieval_time_ms=retrieval_ms,
            generation_time_ms=0,
        )

    # Build context
    context = "\n\n---\n\n".join(
        f"[Source: {r['source']}" + (f" - {r['section']}" if r.get("section") else "") + f"]\n{r['text']}"
        for r in results
    )

    # Generate
    gen_start = time.perf_counter()
    gen_result = await generator.generate(request.question, context)
    gen_ms = (time.perf_counter() - gen_start) * 1000

    return QueryResponse(
        answer=gen_result["text"],
        sources=[
            Source(
                text=r["text"][:200],
                score=r["score"],
                source=r["source"],
                section=r.get("section"),
            )
            for r in results
        ],
        model=gen_result["model"],
        tokens=gen_result["input_tokens"] + gen_result["output_tokens"],
        retrieval_time_ms=round(retrieval_ms, 2),
        generation_time_ms=round(gen_ms, 2),
    )

@router.post("/query/stream")
async def query_stream(
    request: QueryRequest,
    embedder=Depends(get_embedder),
    retriever=Depends(get_retriever),
    generator=Depends(get_generator),
):
    query_embedding = await embedder.embed_query(request.question)
    results = await retriever.search(query_embedding, request.top_k, request.filters)

    context = "\n\n---\n\n".join(
        f"[Source: {r['source']}]\n{r['text']}" for r in results
    )

    return StreamingResponse(
        generator.generate_stream(request.question, context),
        media_type="text/event-stream",
    )
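The context string both routes build is plain formatting over the retrieved hits: each hit gets a `[Source: ...]` tag and the hits are joined with `---` separators so the model can cite them. The same join in isolation (the hits below are made up):

```python
hits = [
    {"source": "guide.md", "section": "Deploy", "text": "Use Docker."},
    {"source": "faq.md", "section": None, "text": "Check the logs."},
]

# Same formatting as the /query route: a source tag (with section when
# present) above each chunk, chunks separated by "---".
context = "\n\n---\n\n".join(
    f"[Source: {h['source']}" + (f" - {h['section']}" if h.get("section") else "") + f"]\n{h['text']}"
    for h in hits
)
print(context)
```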
python
# app/routes/ingest.py
import uuid
from fastapi import APIRouter, Depends, UploadFile, File, HTTPException
from app.models import IngestRequest, IngestResponse, IngestFileResponse
from app.config import settings
from app.dependencies import get_chunker, get_embedder, get_retriever
from app.services.parser import ParserFactory

router = APIRouter(prefix="/api", tags=["ingest"])

@router.post("/ingest", response_model=IngestResponse)
async def ingest_text(
    request: IngestRequest,
    chunker=Depends(get_chunker),
    embedder=Depends(get_embedder),
    retriever=Depends(get_retriever),
):
    document_id = str(uuid.uuid4())
    chunks = chunker.chunk(request.content, document_id, request.source_name)
    embeddings = await embedder.embed([c.text for c in chunks])
    await retriever.upsert(chunks, embeddings)

    return IngestResponse(
        document_id=document_id,
        chunks_created=len(chunks),
        source=request.source_name,
    )

@router.post("/ingest/file", response_model=IngestFileResponse)
async def ingest_file(
    file: UploadFile = File(...),
    chunker=Depends(get_chunker),
    embedder=Depends(get_embedder),
    retriever=Depends(get_retriever),
):
    if file.size and file.size > settings.max_upload_size:
        raise HTTPException(413, "File too large")

    content = await file.read()
    parser = ParserFactory.get_parser(file.filename or "unknown.txt")
    text = await parser.parse(content, file.filename or "upload")

    document_id = str(uuid.uuid4())
    chunks = chunker.chunk(text, document_id, file.filename or "upload")
    embeddings = await embedder.embed([c.text for c in chunks])
    await retriever.upsert(chunks, embeddings)

    return IngestFileResponse(
        document_id=document_id,
        chunks_created=len(chunks),
        filename=file.filename or "upload",
        file_size=file.size or len(content),
    )

Application Entry Point

python
# app/main.py
from fastapi import FastAPI
from contextlib import asynccontextmanager
from app.routes import query, ingest
from app.dependencies import get_retriever

@asynccontextmanager
async def lifespan(app: FastAPI):
    retriever = get_retriever()
    await retriever.ensure_collection()
    yield

app = FastAPI(
    title="RAG API",
    version="1.0.0",
    lifespan=lifespan,
)

app.include_router(query.router)
app.include_router(ingest.router)

@app.get("/health")
async def health():
    return {"status": "healthy"}

Dependencies

python
# app/dependencies.py
from functools import lru_cache
from app.services.chunker import ChunkerService
from app.services.embedder import EmbeddingService
from app.services.retriever import RetrieverService
from app.services.generator import GeneratorService
from app.config import settings

@lru_cache
def get_chunker() -> ChunkerService:
    return ChunkerService(max_tokens=settings.chunk_size, overlap=settings.chunk_overlap)

@lru_cache
def get_embedder() -> EmbeddingService:
    return EmbeddingService()

@lru_cache
def get_retriever() -> RetrieverService:
    return RetrieverService()

@lru_cache
def get_generator() -> GeneratorService:
    return GeneratorService()

Dockerfile

dockerfile
FROM python:3.12-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY app/ app/

EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

Testing the API

bash
# Ingest a document
curl -X POST http://localhost:8000/api/ingest \
  -H "Content-Type: application/json" \
  -d '{"source_name": "docs", "content": "# Deployment Guide\n\nDeploy using Docker..."}'

# Upload a file
curl -X POST http://localhost:8000/api/ingest/file \
  -F "file=@./docs/guide.md"

# Query
curl -X POST http://localhost:8000/api/query \
  -H "Content-Type: application/json" \
  -d '{"question": "How do I deploy the application?"}'

Conclusion

FastAPI's async-first design aligns naturally with RAG pipeline requirements — concurrent embedding requests, parallel retrieval and generation, and streaming responses all benefit from async I/O. The dependency injection system keeps services loosely coupled and testable.

The architecture shown here handles the full RAG lifecycle: document upload, parsing, chunking, embedding, storage, retrieval, and generation. Each service is independently testable and replaceable. Start with this foundation, then add hybrid search, re-ranking, and evaluation metrics as your use case demands.

Muneer Puthiya Purayil

SaaS Architect & AI Systems Engineer. 10+ years shipping production infrastructure across fintech, automotive, e-commerce, and healthcare.
