title: "LangChain Memory Systems: Building Conversational AI" description: "Master LangChain memory for building intelligent conversational AI with persistent context. Learn memory types, implementation patterns, and production strategies for chatbot memory management." published: "2024-01-20" tags:
- langchain
- conversational-ai
- chatbot-development
- memory-management
- nlp
- ai-engineering
Building truly conversational AI requires more than just processing individual messages—it demands maintaining context across entire conversations. LangChain's memory systems provide the foundation for creating chatbots that remember user preferences, track conversation history, and deliver personalized experiences. This comprehensive guide explores LangChain memory implementation, from basic conversation tracking to production-scale memory management.
Understanding LangChain Memory Architecture
LangChain memory serves as the cognitive backbone of conversational AI, enabling chatbots to maintain context across multiple interactions. Unlike stateless language models, memory-equipped systems can reference previous exchanges, track user preferences, and build coherent multi-turn conversations.
Core Memory Components
The memory architecture consists of three fundamental elements:
- Memory Storage: Where conversation data persists
- Memory Keys: How memories are indexed and retrieved
- Memory Loading: When and how memories enter the conversation context
from langchain.memory import ConversationBufferMemory
from langchain.llms import OpenAI
from langchain.chains import ConversationChain
# Initialize basic memory system
memory = ConversationBufferMemory()
llm = OpenAI(temperature=0.7)
# Create conversation chain with memory
conversation = ConversationChain(
llm=llm,
memory=memory,
verbose=True # Shows memory in action
)
# Memory automatically tracks conversation
response1 = conversation.predict(input="Hi, I'm Alex. I'm interested in Python programming.")
response2 = conversation.predict(input="What did I just tell you about myself?")
Deep Dive into Memory Types
LangChain provides multiple memory implementations, each optimized for different use cases and conversation patterns.
ConversationBufferMemory: Complete History Tracking
The simplest memory type stores the entire conversation history verbatim. It's straightforward and lossless, but because the full history is sent to the model on every turn, it's best suited to short conversations that stay within token limits.
from langchain.memory import ConversationBufferMemory
from langchain.schema import HumanMessage, AIMessage
# Initialize buffer memory
buffer_memory = ConversationBufferMemory(
return_messages=True, # Return as message objects
memory_key="chat_history",
input_key="human_input"
)
# Manually add conversation history
buffer_memory.chat_memory.add_message(HumanMessage(content="I need help with my order #12345"))
buffer_memory.chat_memory.add_message(AIMessage(content="I'll help you with order #12345. What seems to be the issue?"))
# Access conversation history
messages = buffer_memory.chat_memory.messages
print(f"Total messages: {len(messages)}")
# Save a new turn to memory (save_context stores it; it returns None)
buffer_memory.save_context(
    {"human_input": "The delivery is delayed"},
    {"output": "I apologize for the delay. Let me check the status of order #12345."}
)

# Load the accumulated memory variables back out
print(buffer_memory.load_memory_variables({}))
ConversationSummaryMemory: Intelligent Compression
Summary memory uses an LLM to progressively summarize conversations, maintaining context while reducing token usage.
from langchain.memory import ConversationSummaryMemory
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate

# Initialize summary memory with a custom summarization prompt
summary_memory = ConversationSummaryMemory(
    llm=OpenAI(temperature=0),
    memory_key="chat_summary",
    return_messages=True,
    prompt=PromptTemplate(
        input_variables=["summary", "new_lines"],
        template="""Progressively summarize the conversation,
keeping key facts about the user and their requests:
Current summary: {summary}
New lines: {new_lines}
Updated summary:"""
    )
)
# Simulate longer conversation
conversation_lines = [
("I'm looking for a laptop for data science work", "I can help you find the perfect laptop for data science."),
("My budget is around $2000", "With a $2000 budget, you have excellent options."),
("I prefer something with at least 32GB RAM", "32GB RAM is ideal for data science workloads."),
("Battery life is important as I travel often", "I'll prioritize models with extended battery life.")
]
for human, ai in conversation_lines:
summary_memory.save_context(
{"input": human},
{"output": ai}
)
# Get compressed summary
print(summary_memory.load_memory_variables({})['chat_summary'])
ConversationBufferWindowMemory: Sliding Window Approach
Window memory maintains only the most recent k interactions, providing a balance between context retention and efficiency.
from langchain.memory import ConversationBufferWindowMemory
# Keep last 5 exchanges
window_memory = ConversationBufferWindowMemory(
k=5, # Number of exchanges to keep
return_messages=True,
memory_key="recent_history"
)
# Simulate conversation exceeding window
for i in range(8):
window_memory.save_context(
{"input": f"Message {i}"},
{"output": f"Response {i}"}
)
# Only last 5 exchanges remain
recent_messages = window_memory.load_memory_variables({})
print(f"Messages in memory: {len(recent_messages['recent_history'])}")
ConversationSummaryBufferMemory: Hybrid Approach
This memory type combines summarization with recent message buffering, offering both compressed history and detailed recent context.
from langchain.memory import ConversationSummaryBufferMemory
# Hybrid memory with token limit
hybrid_memory = ConversationSummaryBufferMemory(
llm=OpenAI(temperature=0),
max_token_limit=200, # Summarize when exceeding this limit
return_messages=True,
memory_key="conversation_memory"
)
# Add conversation that will trigger summarization
long_conversation = [
("I'm planning a trip to Japan in April", "April is a wonderful time to visit Japan!"),
("I want to see cherry blossoms", "Cherry blossom season peaks in early April."),
("What cities should I visit?", "Tokyo, Kyoto, and Osaka are must-visit cities."),
("I have 10 days total", "10 days allows for a comprehensive tour."),
("I'm interested in both culture and technology", "Japan perfectly blends traditional culture with cutting-edge technology.")
]
for human, ai in long_conversation:
hybrid_memory.save_context({"input": human}, {"output": ai})
# Inspect memory state: older turns are folded into the summary, recent turns stay verbatim
memory_vars = hybrid_memory.load_memory_variables({})
print(memory_vars["conversation_memory"])
ConversationKGMemory: Knowledge Graph Memory
Knowledge Graph memory extracts and stores entities and their relationships, enabling sophisticated entity-aware conversations.
from langchain.memory import ConversationKGMemory
from langchain.llms import OpenAI
# Initialize knowledge graph memory
kg_memory = ConversationKGMemory(
llm=OpenAI(temperature=0),
memory_key="knowledge_graph",
return_messages=True
)
# Build knowledge graph through conversation
kg_conversation = [
("I'm Sarah, a software engineer at TechCorp", "Nice to meet you, Sarah!"),
("I work on the mobile app team", "Mobile development is exciting!"),
("My manager is John Davis", "I'll note that John Davis is your manager."),
("We're building an e-commerce platform", "E-commerce platforms require careful architecture.")
]
for human, ai in kg_conversation:
kg_memory.save_context({"input": human}, {"output": ai})
# Extract knowledge graph
kg = kg_memory.kg.get_triples()
print("Extracted relationships:", kg)
Building a Production Customer Support Bot
Let's build a sophisticated customer support bot that demonstrates memory management in practice.
import asyncio
from typing import Dict, List, Optional
from datetime import datetime
from langchain.memory import ConversationSummaryBufferMemory
from langchain.llms import OpenAI
from langchain.chains import ConversationChain
from langchain.prompts import PromptTemplate
from langchain.callbacks.base import AsyncCallbackHandler
import redis
import json
from uuid import uuid4
class MemoryDebugCallback(AsyncCallbackHandler):
"""Callback for monitoring memory usage and performance"""
async def on_chain_start(self, serialized: Dict, inputs: Dict, **kwargs):
self.start_time = datetime.now()
print(f"[MEMORY_DEBUG] Chain started with input length: {len(str(inputs))}")
async def on_chain_end(self, outputs: Dict, **kwargs):
duration = (datetime.now() - self.start_time).total_seconds()
print(f"[MEMORY_DEBUG] Chain completed in {duration:.2f}s")
class CustomerSupportBot:
"""Production-ready customer support bot with advanced memory management"""
def __init__(self, redis_client: redis.Redis, llm: OpenAI):
self.redis_client = redis_client
self.llm = llm
self.memory_callback = MemoryDebugCallback()
# Custom support prompt
self.prompt = PromptTemplate(
input_variables=["history", "input"],
template="""You are a helpful customer support agent. Use the conversation history to provide personalized assistance.
Conversation History:
{history}
Customer: {input}
Support Agent: """
)
def _get_memory_key(self, user_id: str, conversation_id: str) -> str:
"""Generate unique memory key for user conversation"""
return f"memory:{user_id}:{conversation_id}"
def _create_memory(self, memory_data: Optional[Dict] = None) -> ConversationSummaryBufferMemory:
"""Create memory instance with optional restoration"""
memory = ConversationSummaryBufferMemory(
llm=self.llm,
max_token_limit=500,
return_messages=True,
memory_key="history"
)
if memory_data:
# Restore previous conversation state
memory.moving_summary_buffer = memory_data.get("summary", "")
for message in memory_data.get("messages", []):
if message["type"] == "human":
memory.chat_memory.add_user_message(message["content"])
else:
memory.chat_memory.add_ai_message(message["content"])
return memory
async def load_conversation_memory(self, user_id: str, conversation_id: str) -> ConversationSummaryBufferMemory:
"""Load conversation memory from Redis"""
memory_key = self._get_memory_key(user_id, conversation_id)
try:
memory_data = self.redis_client.get(memory_key)
if memory_data:
return self._create_memory(json.loads(memory_data))
else:
print(f"[MEMORY_DEBUG] Creating new memory for {user_id}")
return self._create_memory()
except Exception as e:
print(f"[MEMORY_ERROR] Failed to load memory: {e}")
return self._create_memory()
async def save_conversation_memory(self, user_id: str, conversation_id: str, memory: ConversationSummaryBufferMemory):
"""Persist conversation memory to Redis"""
memory_key = self._get_memory_key(user_id, conversation_id)
        # Extract memory state; msg.type is "human" or "ai", so we don't rely on turn order
        memory_data = {
            "summary": memory.moving_summary_buffer,
            "messages": [
                {"type": msg.type, "content": msg.content}
                for msg in memory.chat_memory.messages
            ],
            "updated_at": datetime.now().isoformat()
        }
try:
# Save with 24-hour expiration
self.redis_client.setex(
memory_key,
86400, # 24 hours in seconds
json.dumps(memory_data)
)
print(f"[MEMORY_DEBUG] Saved memory for {user_id}, size: {len(json.dumps(memory_data))} bytes")
except Exception as e:
print(f"[MEMORY_ERROR] Failed to save memory: {e}")
async def handle_message(self, user_id: str, conversation_id: str, message: str) -> str:
"""Process user message with memory management"""
# Load conversation memory
memory = await self.load_conversation_memory(user_id, conversation_id)
# Create conversation chain
conversation = ConversationChain(
llm=self.llm,
memory=memory,
prompt=self.prompt,
verbose=True,
callbacks=[self.memory_callback]
)
try:
# Generate response
response = await conversation.apredict(input=message)
# Save updated memory
await self.save_conversation_memory(user_id, conversation_id, memory)
# Monitor memory size
memory_size = len(str(memory.load_memory_variables({})))
if memory_size > 3000:
print(f"[MEMORY_WARNING] Large memory size detected: {memory_size} chars")
return response
except Exception as e:
print(f"[MEMORY_ERROR] Failed to process message: {e}")
return "I apologize, but I encountered an error. Please try again."
async def get_conversation_summary(self, user_id: str, conversation_id: str) -> str:
"""Retrieve conversation summary for analytics"""
memory = await self.load_conversation_memory(user_id, conversation_id)
return memory.moving_summary_buffer or "No summary available"
async def clear_conversation(self, user_id: str, conversation_id: str):
"""Clear conversation memory (GDPR compliance)"""
memory_key = self._get_memory_key(user_id, conversation_id)
self.redis_client.delete(memory_key)
print(f"[MEMORY_DEBUG] Cleared memory for {user_id}")
# Initialize support bot
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
llm = OpenAI(temperature=0.7)  # gpt-3.5-turbo is a chat model; use the default completion model here, or switch to ChatOpenAI
support_bot = CustomerSupportBot(redis_client, llm)
# Example usage
async def main():
user_id = "user_123"
conversation_id = str(uuid4())
# Simulate support conversation
messages = [
"Hi, I need help with my recent order",
"The order number is #ORD-789456",
"It was supposed to arrive yesterday but hasn't",
"Can you check the shipping status?"
]
for message in messages:
print(f"\nCustomer: {message}")
response = await support_bot.handle_message(user_id, conversation_id, message)
print(f"Support: {response}")
await asyncio.sleep(1) # Simulate real-time conversation
# Get conversation summary
summary = await support_bot.get_conversation_summary(user_id, conversation_id)
print(f"\n[SUMMARY] {summary}")
# Run example
# asyncio.run(main())
Memory Persistence Strategies
Redis-Based Memory Storage
Redis provides fast, scalable memory storage with built-in expiration and clustering support.
import redis
from typing import Optional, Dict
import json
import zlib
import base64
from datetime import datetime, timedelta

class RedisMemoryStore:
    """Production Redis memory storage with compression and monitoring"""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url, decode_responses=True)
        self.compression_threshold = 1024  # Compress payloads larger than 1KB

    def _compress_data(self, data: str) -> str:
        """Compress and base64-encode large memory data so it stores safely as a string"""
        return base64.b64encode(zlib.compress(data.encode("utf-8"))).decode("ascii")

    def _decompress_data(self, data: str) -> str:
        """Decode and decompress memory data"""
        return zlib.decompress(base64.b64decode(data)).decode("utf-8")
def save_memory(self, key: str, memory_data: Dict, ttl_hours: int = 24):
"""Save memory with optional compression"""
data_str = json.dumps(memory_data)
# Monitor memory size
size_bytes = len(data_str.encode('utf-8'))
if size_bytes > self.compression_threshold:
# Compress large memories
compressed = self._compress_data(data_str)
self.redis_client.setex(
f"compressed:{key}",
timedelta(hours=ttl_hours),
compressed
)
print(f"[MEMORY_DEBUG] Compressed {size_bytes} bytes to {len(compressed)} bytes")
else:
self.redis_client.setex(
key,
timedelta(hours=ttl_hours),
data_str
)
def load_memory(self, key: str) -> Optional[Dict]:
"""Load memory with automatic decompression"""
        # Check the compressed variant first
        compressed_data = self.redis_client.get(f"compressed:{key}")
        if compressed_data:
            return json.loads(self._decompress_data(compressed_data))
# Check uncompressed version
data_str = self.redis_client.get(key)
if data_str:
return json.loads(data_str)
return None
def get_memory_stats(self, pattern: str = "*") -> Dict:
"""Get memory usage statistics"""
keys = self.redis_client.keys(pattern)
total_size = 0
compressed_count = 0
for key in keys:
if key.startswith("compressed:"):
compressed_count += 1
memory_usage = self.redis_client.memory_usage(key) or 0
total_size += memory_usage
return {
"total_conversations": len(keys),
"compressed_conversations": compressed_count,
"total_memory_mb": total_size / (1024 * 1024),
"average_memory_kb": (total_size / len(keys) / 1024) if keys else 0
}
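The store can then slot into the support bot's persistence layer. Here is a minimal usage sketch; the key format and sample payload are illustrative assumptions, not anything prescribed by the class:
# Illustrative usage of RedisMemoryStore (assumes a local Redis instance)
store = RedisMemoryStore("redis://localhost:6379")

memory_payload = {
    "summary": "Customer asked about a delayed order (#ORD-789456).",
    "messages": [
        {"type": "human", "content": "Where is my order?"},
        {"type": "ai", "content": "Let me check the shipping status for you."}
    ]
}

# Small payloads are stored as plain JSON; larger ones are compressed transparently
store.save_memory("memory:user_123:conv_001", memory_payload, ttl_hours=24)
restored = store.load_memory("memory:user_123:conv_001")
print(restored["summary"] if restored else "No memory found")
print(store.get_memory_stats("memory:*"))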
PostgreSQL Memory Persistence
PostgreSQL offers robust, ACID-compliant memory storage with advanced querying capabilities.
import psycopg2
from psycopg2.extras import Json
from datetime import datetime
from typing import List, Optional, Dict
import json
class PostgreSQLMemoryStore:
"""PostgreSQL-based memory storage with full-text search and analytics"""
def __init__(self, connection_string: str):
self.connection_string = connection_string
self._create_tables()
def _create_tables(self):
"""Create memory storage tables"""
with psycopg2.connect(self.connection_string) as conn:
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS conversation_memories (
id SERIAL PRIMARY KEY,
user_id VARCHAR(255) NOT NULL,
conversation_id VARCHAR(255) NOT NULL,
summary TEXT,
messages JSONB,
metadata JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(user_id, conversation_id)
);
CREATE INDEX IF NOT EXISTS idx_user_conversations
ON conversation_memories(user_id, updated_at DESC);
CREATE INDEX IF NOT EXISTS idx_messages_gin
ON conversation_memories USING gin(messages);
-- Full-text search on summaries
CREATE INDEX IF NOT EXISTS idx_summary_fts
ON conversation_memories USING gin(to_tsvector('english', summary));
""")
def save_memory(self, user_id: str, conversation_id: str,
summary: str, messages: List[Dict], metadata: Dict = None):
"""Save or update conversation memory"""
with psycopg2.connect(self.connection_string) as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO conversation_memories
(user_id, conversation_id, summary, messages, metadata)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (user_id, conversation_id)
DO UPDATE SET
summary = EXCLUDED.summary,
messages = EXCLUDED.messages,
metadata = EXCLUDED.metadata,
updated_at = CURRENT_TIMESTAMP
""", (user_id, conversation_id, summary, Json(messages), Json(metadata or {})))
def load_memory(self, user_id: str, conversation_id: str) -> Optional[Dict]:
"""Load specific conversation memory"""
with psycopg2.connect(self.connection_string) as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT summary, messages, metadata, updated_at
FROM conversation_memories
WHERE user_id = %s AND conversation_id = %s
""", (user_id, conversation_id))
row = cur.fetchone()
if row:
return {
"summary": row[0],
"messages": row[1],
"metadata": row[2],
"updated_at": row[3].isoformat()
}
return None
def search_memories(self, user_id: str, query: str, limit: int = 10) -> List[Dict]:
"""Full-text search across user's conversation summaries"""
with psycopg2.connect(self.connection_string) as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT conversation_id, summary,
ts_rank(to_tsvector('english', summary),
plainto_tsquery('english', %s)) as rank
FROM conversation_memories
WHERE user_id = %s
AND to_tsvector('english', summary) @@ plainto_tsquery('english', %s)
ORDER BY rank DESC
LIMIT %s
""", (query, user_id, query, limit))
return [
{"conversation_id": row[0], "summary": row[1], "relevance": row[2]}
for row in cur.fetchall()
]
def get_user_analytics(self, user_id: str) -> Dict:
"""Get conversation analytics for a user"""
with psycopg2.connect(self.connection_string) as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT
COUNT(*) as total_conversations,
AVG(jsonb_array_length(messages)) as avg_messages_per_conversation,
MAX(updated_at) as last_interaction,
SUM(jsonb_array_length(messages)) as total_messages
FROM conversation_memories
WHERE user_id = %s
""", (user_id,))
row = cur.fetchone()
return {
"total_conversations": row[0],
"avg_messages_per_conversation": float(row[1] or 0),
"last_interaction": row[2].isoformat() if row[2] else None,
"total_messages": row[3] or 0
}
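As with the Redis store, a brief usage sketch shows the interface; the connection string and sample data below are placeholders for illustration only:
# Illustrative usage of PostgreSQLMemoryStore (the DSN is a placeholder)
pg_store = PostgreSQLMemoryStore("postgresql://user:password@localhost:5432/chatbot")

pg_store.save_memory(
    user_id="user_123",
    conversation_id="conv_001",
    summary="Customer reported a delayed delivery for order #ORD-789456.",
    messages=[
        {"type": "human", "content": "My delivery is late"},
        {"type": "ai", "content": "I'm checking the shipping status now."}
    ],
    metadata={"channel": "web_chat"}
)

record = pg_store.load_memory("user_123", "conv_001")
print(record["summary"] if record else "No memory found")
print(pg_store.search_memories("user_123", "delayed delivery"))
print(pg_store.get_user_analytics("user_123"))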
Memory Optimization for Long Conversations
Dynamic Memory Compression
Implement intelligent compression strategies that adapt based on conversation length and complexity.
from typing import List, Tuple, Dict
from langchain.schema import BaseMessage, HumanMessage, AIMessage
from langchain.llms import OpenAI
import tiktoken
class DynamicMemoryOptimizer:
"""Optimize memory for long conversations with adaptive strategies"""
def __init__(self, llm: OpenAI, max_tokens: int = 2000):
self.llm = llm
self.max_tokens = max_tokens
self.encoder = tiktoken.encoding_for_model("gpt-3.5-turbo")
def count_tokens(self, text: str) -> int:
"""Count tokens in text"""
return len(self.encoder.encode(text))
def segment_conversation(self, messages: List[BaseMessage]) -> List[List[BaseMessage]]:
"""Segment conversation into logical chunks"""
segments = []
current_segment = []
current_topic = None
for i, message in enumerate(messages):
# Simple topic detection based on keywords
if isinstance(message, HumanMessage):
# Detect topic changes
keywords = ["new", "different", "another", "change", "switch"]
if any(keyword in message.content.lower() for keyword in keywords):
if current_segment:
segments.append(current_segment)
current_segment = []
current_segment.append(message)
# Force segment after 10 messages
if len(current_segment) >= 10:
segments.append(current_segment)
current_segment = []
if current_segment:
segments.append(current_segment)
return segments
def summarize_segment(self, messages: List[BaseMessage]) -> str:
"""Summarize a conversation segment"""
conversation_text = "\n".join([
f"{'Human' if isinstance(msg, HumanMessage) else 'AI'}: {msg.content}"
for msg in messages
])
prompt = f"""Summarize this conversation segment, keeping key facts and context:
{conversation_text}
Summary:"""
return self.llm.predict(prompt)
def optimize_memory(self, messages: List[BaseMessage]) -> Tuple[str, List[BaseMessage]]:
"""Optimize memory by summarizing old segments and keeping recent messages"""
total_tokens = sum(self.count_tokens(msg.content) for msg in messages)
if total_tokens <= self.max_tokens:
# No optimization needed
return "", messages
# Segment conversation
segments = self.segment_conversation(messages)
# Keep last segment as-is
recent_messages = segments[-1] if segments else []
# Summarize older segments
summaries = []
for segment in segments[:-1]:
summary = self.summarize_segment(segment)
summaries.append(summary)
combined_summary = "\n\n".join(summaries)
# Check if optimization is sufficient
summary_tokens = self.count_tokens(combined_summary)
recent_tokens = sum(self.count_tokens(msg.content) for msg in recent_messages)
if summary_tokens + recent_tokens > self.max_tokens:
# Further compress summaries
compression_prompt = f"""Compress these summaries while keeping essential information:
{combined_summary}
Compressed summary:"""
combined_summary = self.llm.predict(compression_prompt)
return combined_summary, recent_messages
def get_optimization_metrics(self, original_messages: List[BaseMessage],
optimized_summary: str,
retained_messages: List[BaseMessage]) -> Dict:
"""Calculate optimization metrics"""
original_tokens = sum(self.count_tokens(msg.content) for msg in original_messages)
optimized_tokens = self.count_tokens(optimized_summary) + \
sum(self.count_tokens(msg.content) for msg in retained_messages)
return {
"original_tokens": original_tokens,
"optimized_tokens": optimized_tokens,
"compression_ratio": 1 - (optimized_tokens / original_tokens),
"messages_summarized": len(original_messages) - len(retained_messages),
"summary_length": len(optimized_summary)
}
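To see the optimizer in action, the sketch below feeds it a synthetic conversation. The message contents and token budget are arbitrary, and the summarization steps call the OpenAI API, so an API key must be configured:
# Illustrative run of DynamicMemoryOptimizer on a synthetic conversation
optimizer = DynamicMemoryOptimizer(llm=OpenAI(temperature=0), max_tokens=200)

messages = []
for i in range(15):
    messages.append(HumanMessage(content=f"Question {i} about my data science laptop search"))
    messages.append(AIMessage(content=f"Answer {i} with detailed recommendations and specs"))

# Older segments are summarized; the most recent segment is kept verbatim
summary, recent = optimizer.optimize_memory(messages)
metrics = optimizer.get_optimization_metrics(messages, summary, recent)
print(f"Compression ratio: {metrics['compression_ratio']:.0%}, "
      f"summarized {metrics['messages_summarized']} messages")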
Multi-User Memory Management
Concurrent Memory Access
Handle multiple users with isolated memory contexts and efficient resource management.
import asyncio
from asyncio import Lock
from typing import Dict, Optional
from collections import OrderedDict
import time
class MultiUserMemoryManager:
"""Manage memory for multiple concurrent users with isolation and efficiency"""
def __init__(self, max_active_memories: int = 100, memory_ttl_seconds: int = 3600):
self.max_active_memories = max_active_memories
self.memory_ttl_seconds = memory_ttl_seconds
self.active_memories: OrderedDict[str, Dict] = OrderedDict()
self.memory_locks: Dict[str, Lock] = {}
self.access_times: Dict[str, float] = {}
self.global_lock = Lock()
async def get_user_lock(self, user_key: str) -> Lock:
"""Get or create a lock for a specific user"""
async with self.global_lock:
if user_key not in self.memory_locks:
self.memory_locks[user_key] = Lock()
return self.memory_locks[user_key]
async def evict_old_memories(self):
"""Evict least recently used memories"""
async with self.global_lock:
current_time = time.time()
# Remove expired memories
expired_keys = [
key for key, access_time in self.access_times.items()
if current_time - access_time > self.memory_ttl_seconds
]
for key in expired_keys:
self.active_memories.pop(key, None)
self.access_times.pop(key, None)
self.memory_locks.pop(key, None)
# LRU eviction if over capacity
while len(self.active_memories) > self.max_active_memories:
oldest_key = next(iter(self.active_memories))
self.active_memories.pop(oldest_key)
self.access_times.pop(oldest_key, None)
self.memory_locks.pop(oldest_key, None)
async def get_user_memory(self, user_id: str, conversation_id: str,
memory_store: 'RedisMemoryStore') -> Optional[Dict]:
"""Get user memory with caching and isolation"""
user_key = f"{user_id}:{conversation_id}"
user_lock = await self.get_user_lock(user_key)
async with user_lock:
# Check active memories
if user_key in self.active_memories:
self.access_times[user_key] = time.time()
# Move to end (most recently used)
self.active_memories.move_to_end(user_key)
return self.active_memories[user_key]
# Load from persistent storage
memory_data = memory_store.load_memory(user_key)
if memory_data:
await self.evict_old_memories()
self.active_memories[user_key] = memory_data
self.access_times[user_key] = time.time()
return memory_data
return None
async def update_user_memory(self, user_id: str, conversation_id: str,
memory_data: Dict, memory_store: 'RedisMemoryStore'):
"""Update user memory with write-through caching"""
user_key = f"{user_id}:{conversation_id}"
user_lock = await self.get_user_lock(user_key)
async with user_lock:
# Update active memory
self.active_memories[user_key] = memory_data
self.access_times[user_key] = time.time()
self.active_memories.move_to_end(user_key)
# Persist to storage
memory_store.save_memory(user_key, memory_data)
def get_memory_stats(self) -> Dict:
"""Get memory manager statistics"""
return {
"active_memories": len(self.active_memories),
"total_locks": len(self.memory_locks),
"max_capacity": self.max_active_memories,
"utilization": len(self.active_memories) / self.max_active_memories
}
# Usage example
memory_manager = MultiUserMemoryManager(max_active_memories=100)
redis_store = RedisMemoryStore()
async def handle_concurrent_users():
"""Simulate concurrent user access"""
async def user_session(user_id: str):
conversation_id = "conv_001"
# Get memory
memory = await memory_manager.get_user_memory(
user_id, conversation_id, redis_store
)
# Update memory
new_memory = {"messages": ["Hello", "How can I help?"], "summary": "Greeting"}
await memory_manager.update_user_memory(
user_id, conversation_id, new_memory, redis_store
)
# Simulate 50 concurrent users
tasks = [user_session(f"user_{i}") for i in range(50)]
await asyncio.gather(*tasks)
print("Memory stats:", memory_manager.get_memory_stats())
Privacy Considerations
Privacy-Preserving Memory Management
Implement privacy features to protect user data and ensure compliance with regulations.
import hashlib
from cryptography.fernet import Fernet
from typing import Dict, List
import json
from datetime import datetime, timedelta
class PrivacyAwareMemoryManager:
"""Memory manager with privacy protection and compliance features"""
def __init__(self, encryption_key: bytes = None):
self.encryption_key = encryption_key or Fernet.generate_key()
self.cipher = Fernet(self.encryption_key)
        self.pii_patterns = [
            r'\b\d{3}-\d{2}-\d{4}\b',  # SSN
            r'\b(?:\d{4}[-\s]?){3}\d{4}\b',  # Credit card
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',  # Email
            r'\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b',  # Phone
        ]
def anonymize_user_id(self, user_id: str) -> str:
"""Create anonymized user identifier"""
return hashlib.sha256(user_id.encode()).hexdigest()[:16]
def encrypt_memory(self, memory_data: Dict) -> str:
"""Encrypt sensitive memory data"""
json_data = json.dumps(memory_data)
encrypted = self.cipher.encrypt(json_data.encode())
return encrypted.decode('latin-1')
def decrypt_memory(self, encrypted_data: str) -> Dict:
"""Decrypt memory data"""
decrypted = self.cipher.decrypt(encrypted_data.encode('latin-1'))
return json.loads(decrypted.decode())
def redact_pii(self, text: str) -> str:
"""Redact personally identifiable information"""
import re
redacted = text
for pattern in self.pii_patterns:
redacted = re.sub(pattern, '[REDACTED]', redacted)
return redacted
def create_audit_log(self, user_id: str, action: str, metadata: Dict = None):
"""Create audit log for compliance"""
log_entry = {
"timestamp": datetime.now().isoformat(),
"anonymized_user": self.anonymize_user_id(user_id),
"action": action,
"metadata": metadata or {}
}
# In production, write to secure audit log storage
print(f"[AUDIT] {json.dumps(log_entry)}")
def apply_retention_policy(self, memory_data: Dict, retention_days: int = 90) -> Dict:
"""Apply data retention policy"""
if "created_at" in memory_data:
created = datetime.fromisoformat(memory_data["created_at"])
if datetime.now() - created > timedelta(days=retention_days):
# Archive or delete based on policy
return {"archived": True, "archived_at": datetime.now().isoformat()}
return memory_data
def get_user_data_export(self, user_id: str, memories: List[Dict]) -> Dict:
"""Export user data for GDPR compliance"""
return {
"user_id": self.anonymize_user_id(user_id),
"export_date": datetime.now().isoformat(),
"data": [
{
"conversation_id": mem.get("conversation_id"),
"summary": self.redact_pii(mem.get("summary", "")),
"message_count": len(mem.get("messages", [])),
"created_at": mem.get("created_at")
}
for mem in memories
]
}
def delete_user_data(self, user_id: str, memory_store):
"""Complete user data deletion for right to be forgotten"""
anonymized_id = self.anonymize_user_id(user_id)
# Delete from all storage systems
# This is a simplified example - implement for your storage
self.create_audit_log(user_id, "user_data_deleted", {
"reason": "user_request",
"deletion_time": datetime.now().isoformat()
})
return {"status": "deleted", "user": anonymized_id}
# Example usage with privacy protection
privacy_manager = PrivacyAwareMemoryManager()
# Process conversation with PII protection
conversation_with_pii = {
"messages": [
{"role": "user", "content": "My email is [email protected]"},
{"role": "assistant", "content": "I'll help you with your account"},
{"role": "user", "content": "My phone is 555-123-4567"}
],
"summary": "User [email protected] needs account help"
}
# Redact PII before storage
safe_conversation = {
"messages": [
{"role": msg["role"], "content": privacy_manager.redact_pii(msg["content"])}
for msg in conversation_with_pii["messages"]
],
"summary": privacy_manager.redact_pii(conversation_with_pii["summary"])
}
# Encrypt for storage
encrypted = privacy_manager.encrypt_memory(safe_conversation)
print(f"Encrypted length: {len(encrypted)} chars")
# Decrypt when needed
decrypted = privacy_manager.decrypt_memory(encrypted)
print(f"Decrypted successfully: {len(decrypted['messages'])} messages")
Memory Debugging and Monitoring
Comprehensive Memory Monitoring System
Track memory performance, usage patterns, and potential issues in production.
import logging
from datetime import datetime
from typing import Dict, List, Optional
import psutil
import tracemalloc
from dataclasses import dataclass
from collections import deque
import asyncio
@dataclass
class MemoryMetrics:
"""Container for memory metrics"""
timestamp: datetime
conversation_id: str
token_count: int
processing_time_ms: float
memory_type: str
cache_hit: bool
error: Optional[str] = None
class MemoryMonitor:
"""Monitor and debug memory system performance"""
def __init__(self, max_metrics: int = 1000):
self.metrics: deque[MemoryMetrics] = deque(maxlen=max_metrics)
self.logger = logging.getLogger("memory_monitor")
self.logger.setLevel(logging.INFO)
# Setup logging
handler = logging.StreamHandler()
formatter = logging.Formatter(
'[%(asctime)s] %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
# Start memory tracking
tracemalloc.start()
def log_memory_operation(self, metrics: MemoryMetrics):
"""Log memory operation metrics"""
self.metrics.append(metrics)
if metrics.error:
self.logger.error(
f"Memory error for {metrics.conversation_id}: {metrics.error}"
)
elif metrics.processing_time_ms > 1000:
self.logger.warning(
f"Slow memory operation: {metrics.processing_time_ms:.2f}ms for {metrics.conversation_id}"
)
# Check system memory
memory_percent = psutil.virtual_memory().percent
if memory_percent > 80:
self.logger.warning(f"High system memory usage: {memory_percent}%")
def get_performance_stats(self) -> Dict:
"""Calculate performance statistics"""
if not self.metrics:
return {}
processing_times = [m.processing_time_ms for m in self.metrics if not m.error]
token_counts = [m.token_count for m in self.metrics]
cache_hits = sum(1 for m in self.metrics if m.cache_hit)
return {
"total_operations": len(self.metrics),
"error_rate": sum(1 for m in self.metrics if m.error) / len(self.metrics),
"avg_processing_time_ms": sum(processing_times) / len(processing_times) if processing_times else 0,
"max_processing_time_ms": max(processing_times) if processing_times else 0,
"avg_token_count": sum(token_counts) / len(token_counts) if token_counts else 0,
"cache_hit_rate": cache_hits / len(self.metrics) if self.metrics else 0,
"memory_types": self._count_memory_types()
}
def _count_memory_types(self) -> Dict[str, int]:
"""Count usage by memory type"""
type_counts = {}
for metric in self.metrics:
type_counts[metric.memory_type] = type_counts.get(metric.memory_type, 0) + 1
return type_counts
def get_memory_leaks(self) -> List[tuple]:
"""Detect potential memory leaks"""
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
# Filter for langchain-related allocations
memory_leaks = []
for stat in top_stats[:10]:
if 'langchain' in stat.traceback.format()[0]:
memory_leaks.append((
stat.traceback.format()[0],
stat.size / 1024 / 1024, # Convert to MB
stat.count
))
return memory_leaks
def create_memory_report(self) -> str:
"""Generate comprehensive memory report"""
stats = self.get_performance_stats()
leaks = self.get_memory_leaks()
system_memory = psutil.virtual_memory()
report = f"""
Memory System Report - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
{'=' * 60}
Performance Statistics:
- Total Operations: {stats.get('total_operations', 0)}
- Error Rate: {stats.get('error_rate', 0):.2%}
- Avg Processing Time: {stats.get('avg_processing_time_ms', 0):.2f}ms
- Max Processing Time: {stats.get('max_processing_time_ms', 0):.2f}ms
- Cache Hit Rate: {stats.get('cache_hit_rate', 0):.2%}
Memory Type Usage:
"""
for mem_type, count in stats.get('memory_types', {}).items():
report += f"- {mem_type}: {count} operations\n"
report += f"""
System Memory:
- Total: {system_memory.total / 1024 / 1024 / 1024:.2f} GB
- Used: {system_memory.percent}%
- Available: {system_memory.available / 1024 / 1024 / 1024:.2f} GB
Potential Memory Leaks:
"""
for location, size_mb, count in leaks[:5]:
report += f"- {location}: {size_mb:.2f} MB in {count} blocks\n"
return report
# Example monitoring integration
monitor = MemoryMonitor()
async def monitored_memory_operation(conversation_id: str, memory_type: str,
operation_func, *args, **kwargs):
"""Wrapper for monitoring memory operations"""
start_time = datetime.now()
error = None
result = None
try:
result = await operation_func(*args, **kwargs)
cache_hit = kwargs.get('cache_hit', False)
except Exception as e:
error = str(e)
cache_hit = False
raise
finally:
processing_time = (datetime.now() - start_time).total_seconds() * 1000
metrics = MemoryMetrics(
timestamp=datetime.now(),
conversation_id=conversation_id,
token_count=kwargs.get('token_count', 0),
processing_time_ms=processing_time,
memory_type=memory_type,
cache_hit=cache_hit,
error=error
)
monitor.log_memory_operation(metrics)
return result
# Generate periodic reports
async def periodic_memory_reports(interval_minutes: int = 30):
"""Generate memory reports at regular intervals"""
while True:
await asyncio.sleep(interval_minutes * 60)
report = monitor.create_memory_report()
monitor.logger.info(f"Periodic Memory Report:\n{report}")
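To connect the monitor to the rest of the stack, any memory-touching coroutine can be routed through the wrapper. The sketch below assumes the support_bot instance from the customer support example is in scope:
# Sketch: route the support bot's handler through the monitoring wrapper
async def monitored_chat(user_id: str, conversation_id: str, message: str) -> str:
    return await monitored_memory_operation(
        conversation_id,
        "ConversationSummaryBufferMemory",
        support_bot.handle_message,  # assumes the CustomerSupportBot instance defined earlier
        user_id,
        conversation_id,
        message
    )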
Production Patterns and Best Practices
Memory System Architecture
Design robust memory systems that scale with your application.
import asyncio
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Dict, Optional, Protocol, runtime_checkable
@runtime_checkable
class MemoryBackend(Protocol):
"""Protocol for memory backend implementations"""
async def save(self, key: str, data: Dict) -> bool: ...
async def load(self, key: str) -> Optional[Dict]: ...
async def delete(self, key: str) -> bool: ...
async def exists(self, key: str) -> bool: ...
class MemoryStrategy(ABC):
"""Abstract base for memory strategies"""
@abstractmethod
async def should_summarize(self, token_count: int, message_count: int) -> bool:
pass
@abstractmethod
async def get_retention_period(self, user_type: str) -> int:
pass
class ProductionMemorySystem:
"""Production-ready memory system with multiple backends and strategies"""
def __init__(self,
primary_backend: MemoryBackend,
fallback_backend: Optional[MemoryBackend] = None,
strategy: Optional[MemoryStrategy] = None):
self.primary_backend = primary_backend
self.fallback_backend = fallback_backend
self.strategy = strategy or DefaultMemoryStrategy()
self.health_check_interval = 60 # seconds
self._start_health_checks()
    def _start_health_checks(self):
        """Start background health checks (the system must be constructed inside a running event loop)"""
        asyncio.create_task(self._health_check_loop())
async def _health_check_loop(self):
"""Continuous health monitoring"""
while True:
await asyncio.sleep(self.health_check_interval)
# Check primary backend
if not await self._check_backend_health(self.primary_backend):
logging.error("Primary memory backend unhealthy")
# Check fallback if exists
if self.fallback_backend and not await self._check_backend_health(self.fallback_backend):
logging.error("Fallback memory backend unhealthy")
async def _check_backend_health(self, backend: MemoryBackend) -> bool:
"""Check if backend is healthy"""
try:
test_key = "__health_check__"
test_data = {"timestamp": datetime.now().isoformat()}
# Test write
if not await backend.save(test_key, test_data):
return False
# Test read
loaded = await backend.load(test_key)
if not loaded:
return False
# Test delete
await backend.delete(test_key)
return True
except Exception as e:
logging.error(f"Backend health check failed: {e}")
return False
async def save_memory(self, key: str, memory_data: Dict) -> bool:
"""Save memory with fallback support"""
try:
# Try primary backend
if await self.primary_backend.save(key, memory_data):
# Async replicate to fallback if available
if self.fallback_backend:
asyncio.create_task(
self.fallback_backend.save(key, memory_data)
)
return True
except Exception as e:
logging.error(f"Primary backend save failed: {e}")
# Try fallback
if self.fallback_backend:
try:
return await self.fallback_backend.save(key, memory_data)
except Exception as e2:
logging.error(f"Fallback backend save failed: {e2}")
return False
async def load_memory(self, key: str) -> Optional[Dict]:
"""Load memory with fallback support"""
try:
# Try primary backend
data = await self.primary_backend.load(key)
if data:
return data
except Exception as e:
logging.error(f"Primary backend load failed: {e}")
# Try fallback
if self.fallback_backend:
try:
return await self.fallback_backend.load(key)
except Exception as e2:
logging.error(f"Fallback backend load failed: {e2}")
return None
class DefaultMemoryStrategy(MemoryStrategy):
"""Default production memory strategy"""
async def should_summarize(self, token_count: int, message_count: int) -> bool:
# Summarize if over 1000 tokens or 20 messages
return token_count > 1000 or message_count > 20
async def get_retention_period(self, user_type: str) -> int:
# Different retention for different user types
retention_days = {
"premium": 365,
"standard": 90,
"trial": 30
}
return retention_days.get(user_type, 30)
Integration with Existing Systems
Connect your LangChain memory system with your existing chatbot infrastructure. For a complete implementation walkthrough, see our chatbot development tutorial.
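One way to bridge the pieces in this guide is a thin async adapter that exposes the earlier RedisMemoryStore through the MemoryBackend protocol. The adapter and startup helper below are a sketch of that wiring, not part of LangChain itself:
# Sketch: adapt RedisMemoryStore to the MemoryBackend protocol defined above
class RedisBackendAdapter:
    def __init__(self, store: RedisMemoryStore):
        self.store = store

    async def save(self, key: str, data: Dict) -> bool:
        self.store.save_memory(key, data)
        return True

    async def load(self, key: str) -> Optional[Dict]:
        return self.store.load_memory(key)

    async def delete(self, key: str) -> bool:
        return bool(self.store.redis_client.delete(key))

    async def exists(self, key: str) -> bool:
        return bool(self.store.redis_client.exists(key))

async def start_memory_system() -> ProductionMemorySystem:
    # Construct inside a running event loop so the background health-check task can start
    return ProductionMemorySystem(primary_backend=RedisBackendAdapter(RedisMemoryStore()))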
Memory Migration Tools
class MemoryMigrator:
"""Migrate memories between different storage systems"""
async def migrate_memories(self,
source: MemoryBackend,
destination: MemoryBackend,
user_filter: Optional[str] = None) -> Dict:
"""Migrate memories with progress tracking"""
migrated = 0
failed = 0
# Implementation depends on backend capabilities
# This is a simplified example
keys = await source.list_keys(pattern=user_filter or "*")
for key in keys:
try:
data = await source.load(key)
if data and await destination.save(key, data):
migrated += 1
else:
failed += 1
except Exception as e:
logging.error(f"Failed to migrate {key}: {e}")
failed += 1
return {
"total": len(keys),
"migrated": migrated,
"failed": failed
}
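To exercise the migrator without real infrastructure, the sketch below uses a tiny in-memory backend; note that list_keys() is an assumed extension beyond the MemoryBackend protocol shown earlier:
import fnmatch

# Minimal in-memory backend for demonstrating migration; list_keys() is an assumed extension
class InMemoryBackend:
    def __init__(self):
        self.data: Dict[str, Dict] = {}

    async def save(self, key: str, data: Dict) -> bool:
        self.data[key] = data
        return True

    async def load(self, key: str) -> Optional[Dict]:
        return self.data.get(key)

    async def delete(self, key: str) -> bool:
        return self.data.pop(key, None) is not None

    async def exists(self, key: str) -> bool:
        return key in self.data

    async def list_keys(self, pattern: str = "*"):
        return [k for k in self.data if fnmatch.fnmatch(k, pattern)]

async def demo_migration():
    source, destination = InMemoryBackend(), InMemoryBackend()
    await source.save("memory:user_123:conv_001", {"summary": "Order inquiry", "messages": []})
    result = await MemoryMigrator().migrate_memories(source, destination)
    print(result)  # expected: {'total': 1, 'migrated': 1, 'failed': 0}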
Conclusion
LangChain memory systems transform stateless language models into intelligent conversational agents capable of maintaining context, personalizing interactions, and delivering coherent multi-turn experiences. By implementing the patterns and strategies outlined in this guide—from basic memory types to production-scale optimization—you can build conversational AI that truly understands and remembers your users.
The key to successful memory implementation lies in choosing the right memory type for your use case, implementing robust persistence strategies, and continuously monitoring performance. Whether you're building a customer support bot or a complex conversational interface, LangChain's memory capabilities provide the foundation for creating AI that feels genuinely conversational.
For next steps, explore our chatbot development tutorial to see these memory concepts applied in a complete application, or dive deeper into specific memory backends for your production needs.