Building a Production RAG Pipeline with Qdrant and FastAPI
Retrieval-Augmented Generation (RAG) has become the go-to architecture for building AI applications that need access to external knowledge. But there's a big gap between tutorial code and a production-ready system.
In this guide, we'll build a production RAG pipeline using Qdrant (vector database) and FastAPI that can handle real-world workloads.
The Architecture
Our RAG system consists of four main components (a rough sketch of how they fit together follows the list):
- Document Ingestion Pipeline - Processes and chunks documents
- Embedding Service - Converts text to vectors using OpenAI or open-source models
- Vector Database - Stores and retrieves embeddings (Qdrant)
- Query API - FastAPI endpoints for search and generation
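To make the flow concrete, here is a rough wiring sketch of the ingestion side. Every name is illustrative: chunk_markdown, index_document, and the /search endpoint are defined in the sections below, while load_document is an assumed helper for reading raw files, not part of the original pipeline.

```python
# Illustrative wiring only; chunk_markdown and index_document are defined
# in the sections that follow, load_document is an assumed helper.
def ingest(doc_id: str, path: str):
    text = load_document(path)      # 1. Document Ingestion Pipeline
    chunks = chunk_markdown(text)   # chunking (strategies below)
    index_document(doc_id, chunks)  # 2 + 3. Embedding Service + Qdrant
```

Queries then go through the FastAPI endpoints in component 4.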
Document Chunking Strategies
The quality of your RAG system heavily depends on how you chunk your documents. Here are three strategies we've tested:
1. Fixed-Size Chunking
```python
def chunk_fixed_size(text: str, chunk_size: int = 512, overlap: int = 50):
    chunks = []
    for i in range(0, len(text), chunk_size - overlap):
        chunk = text[i:i + chunk_size]
        chunks.append(chunk)
    return chunks
```

Pros: Simple, predictable chunk sizes
Cons: Can split sentences/paragraphs awkwardly
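If you want the chunk boundaries measured in tokens rather than characters (so chunks line up with model context limits), here is a minimal sketch using tiktoken. The encoding name and defaults are illustrative assumptions, not part of the original pipeline.

```python
import tiktoken

def chunk_fixed_tokens(text: str, chunk_size: int = 512, overlap: int = 50,
                       encoding_name: str = "cl100k_base"):
    # Encode once, then slide a window over the token IDs
    enc = tiktoken.get_encoding(encoding_name)
    tokens = enc.encode(text)
    chunks = []
    for i in range(0, len(tokens), chunk_size - overlap):
        chunks.append(enc.decode(tokens[i:i + chunk_size]))
    return chunks
```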
2. Semantic Chunking
Uses sentence embeddings to identify natural breakpoints:
```python
import numpy as np
from nltk.tokenize import sent_tokenize
from sentence_transformers import SentenceTransformer

def chunk_semantic(text: str, threshold: float = 0.5):
    sentences = sent_tokenize(text)
    if not sentences:
        return []
    model = SentenceTransformer('all-MiniLM-L6-v2')
    embeddings = model.encode(sentences)

    # Calculate similarity between consecutive sentences; a drop below
    # the threshold marks a natural breakpoint
    chunks = []
    current_chunk = [sentences[0]]
    for i in range(1, len(sentences)):
        similarity = np.dot(embeddings[i - 1], embeddings[i]) / (
            np.linalg.norm(embeddings[i - 1]) * np.linalg.norm(embeddings[i])
        )
        if similarity < threshold:
            chunks.append(' '.join(current_chunk))
            current_chunk = [sentences[i]]
        else:
            current_chunk.append(sentences[i])
    chunks.append(' '.join(current_chunk))  # don't drop the final chunk
    return chunks
```

Pros: Preserves semantic coherence
Cons: More computationally expensive
3. Markdown-Aware Chunking (Our Recommendation)
Respects document structure (headers, code blocks, lists):
```python
def chunk_markdown(content: str, max_tokens: int = 512):
    # split_by_headers, count_tokens, and chunk_by_paragraphs are helper
    # functions (one possible sketch of them follows below)
    sections = split_by_headers(content)
    chunks = []
    for section in sections:
        if count_tokens(section) <= max_tokens:
            chunks.append(section)
        else:
            # Further split large sections
            sub_chunks = chunk_by_paragraphs(section, max_tokens)
            chunks.extend(sub_chunks)
    return chunks
```
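The helpers above aren't defined in this guide. One possible sketch, splitting on markdown headers with a regex and counting tokens with tiktoken; both choices are assumptions, not the original implementation:

```python
import re

import tiktoken

enc = tiktoken.get_encoding("cl100k_base")

def count_tokens(text: str) -> int:
    return len(enc.encode(text))

def split_by_headers(content: str) -> list[str]:
    # Split before markdown header lines (#, ##, ...) so each header
    # stays attached to the section that follows it
    parts = re.split(r'\n(?=#{1,6} )', content)
    return [p.strip() for p in parts if p.strip()]

def chunk_by_paragraphs(section: str, max_tokens: int) -> list[str]:
    # Greedily pack paragraphs until the next one would exceed the budget
    chunks, current = [], ""
    for para in section.split("\n\n"):
        candidate = f"{current}\n\n{para}".strip()
        if current and count_tokens(candidate) > max_tokens:
            chunks.append(current)
            current = para
        else:
            current = candidate
    if current:
        chunks.append(current)
    return chunks
```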
Setting Up Qdrant

```python
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams

client = QdrantClient(url="http://localhost:6333")

# Create collection
client.create_collection(
    collection_name="docs",
    vectors_config=VectorParams(
        size=1536,  # OpenAI ada-002 dimension
        distance=Distance.COSINE
    )
)
```
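create_collection will fail if the collection already exists, which matters once the service restarts in production. A small guard, sketched with get_collections (newer qdrant-client versions also expose a collection_exists helper):

```python
# Only create the collection if it isn't there yet
existing = {c.name for c in client.get_collections().collections}
if "docs" not in existing:
    client.create_collection(
        collection_name="docs",
        vectors_config=VectorParams(size=1536, distance=Distance.COSINE)
    )
```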
Embedding & Indexing

```python
import uuid

from openai import OpenAI
from qdrant_client.models import PointStruct

openai_client = OpenAI()

def get_embedding(text: str):
    response = openai_client.embeddings.create(
        model="text-embedding-ada-002",
        input=text
    )
    return response.data[0].embedding

# Index documents
def index_document(doc_id: str, chunks: list[str]):
    points = []
    for i, chunk in enumerate(chunks):
        embedding = get_embedding(chunk)
        points.append(PointStruct(
            # Qdrant point IDs must be unsigned integers or UUIDs, so derive
            # a deterministic UUID from the doc ID and chunk index
            id=str(uuid.uuid5(uuid.NAMESPACE_URL, f"{doc_id}_{i}")),
            vector=embedding,
            payload={
                "text": chunk,
                "doc_id": doc_id,
                "chunk_index": i
            }
        ))
    client.upsert(
        collection_name="docs",
        points=points
    )
```
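index_document above embeds one chunk per API call. The embeddings endpoint also accepts a list of inputs, so batching cuts request overhead and cost; a minimal sketch (the batch size is an arbitrary assumption):

```python
def get_embeddings_batch(texts: list[str], batch_size: int = 100) -> list[list[float]]:
    embeddings = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        response = openai_client.embeddings.create(
            model="text-embedding-ada-002",
            input=batch
        )
        # Results come back in the same order as the inputs
        embeddings.extend(item.embedding for item in response.data)
    return embeddings
```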
FastAPI Query Endpoint

```python
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class QueryRequest(BaseModel):
    query: str
    top_k: int = 5

@app.post("/search")
async def search(request: QueryRequest):
    # Get query embedding
    query_embedding = get_embedding(request.query)

    # Search Qdrant
    results = client.search(
        collection_name="docs",
        query_vector=query_embedding,
        limit=request.top_k
    )

    # Format results
    return {
        "results": [
            {
                "text": hit.payload["text"],
                "score": hit.score,
                "doc_id": hit.payload["doc_id"]
            }
            for hit in results
        ]
    }
```
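The architecture also calls for a generation endpoint. A minimal sketch that stuffs the retrieved chunks into a chat-completion prompt; the model name and prompt wording are assumptions, not part of the original pipeline:

```python
@app.post("/generate")
async def generate(request: QueryRequest):
    # Retrieve the most relevant chunks
    hits = client.search(
        collection_name="docs",
        query_vector=get_embedding(request.query),
        limit=request.top_k
    )
    context = "\n\n".join(hit.payload["text"] for hit in hits)

    # Generate an answer grounded in the retrieved context
    completion = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system",
             "content": "Answer using only the provided context."},
            {"role": "user",
             "content": f"Context:\n{context}\n\nQuestion: {request.query}"}
        ]
    )
    return {
        "answer": completion.choices[0].message.content,
        "sources": [hit.payload["doc_id"] for hit in hits]
    }
```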
Retrieval Optimization

1. Hybrid Search
Combine vector search with keyword matching for better results. The simplest form is a keyword pre-filter applied during the vector search:
```python
from qdrant_client.models import Filter, FieldCondition, MatchText

# Keyword pre-filter: restrict the vector search to chunks whose text
# matches the keyword (MatchText requires a full-text payload index on "text")
results = client.search(
    collection_name="docs",
    query_vector=query_embedding,
    query_filter=Filter(
        must=[
            FieldCondition(
                key="text",
                match=MatchText(text="python")
            )
        ]
    ),
    limit=top_k
)
```
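MatchText only works if the text field has a full-text payload index. A sketch of creating one, assuming qdrant_client's create_payload_index and TextIndexParams (check your client version for the exact parameter names):

```python
from qdrant_client import models

# One-time setup: full-text index over the chunk text payload
client.create_payload_index(
    collection_name="docs",
    field_name="text",
    field_schema=models.TextIndexParams(
        type=models.TextIndexType.TEXT,
        tokenizer=models.TokenizerType.WORD,
        lowercase=True
    )
)
```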
2. Re-ranking

Use a cross-encoder to re-rank retrieved chunks:
```python
from sentence_transformers import CrossEncoder

reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

def rerank_results(query: str, results: list):
    pairs = [[query, r["text"]] for r in results]
    scores = reranker.predict(pairs)

    # Sort by reranker scores
    reranked = sorted(
        zip(results, scores),
        key=lambda x: x[1],
        reverse=True
    )
    return [r[0] for r in reranked]
```

Production Considerations
Caching
Cache embeddings for frequently queried content:
```python
import hashlib
import json

import redis

redis_client = redis.Redis()

def get_cached_embedding(text: str):
    cache_key = hashlib.md5(text.encode()).hexdigest()
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)
    embedding = get_embedding(text)
    redis_client.setex(cache_key, 3600, json.dumps(embedding))  # 1-hour TTL
    return embedding
```

Monitoring
Track key metrics (a minimal latency-instrumentation sketch follows the list):
- Query latency (target: <200ms)
- Retrieval accuracy (measure with eval set)
- Cache hit rate
- Embedding API costs
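One way to track the latency target is a Prometheus histogram wired into FastAPI middleware. A minimal sketch, assuming prometheus_client is installed; the metric name is illustrative:

```python
import time

from prometheus_client import Histogram

QUERY_LATENCY = Histogram(
    "rag_query_latency_seconds",
    "End-to-end request latency for the RAG API"
)

@app.middleware("http")
async def track_latency(request, call_next):
    # Measure wall-clock time for every request and record it
    start = time.perf_counter()
    response = await call_next(request)
    QUERY_LATENCY.observe(time.perf_counter() - start)
    return response
```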
Cost Optimization
- Use smaller embedding models for less critical use cases
- Implement smart caching
- Batch embedding requests when possible (see the batched sketch in the indexing section)
Evaluation
Create a test set and measure:
```python
def evaluate_rag(test_queries: list[tuple[str, set[str]]]):
    precisions = []
    for query, expected_docs in test_queries:
        # Retrieve the top-5 chunks straight from Qdrant
        hits = client.search(
            collection_name="docs",
            query_vector=get_embedding(query),
            limit=5
        )
        retrieved_docs = {hit.payload["doc_id"] for hit in hits}
        precision = len(retrieved_docs & set(expected_docs)) / 5  # precision@5
        precisions.append(precision)
    return sum(precisions) / len(precisions)
```

Conclusion
Building production RAG requires careful attention to chunking, retrieval quality, and system performance. This architecture has handled 200K+ daily queries for our e-commerce clients.
Key takeaways:
- Chunking strategy matters more than you think
- Hybrid search beats pure vector search
- Always re-rank your results
- Monitor and optimize continuously
Next Steps: Try implementing this with your own data and iterate based on retrieval quality metrics.
