title: "Building a Production-Ready RAG System with LangChain: A Complete Guide"
description: "Learn how to build scalable retrieval augmented generation systems with LangChain, covering vector databases, embeddings, chunking strategies, and production optimizations"
publishedAt: "2025-01-11"
author: "Fenil Sonani"
tags: ["langchain", "rag", "ai", "production", "vector-database", "embeddings"]

Retrieval Augmented Generation (RAG) lets an application ground a large language model's answers in an external knowledge base instead of relying only on what the model learned during training. In this guide, we'll build a production-ready RAG system using LangChain, addressing real-world challenges like scaling, monitoring, and cost optimization.

Understanding RAG Architecture

Before diving into implementation, let's understand what makes a RAG system production-ready. Unlike tutorial examples, production systems must handle:

  • High concurrency with thousands of simultaneous queries
  • Large-scale document ingestion processing gigabytes of data
  • Cost optimization balancing performance with API expenses
  • Reliability with proper error handling and retry mechanisms
  • Monitoring to track system health and performance

Setting Up the Development Environment

First, let's install the necessary dependencies:

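# pinecone.init(...) and weaviate.Client(url=...) used below are the v2 and v3 client APIs, hence the pins
pip install langchain langchain-community langchain-openai \
  "pinecone-client<3" chromadb "weaviate-client<4" \
  tiktoken faiss-cpu redis celery tenacity \
  prometheus-client structlog python-dotenv \
  fastapi uvicorn pypdf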

Create a .env file for configuration:

OPENAI_API_KEY=your_openai_api_key
PINECONE_API_KEY=your_pinecone_api_key
PINECONE_ENVIRONMENT=your_pinecone_environment
REDIS_URL=redis://localhost:6379
WEAVIATE_URL=http://localhost:8080
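
A service that depends on these variables is easier to operate if it fails at startup when one is missing, rather than on the first request. The sketch below is one way to do that with only the standard library. The `Settings` class and `load_settings` function are hypothetical names introduced here for illustration, and the sketch assumes the `.env` file has already been loaded into the process environment (for example by python-dotenv's `load_dotenv()`).

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    """Hypothetical settings container; not part of the article's code."""
    openai_api_key: str
    pinecone_api_key: str
    pinecone_environment: str
    redis_url: str = "redis://localhost:6379"
    weaviate_url: str = "http://localhost:8080"


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Build Settings from environment variables, failing fast on missing secrets."""
    env = os.environ if env is None else env
    required = ["OPENAI_API_KEY", "PINECONE_API_KEY", "PINECONE_ENVIRONMENT"]
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing required settings: {', '.join(missing)}")
    return Settings(
        openai_api_key=env["OPENAI_API_KEY"],
        pinecone_api_key=env["PINECONE_API_KEY"],
        pinecone_environment=env["PINECONE_ENVIRONMENT"],
        # Optional values fall back to the local defaults shown in the .env file
        redis_url=env.get("REDIS_URL", "redis://localhost:6379"),
        weaviate_url=env.get("WEAVIATE_URL", "http://localhost:8080"),
    )
```

Passing a plain dictionary instead of `os.environ` keeps the function easy to test without touching the real environment.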

Building the Core RAG Components

1. Document Processing Pipeline

The foundation of any RAG system is efficient document processing. Let's create a robust pipeline that handles various document types:

import asyncio
import hashlib
import time
from dataclasses import dataclass
from enum import Enum
from typing import List, Dict, Any, Optional

import structlog
import tiktoken
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import (
    PyPDFLoader, TextLoader, UnstructuredMarkdownLoader,
    CSVLoader, JSONLoader
)
from langchain_core.documents import Document

logger = structlog.get_logger()

class ChunkingStrategy(Enum):
    RECURSIVE = "recursive"
    SEMANTIC = "semantic"
    SLIDING_WINDOW = "sliding_window"

@dataclass
class ProcessingConfig:
    # chunk_size and chunk_overlap are measured in tokens, because the
    # splitter below is given a token-based length_function.
    chunk_size: int = 1000
    chunk_overlap: int = 200
    chunking_strategy: ChunkingStrategy = ChunkingStrategy.RECURSIVE
    max_tokens_per_chunk: int = 500
    metadata_fields: Optional[List[str]] = None

class DocumentProcessor:
    """Production-ready document processor with multiple chunking strategies."""

    def __init__(self, config: ProcessingConfig):
        self.config = config
        self.encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")
        self._init_splitters()

    def _init_splitters(self):
        """Initialize text splitters based on strategy."""
        if self.config.chunking_strategy == ChunkingStrategy.RECURSIVE:
            self.splitter = RecursiveCharacterTextSplitter(
                chunk_size=self.config.chunk_size,
                chunk_overlap=self.config.chunk_overlap,
                length_function=self._token_length,
                separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""]
            )
        else:
            # Only the recursive strategy is implemented here; fail early
            # instead of leaving self.splitter undefined.
            raise NotImplementedError(
                f"Chunking strategy not implemented: {self.config.chunking_strategy.value}"
            )

    def _token_length(self, text: str) -> int:
        """Calculate token count for text."""
        return len(self.encoding.encode(text))

    async def process_document(self, file_path: str) -> List[Document]:
        """Process a single document with error handling."""
        try:
            loader = self._get_loader(file_path)
            documents = await asyncio.to_thread(loader.load)

            # Add document-level metadata
            for doc in documents:
                doc.metadata.update({
                    "source": file_path,
                    "doc_id": self._generate_doc_id(doc.page_content),
                    # Wall-clock time; the event loop clock is monotonic
                    # and is not a usable timestamp.
                    "processing_timestamp": time.time()
                })
            
            # Split documents into chunks
            chunks = await self._chunk_documents(documents)
            
            logger.info("document_processed", 
                       file_path=file_path, 
                       num_chunks=len(chunks))
            
            return chunks
            
        except Exception as e:
            logger.error("document_processing_failed", 
                        file_path=file_path, 
                        error=str(e))
            raise
    
    def _get_loader(self, file_path: str):
        """Select appropriate loader based on file type."""
        if file_path.endswith('.pdf'):
            return PyPDFLoader(file_path)
        elif file_path.endswith('.txt'):
            return TextLoader(file_path)
        elif file_path.endswith('.md'):
            return UnstructuredMarkdownLoader(file_path)
        elif file_path.endswith('.csv'):
            return CSVLoader(file_path)
        elif file_path.endswith('.json'):
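            # JSONLoader requires a jq_schema (and the jq package); "." selects
            # the whole document, and text_content=False allows non-string content.
            return JSONLoader(file_path, jq_schema=".", text_content=False)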
        else:
            raise ValueError(f"Unsupported file type: {file_path}")
    
    async def _chunk_documents(self, documents: List[Document]) -> List[Document]:
        """Chunk documents with token limit validation."""
        chunks = []
        
        for doc in documents:
            doc_chunks = await asyncio.to_thread(
                self.splitter.split_documents, [doc]
            )
            
            # Validate token count
            validated_chunks = []
            for chunk in doc_chunks:
                token_count = self._token_length(chunk.page_content)
                if token_count <= self.config.max_tokens_per_chunk:
                    chunk.metadata['token_count'] = token_count
                    validated_chunks.append(chunk)
                else:
                    # Re-split if too large
                    smaller_chunks = await self._resplit_chunk(chunk)
                    validated_chunks.extend(smaller_chunks)
            
            chunks.extend(validated_chunks)
        
        return chunks
    
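    async def _resplit_chunk(self, chunk: Document) -> List[Document]:
        """Split a chunk that exceeds the token limit into smaller chunks."""
        temp_splitter = RecursiveCharacterTextSplitter(
            # Never target more than the configured per-chunk token limit
            chunk_size=min(self.config.chunk_size // 2, self.config.max_tokens_per_chunk),
            chunk_overlap=self.config.chunk_overlap // 2,
            length_function=self._token_length
        )

        smaller_chunks = await asyncio.to_thread(
            temp_splitter.split_documents, [chunk]
        )
        # Record token counts so re-split chunks carry the same metadata
        for small in smaller_chunks:
            small.metadata['token_count'] = self._token_length(small.page_content)
        return smaller_chunks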
    
    def _generate_doc_id(self, content: str) -> str:
        """Generate unique document ID."""
        return hashlib.md5(content.encode()).hexdigest()

# Batch processing for large document sets
class BatchDocumentProcessor:
    """Handle large-scale document ingestion with parallel processing."""
    
    def __init__(self, processor: DocumentProcessor, max_workers: int = 4):
        self.processor = processor
        self.max_workers = max_workers
        self.semaphore = asyncio.Semaphore(max_workers)
    
    async def process_batch(self, file_paths: List[str]) -> List[Document]:
        """Process multiple documents in parallel."""
        tasks = []
        for file_path in file_paths:
            task = self._process_with_semaphore(file_path)
            tasks.append(task)
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Collect successful results
        all_chunks = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                logger.error("batch_processing_error", 
                           file_path=file_paths[i], 
                           error=str(result))
            else:
                all_chunks.extend(result)
        
        return all_chunks
    
    async def _process_with_semaphore(self, file_path: str) -> List[Document]:
        """Process document with concurrency control."""
        async with self.semaphore:
            return await self.processor.process_document(file_path)
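
`ChunkingStrategy` lists a sliding-window option, but only the recursive splitter is wired up above. As a rough illustration of what a sliding window over tokens could look like, here is a minimal standard-library sketch. The function name `sliding_window_chunks` is hypothetical, and it works on an already-tokenized list rather than on LangChain `Document` objects.

```python
from typing import List


def sliding_window_chunks(tokens: List[str], window: int, overlap: int) -> List[List[str]]:
    """Split tokens into fixed-size windows where neighbors share `overlap` tokens."""
    if window <= 0:
        raise ValueError("window must be positive")
    if not 0 <= overlap < window:
        raise ValueError("overlap must satisfy 0 <= overlap < window")

    step = window - overlap  # how far the window advances each time
    chunks: List[List[str]] = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + window])
        # Stop once a window has reached the end of the input, so the
        # tail is not emitted again as a shorter duplicate chunk.
        if start + window >= len(tokens):
            break
    return chunks


if __name__ == "__main__":
    print(sliding_window_chunks("a b c d e".split(), window=3, overlap=1))
    # [['a', 'b', 'c'], ['c', 'd', 'e']]
```

A larger overlap preserves more context across chunk boundaries at the cost of embedding and storing more duplicated text.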

2. Vector Database Integration

Now let's implement a flexible vector store interface that supports multiple backends:

import asyncio
import os
from abc import ABC, abstractmethod
from typing import List, Tuple, Dict, Any

import numpy as np
import chromadb
import pinecone
import weaviate
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Pinecone, Chroma, Weaviate
from tenacity import retry, stop_after_attempt, wait_exponential

class VectorStoreInterface(ABC):
    """Abstract interface for vector stores."""
    
    @abstractmethod
    async def add_documents(self, documents: List[Document]) -> List[str]:
        pass
    
    @abstractmethod
    async def similarity_search(self, query: str, k: int = 4) -> List[Document]:
        pass
    
    @abstractmethod
    async def delete(self, ids: List[str]) -> bool:
        pass

class PineconeVectorStore(VectorStoreInterface):
    """Production Pinecone integration with retry logic."""
    
    def __init__(self, index_name: str, embedding_model: OpenAIEmbeddings):
        self.index_name = index_name
        self.embedding_model = embedding_model
        self._init_pinecone()
    
    def _init_pinecone(self):
        """Initialize Pinecone with environment variables."""
        pinecone.init(
            api_key=os.getenv("PINECONE_API_KEY"),
            environment=os.getenv("PINECONE_ENVIRONMENT")
        )
        
        # Create index if it doesn't exist
        if self.index_name not in pinecone.list_indexes():
            pinecone.create_index(
                self.index_name,
                dimension=1536,  # OpenAI embedding dimension
                metric="cosine",
                pods=1,
                replicas=1,
                pod_type="p1.x1"
            )
        
        self.index = pinecone.Index(self.index_name)
        self.vectorstore = Pinecone(self.index, self.embedding_model, "text")
    
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    async def add_documents(self, documents: List[Document]) -> List[str]:
        """Add documents with retry logic."""
        try:
            # Batch documents for efficient upload
            batch_size = 100
            all_ids = []
            
            for i in range(0, len(documents), batch_size):
                batch = documents[i:i + batch_size]
                ids = await asyncio.to_thread(
                    self.vectorstore.add_documents, batch
                )
                all_ids.extend(ids)
                
                logger.info("documents_added_to_pinecone", 
                           count=len(batch), 
                           total_processed=i + len(batch))
            
            return all_ids
            
        except Exception as e:
            logger.error("pinecone_add_failed", error=str(e))
            raise
    
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    async def similarity_search(self, query: str, k: int = 4) -> List[Document]:
        """Search with retry logic."""
        try:
            results = await asyncio.to_thread(
                self.vectorstore.similarity_search, query, k=k
            )
            return results
        except Exception as e:
            logger.error("pinecone_search_failed", error=str(e))
            raise
    
    async def delete(self, ids: List[str]) -> bool:
        """Delete documents by ID."""
        try:
            # Run the blocking client call off the event loop
            await asyncio.to_thread(self.index.delete, ids=ids)
            return True
        except Exception as e:
            logger.error("pinecone_delete_failed", error=str(e))
            return False

class ChromaVectorStore(VectorStoreInterface):
    """Production Chroma integration for local/self-hosted deployments."""
    
    def __init__(self, collection_name: str, embedding_model: OpenAIEmbeddings):
        self.collection_name = collection_name
        self.embedding_model = embedding_model
        self._init_chroma()
    
    def _init_chroma(self):
        """Initialize Chroma client."""
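        # PersistentClient keeps data on disk across restarts, whereas
        # chromadb.Client() is in-memory only. The path is a placeholder.
        self.client = chromadb.PersistentClient(path="./chroma_data")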
        self.collection = self.client.get_or_create_collection(
            name=self.collection_name,
            metadata={"hnsw:space": "cosine"}
        )
        self.vectorstore = Chroma(
            client=self.client,
            collection_name=self.collection_name,
            embedding_function=self.embedding_model
        )
    
    async def add_documents(self, documents: List[Document]) -> List[str]:
        """Add documents to Chroma."""
        ids = await asyncio.to_thread(
            self.vectorstore.add_documents, documents
        )
        return ids
    
    async def similarity_search(self, query: str, k: int = 4) -> List[Document]:
        """Perform similarity search."""
        return await asyncio.to_thread(
            self.vectorstore.similarity_search, query, k=k
        )
    
    async def delete(self, ids: List[str]) -> bool:
        """Delete documents."""
        try:
            # Run the blocking client call off the event loop
            await asyncio.to_thread(self.collection.delete, ids=ids)
            return True
        except Exception as e:
            logger.error("chroma_delete_failed", error=str(e))
            return False

class WeaviateVectorStore(VectorStoreInterface):
    """Production Weaviate integration for enterprise deployments."""
    
    def __init__(self, index_name: str, embedding_model: OpenAIEmbeddings):
        self.index_name = index_name
        self.embedding_model = embedding_model
        self._init_weaviate()
    
    def _init_weaviate(self):
        """Initialize Weaviate client."""
        self.client = weaviate.Client(
            url=os.getenv("WEAVIATE_URL", "http://localhost:8080")
        )
        
        # Create schema if it doesn't exist
        self._create_schema()
        
        self.vectorstore = Weaviate(
            client=self.client,
            index_name=self.index_name,
            text_key="content",
            embedding=self.embedding_model
        )
    
    def _create_schema(self):
        """Create Weaviate schema."""
        schema = {
            "class": self.index_name,
            "vectorizer": "none",
            "properties": [
                {
                    "name": "content",
                    "dataType": ["text"],
                },
                {
                    "name": "metadata",
                    "dataType": ["object"],
                }
            ]
        }
        
        if not self.client.schema.exists(self.index_name):
            self.client.schema.create_class(schema)
    
    async def add_documents(self, documents: List[Document]) -> List[str]:
        """Add documents to Weaviate."""
        return await asyncio.to_thread(
            self.vectorstore.add_documents, documents
        )
    
    async def similarity_search(self, query: str, k: int = 4) -> List[Document]:
        """Perform similarity search."""
        return await asyncio.to_thread(
            self.vectorstore.similarity_search, query, k=k
        )
    
    async def delete(self, ids: List[str]) -> bool:
        """Delete documents."""
        try:
            for doc_id in ids:  # avoid shadowing the built-in id()
                await asyncio.to_thread(
                    self.client.data_object.delete, doc_id, class_name=self.index_name
                )
            return True
        except Exception as e:
            logger.error("weaviate_delete_failed", error=str(e))
            return False
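
All three backends above are asked for cosine-based similarity. To make concrete what `similarity_search` computes conceptually, here is a brute-force sketch using only the standard library. The helper names `cosine_similarity` and `top_k` are introduced here for illustration. Real vector stores typically use approximate nearest-neighbor indexes rather than scanning every vector, so treat this as an explanation of the metric, not of any store's internals.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either vector is all zeros."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query: Sequence[float], vectors: List[Sequence[float]], k: int = 4) -> List[Tuple[int, float]]:
    """Return (index, score) pairs for the k vectors most similar to the query."""
    scored = [(i, cosine_similarity(query, v)) for i, v in enumerate(vectors)]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


if __name__ == "__main__":
    stored = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
    # The identical vector ranks first, the diagonal one second
    print(top_k([1.0, 0.0], stored, k=2))
```

Because cosine similarity ignores vector length, two chunks of very different sizes can still score as close matches if their embeddings point in the same direction.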

3. Caching Layer for Performance

Implement a two-level cache, an in-process dictionary in front of Redis, to reduce API calls and improve response times:

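import asyncio
import hashlib
import json
import os
# Note: pickle.loads must only be used on data from a Redis instance you trust
import pickle
from datetime import timedelta
from typing import Any, List, Optional

import redis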

class CacheManager:
    """Multi-level caching for embeddings and search results."""
    
    def __init__(self, redis_url: str = None):
        self.redis_url = redis_url or os.getenv("REDIS_URL", "redis://localhost:6379")
        self.redis_client = redis.from_url(self.redis_url)
        self.local_cache = {}  # In-memory cache for hot data
        self.local_cache_size = 1000
    
    async def get_embedding(self, text: str) -> Optional[List[float]]:
        """Retrieve cached embedding."""
        cache_key = f"embedding:{hashlib.md5(text.encode()).hexdigest()}"
        
        # Check local cache first
        if cache_key in self.local_cache:
            return self.local_cache[cache_key]
        
        # Check Redis
        try:
            cached = await asyncio.to_thread(self.redis_client.get, cache_key)
            if cached:
                embedding = pickle.loads(cached)
                self._update_local_cache(cache_key, embedding)
                return embedding
        except Exception as e:
            logger.warning("cache_get_failed", key=cache_key, error=str(e))
        
        return None
    
    async def set_embedding(self, text: str, embedding: List[float], ttl: int = 86400):
        """Cache embedding with TTL."""
        cache_key = f"embedding:{hashlib.md5(text.encode()).hexdigest()}"
        
        # Update local cache
        self._update_local_cache(cache_key, embedding)
        
        # Update Redis
        try:
            await asyncio.to_thread(
                self.redis_client.setex,
                cache_key,
                ttl,
                pickle.dumps(embedding)
            )
        except Exception as e:
            logger.warning("cache_set_failed", key=cache_key, error=str(e))
    
    async def get_search_results(self, query: str, k: int) -> Optional[List[Document]]:
        """Retrieve cached search results."""
        cache_key = f"search:{hashlib.md5(f'{query}:{k}'.encode()).hexdigest()}"
        
        try:
            cached = await asyncio.to_thread(self.redis_client.get, cache_key)
            if cached:
                return pickle.loads(cached)
        except Exception as e:
            logger.warning("search_cache_get_failed", key=cache_key, error=str(e))
        
        return None
    
    async def set_search_results(self, query: str, k: int, results: List[Document], ttl: int = 3600):
        """Cache search results with shorter TTL."""
        cache_key = f"search:{hashlib.md5(f'{query}:{k}'.encode()).hexdigest()}"
        
        try:
            await asyncio.to_thread(
                self.redis_client.setex,
                cache_key,
                ttl,
                pickle.dumps(results)
            )
        except Exception as e:
            logger.warning("search_cache_set_failed", key=cache_key, error=str(e))
    
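    def _update_local_cache(self, key: str, value: Any):
        """Update local cache, evicting the oldest inserted entry when full."""
        if key not in self.local_cache and len(self.local_cache) >= self.local_cache_size:
            # Dicts preserve insertion order, so the first key is the oldest
            # insert (FIFO). Reads do not refresh an entry's position.
            oldest_key = next(iter(self.local_cache))
            del self.local_cache[oldest_key]

        self.local_cache[key] = value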
    
    async def clear_cache(self, pattern: str = "*"):
        """Clear cache entries matching pattern."""
        try:
            # SCAN iterates incrementally; KEYS blocks Redis on large keyspaces
            keys = await asyncio.to_thread(
                lambda: list(self.redis_client.scan_iter(match=pattern))
            )
            if keys:
                await asyncio.to_thread(self.redis_client.delete, *keys)
            
            # Clear local cache
            self.local_cache.clear()
            
            logger.info("cache_cleared", pattern=pattern, keys_deleted=len(keys))
        except Exception as e:
            logger.error("cache_clear_failed", error=str(e))
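
`_update_local_cache` above removes the first-inserted key when the cache is full, which is insertion-order eviction: reading an entry does not protect it from being dropped. If true least-recently-used behavior is wanted for the in-memory tier, one option is an `OrderedDict` that moves entries to the end on every access. The `LRUCache` class below is a hypothetical stand-alone sketch, not a drop-in replacement for `CacheManager`.

```python
from collections import OrderedDict
from typing import Any, Hashable, Optional


class LRUCache:
    """Small in-memory cache that evicts the least recently used entry."""

    def __init__(self, max_size: int = 1000):
        if max_size <= 0:
            raise ValueError("max_size must be positive")
        self.max_size = max_size
        self._data: "OrderedDict[Hashable, Any]" = OrderedDict()

    def get(self, key: Hashable) -> Optional[Any]:
        """Return the cached value, or None on a miss."""
        if key not in self._data:
            return None
        # Mark the entry as most recently used
        self._data.move_to_end(key)
        return self._data[key]

    def set(self, key: Hashable, value: Any) -> None:
        """Insert or update a value, evicting the stalest entry if over capacity."""
        if key in self._data:
            self._data.move_to_end(key)
        self._data[key] = value
        if len(self._data) > self.max_size:
            # The first item is the least recently used one
            self._data.popitem(last=False)


if __name__ == "__main__":
    cache = LRUCache(max_size=2)
    cache.set("a", 1)
    cache.set("b", 2)
    cache.get("a")      # "a" becomes the most recently used entry
    cache.set("c", 3)   # evicts "b", not "a"
    print(cache.get("b"), cache.get("a"), cache.get("c"))  # None 1 3
```

Note that `get` returns `None` for a miss, so this sketch cannot distinguish a missing key from a stored `None` value.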

4. RAG Query Engine with Monitoring

Build a production-ready query engine with comprehensive monitoring:

import asyncio
import time
from typing import Any, Dict, List, Optional

from prometheus_client import Counter, Histogram, Gauge
from langchain.chains import RetrievalQA
from langchain_openai import ChatOpenAI
from langchain_core.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

# Prometheus metrics
query_counter = Counter('rag_queries_total', 'Total number of RAG queries')
query_duration = Histogram('rag_query_duration_seconds', 'Query processing time')
active_queries = Gauge('rag_active_queries', 'Number of active queries')
cache_hits = Counter('rag_cache_hits_total', 'Number of cache hits')
cache_misses = Counter('rag_cache_misses_total', 'Number of cache misses')
vector_search_duration = Histogram('rag_vector_search_duration_seconds', 'Vector search time')

class RAGQueryEngine:
    """Production RAG query engine with monitoring and caching."""

    def __init__(
        self,
        vector_store: VectorStoreInterface,
        llm_model: str = "gpt-3.5-turbo",
        cache_manager: Optional[CacheManager] = None,
        streaming: bool = False
    ):
        self.vector_store = vector_store
        self.cache_manager = cache_manager
        self.streaming = streaming

        # Initialize LLM. gpt-3.5-turbo is a chat model, so it needs the chat
        # wrapper rather than the completion-style OpenAI class.
        callbacks = [StreamingStdOutCallbackHandler()] if streaming else None
        self.llm = ChatOpenAI(
            model=llm_model,
            temperature=0,
            max_tokens=1000,
            streaming=streaming,
            callbacks=callbacks
        )
        
        # Create retrieval chain
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",
            retriever=self._create_retriever(),
            return_source_documents=True
        )
    
    def _create_retriever(self):
        """Create retriever with custom parameters."""
        return self.vector_store.vectorstore.as_retriever(
            search_kwargs={"k": 4}
        )
    
    async def query(
        self,
        question: str,
        context_filter: Optional[Dict[str, Any]] = None,
        k: int = 4
    ) -> Dict[str, Any]:
        """Execute RAG query with monitoring.

        Note: context_filter is accepted but not applied to the search here.
        """
        query_counter.inc()
        active_queries.inc()
        # Time the query explicitly: a timing decorator on an async function
        # can end up measuring only coroutine creation, not the awaited work.
        start_time = time.time()

        try:
            # Check cache for search results
            cached_results = None
            if self.cache_manager:
                cached_results = await self.cache_manager.get_search_results(question, k)
                if cached_results:
                    cache_hits.inc()
                    logger.info("cache_hit", query=question)
                else:
                    cache_misses.inc()

            # Perform vector search if not cached
            if not cached_results:
                with vector_search_duration.time():
                    search_results = await self.vector_store.similarity_search(
                        question, k=k
                    )

                # Cache results
                if self.cache_manager:
                    await self.cache_manager.set_search_results(
                        question, k, search_results
                    )
            else:
                search_results = cached_results

            # Generate the answer from the documents retrieved above. Calling
            # self.qa_chain directly would run its own retrieval and bypass
            # both the cached results and the requested k.
            response = await asyncio.to_thread(
                self.qa_chain.combine_documents_chain.invoke,
                {"input_documents": search_results, "question": question}
            )

            # Prepare result
            result = {
                "answer": response["output_text"],
                "source_documents": [
                    {
                        "content": doc.page_content,
                        "metadata": doc.metadata,
                        "relevance_score": getattr(doc, "score", None)
                    }
                    for doc in search_results
                ],
                "processing_time": time.time() - start_time,
                "cached": cached_results is not None
            }

            logger.info("query_completed",
                       query=question,
                       processing_time=result["processing_time"],
                       num_sources=len(result["source_documents"]))

            return result

        except Exception as e:
            logger.error("query_failed", query=question, error=str(e))
            raise
        finally:
            active_queries.dec()
            query_duration.observe(time.time() - start_time)
    
    async def query_with_reranking(
        self,
        question: str,
        k: int = 10,
        rerank_k: int = 4
    ) -> Dict[str, Any]:
        """Query with reranking for improved relevance."""
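        start_time = time.time()

        # Get more candidates
        search_results = await self.vector_store.similarity_search(question, k=k)

        # Rerank using cross-encoder or other method
        reranked_results = await self._rerank_results(question, search_results, rerank_k)

        # Generate the answer from the reranked documents only. RetrievalQA
        # would ignore an extra "context" input and retrieve again on its own.
        response = await asyncio.to_thread(
            self.qa_chain.combine_documents_chain.invoke,
            {"input_documents": reranked_results, "question": question}
        )

        # Return the same shape as query() so the API response model validates
        return {
            "answer": response["output_text"],
            "source_documents": [
                {"content": doc.page_content, "metadata": doc.metadata}
                for doc in reranked_results
            ],
            "processing_time": time.time() - start_time,
            "reranked": True
        }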
    
    async def _rerank_results(
        self,
        query: str,
        documents: List[Document],
        k: int
    ) -> List[Document]:
        """Rerank documents using cross-encoder or scoring."""
        # Simple scoring based on keyword overlap
        # In production, use a cross-encoder model
        scored_docs = []
        query_tokens = set(query.lower().split())

        for doc in documents:
            doc_tokens = set(doc.page_content.lower().split())
            # Guard against an empty query to avoid division by zero
            overlap_score = (
                len(query_tokens.intersection(doc_tokens)) / len(query_tokens)
                if query_tokens else 0.0
            )
            scored_docs.append((overlap_score, doc))
        
        # Sort by score and return top k
        scored_docs.sort(key=lambda x: x[0], reverse=True)
        return [doc for _, doc in scored_docs[:k]]
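
To see how the keyword-overlap score in `_rerank_results` behaves, the sketch below applies the same formula to plain strings: the fraction of distinct query words that also appear in the document. The function names are hypothetical and the sample texts are made up for illustration.

```python
from typing import List, Tuple


def keyword_overlap_score(query: str, text: str) -> float:
    """Fraction of distinct query words that also occur in the text."""
    query_tokens = set(query.lower().split())
    if not query_tokens:
        return 0.0
    text_tokens = set(text.lower().split())
    return len(query_tokens & text_tokens) / len(query_tokens)


def rerank(query: str, texts: List[str], k: int) -> List[Tuple[float, str]]:
    """Score every text against the query and keep the k best."""
    scored = [(keyword_overlap_score(query, t), t) for t in texts]
    # sort() is stable, so ties keep their original retrieval order
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return scored[:k]


if __name__ == "__main__":
    candidates = [
        "Cooking pasta at home",
        "Redis pricing tiers",
        "Pinecone is a managed vector database",
    ]
    for score, text in rerank("vector database pricing", candidates, k=2):
        print(f"{score:.2f}  {text}")
    # 0.67  Pinecone is a managed vector database
    # 0.33  Redis pricing tiers
```

Whitespace splitting treats `database.` and `database` as different words, which is one reason this scoring is only a placeholder for a cross-encoder.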

5. Production Deployment and Scaling

Let's implement a scalable API service with proper error handling:

from fastapi import FastAPI, HTTPException, BackgroundTasks, Response
from pydantic import BaseModel
from typing import Any, Dict, List, Optional
import uvicorn
from contextlib import asynccontextmanager
import signal
import sys

class QueryRequest(BaseModel):
    question: str
    k: int = 4
    use_cache: bool = True
    rerank: bool = False

class QueryResponse(BaseModel):
    answer: str
    source_documents: List[Dict[str, Any]]
    processing_time: float
    cached: bool = False

class DocumentIngestionRequest(BaseModel):
    file_paths: List[str]
    batch_size: int = 100

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application lifecycle."""
    # Startup
    logger.info("Starting RAG service")
    
    # Initialize components
    app.state.vector_store = PineconeVectorStore(
        index_name="production-rag",
        embedding_model=OpenAIEmbeddings()
    )
    
    app.state.cache_manager = CacheManager()
    
    app.state.query_engine = RAGQueryEngine(
        vector_store=app.state.vector_store,
        cache_manager=app.state.cache_manager
    )
    
    app.state.document_processor = DocumentProcessor(
        ProcessingConfig(chunk_size=1000, chunk_overlap=200)
    )
    
    yield
    
    # Shutdown
    logger.info("Shutting down RAG service")
    await app.state.cache_manager.clear_cache()

app = FastAPI(lifespan=lifespan)

@app.post("/query", response_model=QueryResponse)
async def query_endpoint(request: QueryRequest):
    """Handle RAG queries with error handling."""
    try:
        if request.rerank:
            result = await app.state.query_engine.query_with_reranking(
                request.question,
                k=request.k * 2,  # Get more candidates for reranking
                rerank_k=request.k
            )
        else:
            result = await app.state.query_engine.query(
                request.question,
                k=request.k
            )
        
        return QueryResponse(**result)
        
    except Exception as e:
        logger.error("query_endpoint_error", error=str(e))
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/ingest")
async def ingest_documents(
    request: DocumentIngestionRequest,
    background_tasks: BackgroundTasks
):
    """Ingest documents asynchronously."""
    try:
        # Add to background tasks for async processing
        background_tasks.add_task(
            process_documents_background,
            request.file_paths,
            request.batch_size
        )
        
        return {
            "status": "accepted",
            "message": f"Processing {len(request.file_paths)} documents in background"
        }
        
    except Exception as e:
        logger.error("ingest_endpoint_error", error=str(e))
        raise HTTPException(status_code=500, detail=str(e))

async def process_documents_background(file_paths: List[str], batch_size: int):
    """Background document processing."""
    try:
        batch_processor = BatchDocumentProcessor(
            app.state.document_processor,
            max_workers=4
        )
        
        # Process in batches
        for i in range(0, len(file_paths), batch_size):
            batch = file_paths[i:i + batch_size]
            chunks = await batch_processor.process_batch(batch)
            
            # Add to vector store
            await app.state.vector_store.add_documents(chunks)
            
            logger.info("batch_processed", 
                       batch_number=i // batch_size + 1,
                       documents_processed=len(batch))
        
    except Exception as e:
        logger.error("background_processing_failed", error=str(e))

@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {
        "status": "healthy",
        "vector_store": type(app.state.vector_store).__name__,
        "cache_enabled": app.state.cache_manager is not None
    }

@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint."""
    from prometheus_client import generate_latest
    return Response(generate_latest(), media_type="text/plain")

# Graceful shutdown handling
def signal_handler(sig, frame):
    logger.info("Received shutdown signal")
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        workers=4,
        loop="uvloop",
        log_config={
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {
                "default": {
                    "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
                },
            },
        }
    )

Cost Analysis for Different Scales

Understanding the cost implications of your RAG system is crucial for production deployment. Here's a comprehensive breakdown:

Small Scale (< 1M documents, < 1000 queries/day)

def calculate_small_scale_costs():
    """Calculate costs for small-scale deployment."""
    
    # Document processing
    documents = 1_000_000
    avg_tokens_per_doc = 500
    embedding_cost_per_1k_tokens = 0.0001  # OpenAI ada-002
    
    ingestion_cost = (documents * avg_tokens_per_doc / 1000) * embedding_cost_per_1k_tokens
    
    # Vector storage (Pinecone)
    pinecone_pods = 1  # p1.x1
    pinecone_monthly = 70
    
    # Query costs
    queries_per_day = 1000
    avg_tokens_per_query = 2000  # Including context
    gpt35_cost_per_1k_tokens = 0.002
    
    monthly_query_cost = (queries_per_day * 30 * avg_tokens_per_query / 1000) * gpt35_cost_per_1k_tokens
    
    # Redis cache
    redis_monthly = 15  # Small instance
    
    total_monthly = pinecone_monthly + monthly_query_cost + redis_monthly
    
    return {
        "initial_ingestion": f"${ingestion_cost:.2f}",
        "monthly_infrastructure": f"${pinecone_monthly + redis_monthly:.2f}",
        "monthly_queries": f"${monthly_query_cost:.2f}",
        "total_monthly": f"${total_monthly:.2f}"
    }

Medium Scale (10M documents, 10K queries/day)

def calculate_medium_scale_costs():
    """Calculate costs for medium-scale deployment."""
    
    # Use Chroma self-hosted to reduce costs
    documents = 10_000_000
    avg_tokens_per_doc = 500
    
    ingestion_cost = (documents * avg_tokens_per_doc / 1000) * 0.0001
    
    # Self-hosted infrastructure
    compute_monthly = 500  # EC2/GCP instances
    
    # Query costs with caching
    queries_per_day = 10000
    cache_hit_rate = 0.3  # 30% cache hits
    effective_queries = queries_per_day * (1 - cache_hit_rate)
    
    monthly_query_cost = (effective_queries * 30 * 2000 / 1000) * 0.002
    
    # Redis cluster
    redis_monthly = 100
    
    total_monthly = compute_monthly + monthly_query_cost + redis_monthly
    
    return {
        "initial_ingestion": f"${ingestion_cost:.2f}",
        "monthly_infrastructure": f"${compute_monthly + redis_monthly:.2f}",
        "monthly_queries": f"${monthly_query_cost:.2f}",
        "total_monthly": f"${total_monthly:.2f}",
        "savings_from_cache": f"${(queries_per_day * 30 * cache_hit_rate * 2000 / 1000 * 0.002):.2f}"
    }

Enterprise Scale (100M+ documents, 100K+ queries/day)

def calculate_enterprise_scale_costs():
    """Calculate costs for enterprise-scale deployment."""
    
    # Weaviate cluster with custom embeddings
    documents = 100_000_000
    
    # Use open-source embeddings to avoid per-token API fees
    ingestion_cost = 0  # Self-hosted embedding model; compute is covered by the cluster below
    
    # Kubernetes cluster
    k8s_monthly = 5000  # Multi-node cluster
    
    # Query costs with optimization
    queries_per_day = 100000
    cache_hit_rate = 0.5
    model_optimization = 0.7  # Cost multiplier from using smaller models where possible

    # Scale billable volume by cache misses and the model cost multiplier
    effective_queries = queries_per_day * (1 - cache_hit_rate) * model_optimization
    monthly_query_cost = (effective_queries * 30 * 2000 / 1000) * 0.002
    
    # Additional services
    monitoring_monthly = 500  # Datadog/Prometheus
    cdn_monthly = 200  # For cached responses
    
    total_monthly = k8s_monthly + monthly_query_cost + monitoring_monthly + cdn_monthly
    
    return {
        "initial_ingestion": "$0 API fees (self-hosted embeddings)",
        "monthly_infrastructure": f"${k8s_monthly + monitoring_monthly + cdn_monthly:.2f}",
        "monthly_queries": f"${monthly_query_cost:.2f}",
        "total_monthly": f"${total_monthly:.2f}",
        "cost_per_query": f"${total_monthly / (queries_per_day * 30):.4f}"
    }
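
The three functions above repeat the same query-cost arithmetic with different constants. As a minimal sketch, that formula can be factored into one helper so the scales are easy to compare. The helper name `estimate_monthly_query_cost` and its parameters are illustrative and not part of the code above, and the default price is the same hard-coded assumption used in those functions.

```python
def estimate_monthly_query_cost(
    queries_per_day: int,
    cache_hit_rate: float = 0.0,
    cost_multiplier: float = 1.0,
    avg_tokens_per_query: int = 2000,
    cost_per_1k_tokens: float = 0.002,  # assumed price, same constant as above
    days_per_month: int = 30,
) -> float:
    """Estimate monthly LLM spend after cache hits and model-cost savings."""
    if not 0.0 <= cache_hit_rate <= 1.0:
        raise ValueError("cache_hit_rate must be between 0 and 1")
    # Only cache misses reach the LLM and are billed
    billable_queries = queries_per_day * days_per_month * (1 - cache_hit_rate)
    tokens = billable_queries * avg_tokens_per_query
    return tokens / 1000 * cost_per_1k_tokens * cost_multiplier


small = estimate_monthly_query_cost(1_000)
medium = estimate_monthly_query_cost(10_000, cache_hit_rate=0.3)
enterprise = estimate_monthly_query_cost(
    100_000, cache_hit_rate=0.5, cost_multiplier=0.7
)

for name, cost in [("small", small), ("medium", medium), ("enterprise", enterprise)]:
    print(f"{name}: ${cost:,.2f}/month")
```

With these inputs the helper reproduces the `monthly_queries` figures of the three functions above: roughly $120, $840, and $4,200 per month.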

Performance Optimization Techniques

1. Embedding Optimization

class EmbeddingOptimizer:
    """Optimize embedding generation and storage."""
    
    def __init__(self, cache_manager: CacheManager):
        self.cache_manager = cache_manager
        self.batch_size = 100
    
    async def generate_embeddings_batch(
        self,
        texts: List[str],
        embedding_model: OpenAIEmbeddings
    ) -> List[List[float]]:
        """Generate embeddings in batches with caching."""
        embeddings: List[Optional[List[float]]] = [None] * len(texts)
        uncached_texts = []
        uncached_indices = []

        # Check cache first; compare against None so only a real miss triggers an API call
        for i, text in enumerate(texts):
            cached_embedding = await self.cache_manager.get_embedding(text)
            if cached_embedding is not None:
                embeddings[i] = cached_embedding
            else:
                uncached_texts.append(text)
                uncached_indices.append(i)

        # Generate embeddings for uncached texts, at most batch_size texts per call
        for start in range(0, len(uncached_texts), self.batch_size):
            batch_texts = uncached_texts[start:start + self.batch_size]
            batch_indices = uncached_indices[start:start + self.batch_size]
            new_embeddings = await asyncio.to_thread(
                embedding_model.embed_documents,
                batch_texts
            )

            # Update cache and results
            for idx, embedding in zip(batch_indices, new_embeddings):
                embeddings[idx] = embedding
                await self.cache_manager.set_embedding(texts[idx], embedding)

        return embeddings
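
The optimizer above only relies on two async methods of the cache manager, `get_embedding` and `set_embedding`. For local testing, a minimal in-memory stand-in might look like the sketch below. The class name `InMemoryEmbeddingCache`, the key format, and the `model_name` parameter are assumptions made for illustration; the article's actual `CacheManager` may key and store embeddings differently.

```python
import asyncio
import hashlib
from typing import Dict, List, Optional


class InMemoryEmbeddingCache:
    """Hypothetical stand-in exposing the two methods EmbeddingOptimizer calls."""

    def __init__(self, model_name: str = "example-embedding-model"):
        self.model_name = model_name
        self._store: Dict[str, List[float]] = {}

    def _key(self, text: str) -> str:
        # Hash the text for a fixed-length key; include the model name so
        # vectors produced by different models never share an entry.
        digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
        return f"emb:{self.model_name}:{digest}"

    async def get_embedding(self, text: str) -> Optional[List[float]]:
        return self._store.get(self._key(text))

    async def set_embedding(self, text: str, embedding: List[float]) -> None:
        self._store[self._key(text)] = embedding


async def demo() -> Optional[List[float]]:
    cache = InMemoryEmbeddingCache()
    await cache.set_embedding("hello", [0.1, 0.2])
    return await cache.get_embedding("hello")


print(asyncio.run(demo()))
```

Keying on a hash rather than the raw text keeps keys short for long chunks, at the cost of not being able to read the original text back from the key.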

2. Query Optimization

import re

class QueryOptimizer:
    """Optimize query processing."""

    def __init__(self):
        self.query_cache = {}
        self.max_cache_size = 10000

    def preprocess_query(self, query: str) -> str:
        """Normalize and enhance queries."""
        # Remove extra whitespace
        query = " ".join(query.split())

        # Expand abbreviations
        abbreviations = {
            "ML": "machine learning",
            "DL": "deep learning",
            "NLP": "natural language processing",
            "CV": "computer vision"
        }

        # Match whole words only so terms like "HTML" or "XML" are left intact
        for abbr, full in abbreviations.items():
            query = re.sub(rf"\b{re.escape(abbr)}\b", full, query)

        return query.lower()
    
    def should_use_hybrid_search(self, query: str) -> bool:
        """Determine if hybrid search would be beneficial."""
        # Use hybrid for short queries or specific patterns
        if len(query.split()) < 3:
            return True
        
        # Check for specific query patterns
        patterns = ["how to", "what is", "explain", "define"]
        return any(query.lower().startswith(p) for p in patterns)
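
`QueryOptimizer` declares `query_cache` and `max_cache_size` but never enforces the limit. One way to bound the cache, sketched here under the assumption that least-recently-used eviction is acceptable, is an `OrderedDict` wrapper. The class name `BoundedQueryCache` and its methods are illustrative and not part of the original code.

```python
from collections import OrderedDict
from typing import Any, Optional


class BoundedQueryCache:
    """Least-recently-used cache with a fixed maximum number of entries."""

    def __init__(self, max_size: int = 10_000):
        self.max_size = max_size
        self._entries: "OrderedDict[str, Any]" = OrderedDict()

    def get(self, query: str) -> Optional[Any]:
        if query not in self._entries:
            return None
        self._entries.move_to_end(query)  # mark as most recently used
        return self._entries[query]

    def put(self, query: str, result: Any) -> None:
        self._entries[query] = result
        self._entries.move_to_end(query)
        if len(self._entries) > self.max_size:
            self._entries.popitem(last=False)  # evict the least recently used


cache = BoundedQueryCache(max_size=2)
cache.put("what is rag", "answer A")
cache.put("define embedding", "answer B")
cache.get("what is rag")                   # touch A so B becomes the oldest entry
cache.put("explain chunking", "answer C")  # evicts "define embedding"
```

Storing results under the output of `preprocess_query` rather than the raw query would let differently formatted versions of the same question share one entry.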

Monitoring and Observability

Implement comprehensive monitoring for production environments:

import functools
import time

from opentelemetry import metrics, trace
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
# Requires opentelemetry-exporter-jaeger-thrift; newer OpenTelemetry releases
# have deprecated this exporter in favor of OTLP
from opentelemetry.exporter.jaeger.thrift import JaegerExporter

class RAGMonitoring:
    """Comprehensive monitoring for RAG systems."""

    def __init__(self):
        self._setup_tracing()
        self._setup_metrics()

    def _setup_tracing(self):
        """Configure distributed tracing."""
        tracer_provider = TracerProvider()
        trace.set_tracer_provider(tracer_provider)

        jaeger_exporter = JaegerExporter(
            agent_host_name="localhost",
            agent_port=6831,
        )

        tracer_provider.add_span_processor(
            BatchSpanProcessor(jaeger_exporter)
        )

        self.tracer = trace.get_tracer(__name__)

    def _setup_metrics(self):
        """Configure metrics collection."""
        reader = PrometheusMetricReader()
        provider = MeterProvider(metric_readers=[reader])
        metrics.set_meter_provider(provider)

        meter = metrics.get_meter(__name__)

        # Create metrics
        self.query_latency = meter.create_histogram(
            name="rag_query_latency",
            description="Query processing latency",
            unit="ms"
        )

        self.document_count = meter.create_counter(
            name="rag_documents_processed",
            description="Number of documents processed"
        )

        self.error_count = meter.create_counter(
            name="rag_errors",
            description="Number of errors"
        )

    def trace_query(self, func):
        """Decorator for tracing queries."""
        @functools.wraps(func)  # preserve the wrapped function's name and docstring
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()
            with self.tracer.start_as_current_span("rag_query") as span:
                span.set_attribute("query", kwargs.get("question", ""))

                try:
                    result = await func(*args, **kwargs)
                    span.set_attribute("success", True)
                    return result
                except Exception as e:
                    span.set_attribute("success", False)
                    span.set_attribute("error", str(e))
                    self.error_count.add(1)
                    raise
                finally:
                    # Record latency in milliseconds for successes and failures alike
                    self.query_latency.record((time.perf_counter() - start) * 1000)

        return wrapper

Conclusion

Building a production-ready RAG system with LangChain requires careful consideration of scalability, performance, and cost. By implementing proper chunking strategies, choosing a vector database that fits your scale, adding caching layers, and setting up comprehensive monitoring, you can create a system that handles real-world demands effectively.

Key takeaways:

  • Choose the right vector database based on your scale and requirements
  • Implement robust error handling and retry logic
  • Use caching strategically to reduce costs and improve performance
  • Monitor everything to identify bottlenecks early
  • Plan for scale from the beginning

For those just starting with RAG, check out our beginner's guide to LangChain to understand the fundamentals before diving into production implementations.

Remember, the best RAG system is one that balances performance, cost, and maintainability for your specific use case. Start with a simple implementation and gradually add optimizations based on real-world usage patterns and requirements.