
RAG Pipeline Design Best Practices for Enterprise Teams

Battle-tested best practices for RAG pipeline design in enterprise teams, including anti-patterns to avoid and a ready-to-use checklist.

Muneer Puthiya Purayil · 15 min read

Enterprise RAG pipelines operate under constraints that fundamentally differ from prototypes: strict data governance, auditability requirements, multi-source retrieval across heterogeneous document stores, and latency SLAs that must hold at the 99th percentile. These best practices address the engineering challenges of deploying RAG systems that enterprise security and compliance teams will actually sign off on.

Document Ingestion Architecture

Implement a Multi-Stage Ingestion Pipeline

Enterprise document corpora include PDFs, Word documents, HTML pages, Confluence wikis, Slack threads, and proprietary formats. Build the ingestion pipeline as a series of idempotent stages:

```python
from dataclasses import dataclass
from enum import Enum

class DocumentStatus(Enum):
    QUEUED = "queued"
    PARSING = "parsing"
    CHUNKING = "chunking"
    EMBEDDING = "embedding"
    INDEXED = "indexed"
    FAILED = "failed"

@dataclass
class IngestionJob:
    document_id: str
    source: str
    status: DocumentStatus
    chunk_count: int = 0
    error_message: str | None = None

class IngestionPipeline:
    def __init__(self, parser, chunker, embedder, vector_store):
        self.parser = parser
        self.chunker = chunker
        self.embedder = embedder
        self.vector_store = vector_store

    async def process(self, job: IngestionJob) -> IngestionJob:
        try:
            job.status = DocumentStatus.PARSING
            raw_text = await self.parser.parse(job.document_id, job.source)

            job.status = DocumentStatus.CHUNKING
            chunks = self.chunker.chunk(raw_text, metadata={"doc_id": job.document_id})
            job.chunk_count = len(chunks)

            job.status = DocumentStatus.EMBEDDING
            embeddings = await self.embedder.embed_batch(
                [c.text for c in chunks],
                batch_size=64,
            )

            await self.vector_store.upsert(
                ids=[c.id for c in chunks],
                embeddings=embeddings,
                metadata=[c.metadata for c in chunks],
                documents=[c.text for c in chunks],
            )
            # Mark as indexed only after the upsert has actually completed
            job.status = DocumentStatus.INDEXED

            return job
        except Exception as e:
            job.status = DocumentStatus.FAILED
            job.error_message = str(e)
            return job
```

Version Your Embeddings

When you change embedding models (moving from text-embedding-ada-002 to text-embedding-3-large), all existing vectors become incompatible. Maintain embedding version metadata:

```python
from dataclasses import dataclass

@dataclass
class EmbeddingConfig:
    model: str
    dimensions: int
    version: str

    @property
    def collection_name(self) -> str:
        return f"docs_v{self.version}_{self.model.replace('-', '_')}"

CURRENT_CONFIG = EmbeddingConfig(
    model="text-embedding-3-large",
    dimensions=3072,  # text-embedding-3-large's native dimensionality
    version="3",
)
```

Run the old and new collections in parallel during migration, routing queries to the new collection once re-indexing is complete. Never delete the old collection until you have verified retrieval quality on the new one.
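The parallel-run cutover can be sketched as a small router. The class and method names below are illustrative, not a prescribed API; adapt them to your vector store client:

```python
# Illustrative cutover router: reads keep hitting the old collection until
# retrieval quality on the new one has been verified, then flip once.

class MigrationRouter:
    def __init__(self, old_collection: str, new_collection: str):
        self.old_collection = old_collection
        self.new_collection = new_collection
        self.cutover_complete = False

    def write_targets(self) -> list[str]:
        # During migration, ingest into both collections so neither falls behind
        if self.cutover_complete:
            return [self.new_collection]
        return [self.old_collection, self.new_collection]

    def read_target(self) -> str:
        # Queries route to the new collection only after verification
        return self.new_collection if self.cutover_complete else self.old_collection

    def complete_cutover(self, verified: bool) -> None:
        # Flip only once retrieval quality on the new collection is confirmed
        if verified:
            self.cutover_complete = True
```

The old collection stays untouched throughout, so rollback is just flipping the flag back.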

Chunking Strategies That Work at Scale

Semantic Chunking Over Fixed-Size

Fixed-size chunking (e.g., 500 tokens with 50-token overlap) is the most common approach but produces poor results on structured enterprise documents where section boundaries carry meaning. Split on document structure instead:

```python
import re
from dataclasses import dataclass

@dataclass
class Section:
    header: str
    text: str

class SemanticChunker:
    def __init__(self, max_tokens: int = 512, min_tokens: int = 100):
        self.max_tokens = max_tokens
        self.min_tokens = min_tokens

    def chunk(self, text: str, metadata: dict) -> list[dict]:
        # Split on document structure first
        sections = self._split_on_headers(text)
        chunks = []

        for section in sections:
            if self._token_count(section.text) <= self.max_tokens:
                chunks.append({
                    "text": section.text,
                    "metadata": {**metadata, "section": section.header},
                    "id": f"{metadata['doc_id']}_{len(chunks)}",
                })
            else:
                # Sub-chunk long sections on paragraph boundaries
                paragraphs = section.text.split("\n\n")
                current_chunk = ""
                for para in paragraphs:
                    if self._token_count(current_chunk + para) > self.max_tokens:
                        if current_chunk:
                            chunks.append({
                                "text": current_chunk.strip(),
                                "metadata": {**metadata, "section": section.header},
                                "id": f"{metadata['doc_id']}_{len(chunks)}",
                            })
                        current_chunk = para
                    else:
                        current_chunk += "\n\n" + para

                if current_chunk and self._token_count(current_chunk) >= self.min_tokens:
                    chunks.append({
                        "text": current_chunk.strip(),
                        "metadata": {**metadata, "section": section.header},
                        "id": f"{metadata['doc_id']}_{len(chunks)}",
                    })

        return chunks

    def _split_on_headers(self, text: str) -> list[Section]:
        pattern = r'^(#{1,3})\s+(.+)$'
        sections = []
        current_header = "Introduction"
        current_text = ""

        for line in text.split("\n"):
            match = re.match(pattern, line)
            if match:
                if current_text.strip():
                    sections.append(Section(header=current_header, text=current_text.strip()))
                current_header = match.group(2)
                current_text = ""
            else:
                current_text += line + "\n"

        if current_text.strip():
            sections.append(Section(header=current_header, text=current_text.strip()))

        return sections

    def _token_count(self, text: str) -> int:
        # Rough word-based approximation (~1.3 tokens per word)
        return len(text.split()) * 4 // 3
```

Include Parent-Child Context

Each chunk should carry enough context to be understood independently. Attach the parent section header and document title:

```python
def enrich_chunk(chunk: dict, document_title: str, section_hierarchy: list[str]) -> dict:
    context_prefix = f"Document: {document_title}\n"
    if section_hierarchy:
        context_prefix += f"Section: {' > '.join(section_hierarchy)}\n\n"

    chunk["text"] = context_prefix + chunk["text"]
    chunk["metadata"]["document_title"] = document_title
    chunk["metadata"]["section_path"] = " > ".join(section_hierarchy)
    return chunk
```

Retrieval Architecture

Hybrid Search: Dense + Sparse

Pure vector search misses exact keyword matches. Pure keyword search misses semantic similarity. Combine both:

```python
import asyncio

class HybridRetriever:
    def __init__(self, vector_store, bm25_index, alpha: float = 0.7):
        self.vector_store = vector_store
        self.bm25_index = bm25_index
        self.alpha = alpha  # Weight for vector search (1 - alpha for BM25)

    async def retrieve(self, query: str, top_k: int = 10) -> list[dict]:
        # Run both retrievals in parallel
        vector_results, bm25_results = await asyncio.gather(
            self.vector_store.query(query, top_k=top_k * 2),
            self.bm25_index.search(query, top_k=top_k * 2),
        )

        # Weighted Reciprocal Rank Fusion (constant k=60)
        scores: dict[str, float] = {}
        for rank, result in enumerate(vector_results):
            scores[result["id"]] = scores.get(result["id"], 0) + self.alpha / (rank + 60)
        for rank, result in enumerate(bm25_results):
            scores[result["id"]] = scores.get(result["id"], 0) + (1 - self.alpha) / (rank + 60)

        # Sort by combined score and return top_k
        ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)[:top_k]
        return [self._get_document(doc_id) for doc_id, _ in ranked]
```

Implement Query Expansion

Enterprise queries are often ambiguous or use domain-specific terminology. Expand queries before retrieval:

```python
class QueryExpander:
    def __init__(self, llm_client):
        self.llm = llm_client

    async def expand(self, query: str) -> list[str]:
        response = await self.llm.chat(
            model="claude-sonnet-4-5",
            messages=[{
                "role": "user",
                "content": (
                    f"Generate 3 alternative phrasings of this search query. "
                    f"Include relevant technical synonyms. Return only the queries, one per line.\n\n"
                    f"Query: {query}"
                )
            }],
            max_tokens=200,
        )

        alternatives = response.content[0].text.strip().split("\n")
        return [query] + [q.strip() for q in alternatives if q.strip()]
```

Access Control and Data Governance

Document-Level Permissions

Enterprise RAG systems must respect existing access control. Filter retrieval results based on the requesting user's permissions:

```python
class PermissionFilteredRetriever:
    def __init__(self, retriever, permission_service):
        self.retriever = retriever
        self.permission_service = permission_service

    async def retrieve(self, query: str, user_id: str, top_k: int = 10) -> list[dict]:
        # Retrieve more candidates to account for filtering
        candidates = await self.retriever.retrieve(query, top_k=top_k * 3)

        # Filter by user permissions
        permitted_doc_ids = await self.permission_service.get_accessible_docs(user_id)
        filtered = [
            c for c in candidates
            if c["metadata"]["doc_id"] in permitted_doc_ids
        ]

        return filtered[:top_k]
```

Audit Logging

Every RAG interaction must be logged for compliance:

```python
import hashlib
from dataclasses import dataclass

@dataclass
class RAGAuditEntry:
    timestamp: str
    user_id: str
    query: str
    retrieved_doc_ids: list[str]
    response_text: str
    model: str
    latency_ms: int
    token_count: int

class AuditLogger:
    def __init__(self, store):
        self.store = store

    async def log(self, entry: RAGAuditEntry) -> None:
        await self.store.insert("rag_audit", {
            "timestamp": entry.timestamp,
            "user_id": entry.user_id,
            "query": entry.query,
            "sources": entry.retrieved_doc_ids,
            # Hash the response rather than storing it verbatim
            "response_hash": hashlib.sha256(entry.response_text.encode()).hexdigest(),
            "model": entry.model,
            "latency_ms": entry.latency_ms,
            "tokens": entry.token_count,
        })
```


Evaluation and Monitoring

Automated Retrieval Quality Metrics

Track retrieval quality continuously, not just during development:

```python
class RetrievalMetrics:
    @staticmethod
    def mean_reciprocal_rank(relevant_ids: set[str], retrieved: list[dict]) -> float:
        for rank, doc in enumerate(retrieved, 1):
            if doc["id"] in relevant_ids:
                return 1.0 / rank
        return 0.0

    @staticmethod
    def precision_at_k(relevant_ids: set[str], retrieved: list[dict], k: int) -> float:
        top_k = retrieved[:k]
        relevant_count = sum(1 for doc in top_k if doc["id"] in relevant_ids)
        return relevant_count / k

    @staticmethod
    def context_relevance(query: str, contexts: list[str], llm_client) -> float:
        """Use an LLM to judge whether retrieved contexts are relevant to the query."""
        # Implementation uses the LLM-as-judge pattern
        raise NotImplementedError
```

Checklist

  • Multi-format document parser (PDF, Word, HTML, Markdown)
  • Semantic chunking with section-aware boundaries
  • Embedding versioning with migration support
  • Hybrid search (dense + sparse retrieval)
  • Document-level access control filtering
  • Audit logging for all RAG interactions
  • Query expansion for ambiguous queries
  • Retrieval quality monitoring with MRR and precision metrics
  • Hallucination detection in generated responses
  • Source citation in every generated response
  • Rate limiting per user/department
  • PII detection and redaction in ingested documents

Anti-Patterns to Avoid

Single embedding model dependency: Lock-in to one embedding provider makes migration painful. Abstract the embedding interface and maintain compatibility metadata per vector collection.
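One way to abstract that interface is a structural protocol that every provider adapter implements. The sketch below is illustrative, not a prescribed API; the adapter assumes the async OpenAI client's `embeddings.create` call, and any other provider would get its own adapter behind the same protocol:

```python
from typing import Protocol

class Embedder(Protocol):
    # Compatibility metadata lives on the adapter, so the versioned
    # collection name can be derived from it
    model: str
    dimensions: int

    async def embed_batch(self, texts: list[str], batch_size: int = 64) -> list[list[float]]:
        ...

class OpenAIEmbedder:
    # Illustrative adapter; wire `client` to your actual async OpenAI client
    model = "text-embedding-3-large"
    dimensions = 3072

    def __init__(self, client):
        self.client = client

    async def embed_batch(self, texts, batch_size=64):
        out = []
        for i in range(0, len(texts), batch_size):
            resp = await self.client.embeddings.create(
                model=self.model, input=texts[i:i + batch_size]
            )
            out.extend(d.embedding for d in resp.data)
        return out
```

Swapping providers then means adding a new adapter and re-indexing into a new collection, not rewriting the pipeline.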

Ignoring chunk boundaries in responses: The LLM should cite specific chunks in its response. Without source attribution, enterprise users cannot verify the answer against the original document.

Over-retrieving without re-ranking: Retrieving 50 chunks and stuffing them into the prompt wastes tokens and degrades response quality. Retrieve broadly, re-rank with a cross-encoder, then pass the top 5-8 most relevant chunks to the LLM.
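A minimal sketch of that retrieve-broadly-then-re-rank step, with the cross-encoder hidden behind a pluggable score function (for example, sentence-transformers' `CrossEncoder.predict`, which scores query-passage pairs):

```python
# Hypothetical re-ranking helper: `score_fn` takes a list of
# (query, passage) pairs and returns one relevance score per pair.

def rerank(query: str, candidates: list[dict], score_fn, final_k: int = 8) -> list[dict]:
    pairs = [(query, c["text"]) for c in candidates]
    scores = score_fn(pairs)
    # Keep only the top final_k candidates by cross-encoder score
    ranked = sorted(zip(candidates, scores), key=lambda x: x[1], reverse=True)
    return [c for c, _ in ranked[:final_k]]
```

Retrieve 30-50 candidates cheaply, then let the (slower, more accurate) cross-encoder pick the 5-8 chunks that actually reach the prompt.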

Skipping permission filtering for performance: Filtering after retrieval is slower but correct. Pre-filtering by embedding user permissions into the vector query is faster but creates security risks when permissions change and the index is not immediately updated.

Conclusion

Enterprise RAG pipelines require engineering rigor beyond what research prototypes demonstrate. The retrieval component is only as good as the ingestion pipeline feeding it — semantic chunking, embedding versioning, and multi-format parsing determine the ceiling of retrieval quality. Access control and audit logging are not optional features but prerequisites for enterprise deployment.

Invest in evaluation infrastructure from the start. Automated retrieval quality metrics (MRR, precision@k) and LLM-judged response relevance scores provide the feedback loop needed to iterate on chunking strategies, embedding models, and retrieval parameters. Without measurement, RAG pipeline improvements are guesswork.



Muneer Puthiya Purayil

SaaS Architect & AI Systems Engineer. 10+ years shipping production infrastructure across fintech, automotive, e-commerce, and healthcare.
