title: "Real-Time AI Applications with LangChain and WebSockets" description: "Learn how to build scalable real-time AI applications using LangChain streaming, WebSocket integration, and Server-Sent Events for responsive user experiences" date: "2024-01-15" author: "Fenil Sonani" tags: ["langchain", "websockets", "real-time", "ai", "streaming"] keywords: ["langchain streaming", "real-time ai", "langchain websocket", "server-sent events", "langchain real-time", "websocket ai integration", "streaming llm responses"]
Building real-time AI applications has become crucial for creating responsive and engaging user experiences. This comprehensive guide explores how to implement LangChain streaming with WebSockets and Server-Sent Events (SSE) to build a scalable real-time AI assistant. We'll cover everything from basic streaming responses to advanced topics like connection management, queue handling, and production-ready scaling considerations.
Why Real-Time AI Matters
Traditional request-response patterns for AI applications can lead to poor user experiences, especially when dealing with large language models that take seconds to generate complete responses. Real-time streaming solves this by:
- Immediate feedback: Users see responses as they're generated
- Better perceived performance: Even slower models feel responsive
- Enhanced interactivity: Enable features like real-time collaboration
- Resource efficiency: Stream processing reduces memory overhead
Understanding Streaming Technologies
Before diving into implementation, let's understand the three main approaches for real-time communication in web applications:
WebSockets
WebSockets provide full-duplex communication channels over a single TCP connection, perfect for bidirectional real-time data flow.
Advantages:
- Bidirectional communication
- Low latency
- Efficient for frequent small messages
- Persistent connections
Use cases:
- Collaborative features
- Real-time notifications
- Interactive AI assistants
Server-Sent Events (SSE)
SSE enables servers to push data to web clients over HTTP, ideal for unidirectional streaming from server to client.
Advantages:
- Simple implementation
- Automatic reconnection
- Works over standard HTTP
- Built-in event IDs so clients can resume from the last received message
Use cases:
- Streaming AI responses
- Live updates
- Progress notifications
Long Polling
While not truly real-time, long polling can serve as a fallback for environments where WebSockets and SSE aren't available: the client repeatedly issues a request that the server holds open until new data arrives or a timeout elapses.
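To make that concrete, here is a minimal client-side sketch of a long-polling loop. The `/api/chat/poll` endpoint and its response shape are assumptions for illustration only; the rest of this guide focuses on WebSockets and SSE.

```typescript
// Minimal long-polling loop against a hypothetical /api/chat/poll endpoint
// (illustrative only -- endpoint name and response shape are assumptions)
async function pollForUpdates(conversationId: string, onUpdate: (chunk: string) => void) {
  let active = true;
  while (active) {
    try {
      // The server holds the request open until new data arrives or it times out
      const res = await fetch(`/api/chat/poll?conversationId=${conversationId}`);
      if (res.status === 204) continue; // timed out with no new data; poll again
      const { chunks, done } = await res.json();
      chunks.forEach(onUpdate);
      if (done) active = false;
    } catch {
      // Back off briefly before retrying on network errors
      await new Promise(resolve => setTimeout(resolve, 2000));
    }
  }
}
```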
Setting Up the Backend Infrastructure
Let's build a complete real-time AI assistant using LangChain, WebSockets, and proper connection management. We'll use Node.js with Express for the backend and React for the frontend.
Installing Dependencies
# Backend dependencies
npm install express ws langchain @langchain/openai dotenv cors
npm install --save-dev @types/ws @types/express typescript nodemon
# Additional utilities
npm install uuid winston bull redis node-statsd
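The examples in this guide read their configuration from environment variables. A sample `.env` file might look like the following; every value is a placeholder to replace with your own:

```bash
# .env (all values are placeholders)
OPENAI_API_KEY=your-openai-api-key
REDIS_HOST=localhost
REDIS_PORT=6379
QUEUE_CONCURRENCY=5
STATSD_HOST=localhost
STATSD_PORT=8125
```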
Basic WebSocket Server with LangChain
Here's our foundational WebSocket server with LangChain integration:
// src/server.ts
import express from 'express';
import { createServer } from 'http';
import WebSocket from 'ws';
import { ChatOpenAI } from '@langchain/openai';
import { StringOutputParser } from '@langchain/core/output_parsers';
import { ChatPromptTemplate } from '@langchain/core/prompts';
import dotenv from 'dotenv';
import { v4 as uuidv4 } from 'uuid';
dotenv.config();
const app = express();
const server = createServer(app);
const wss = new WebSocket.Server({ server });
// Initialize LangChain with streaming
const model = new ChatOpenAI({
modelName: 'gpt-3.5-turbo',
temperature: 0.7,
streaming: true,
openAIApiKey: process.env.OPENAI_API_KEY,
});
const outputParser = new StringOutputParser();
// Connection tracking
const connections = new Map<string, WebSocket>();
wss.on('connection', (ws: WebSocket) => {
const connectionId = uuidv4();
connections.set(connectionId, ws);
console.log(`New connection: ${connectionId}`);
// Send connection established message
ws.send(JSON.stringify({
type: 'connection',
connectionId,
status: 'connected'
}));
ws.on('message', async (message) => {
try {
const data = JSON.parse(message.toString());
if (data.type === 'chat') {
await handleChatMessage(ws, data);
} else if (data.type === 'ping') {
ws.send(JSON.stringify({ type: 'pong' }));
}
} catch (error) {
console.error('Error processing message:', error);
ws.send(JSON.stringify({
type: 'error',
error: 'Failed to process message'
}));
}
});
ws.on('close', () => {
connections.delete(connectionId);
console.log(`Connection closed: ${connectionId}`);
});
ws.on('error', (error) => {
console.error(`WebSocket error for ${connectionId}:`, error);
});
});
async function handleChatMessage(ws: WebSocket, data: any) {
const { message, conversationId } = data;
try {
// Create prompt template
const prompt = ChatPromptTemplate.fromTemplate(
"You are a helpful AI assistant. Respond to the following: {input}"
);
// Create streaming chain
const chain = prompt.pipe(model).pipe(outputParser);
// Send start of stream
ws.send(JSON.stringify({
type: 'stream_start',
conversationId,
timestamp: new Date().toISOString()
}));
// Stream the response
const stream = await chain.stream({
input: message
});
for await (const chunk of stream) {
ws.send(JSON.stringify({
type: 'stream_chunk',
conversationId,
content: chunk
}));
}
// Send end of stream
ws.send(JSON.stringify({
type: 'stream_end',
conversationId,
timestamp: new Date().toISOString()
}));
} catch (error) {
console.error('Error in chat handler:', error);
ws.send(JSON.stringify({
type: 'error',
conversationId,
error: 'Failed to generate response'
}));
}
}
server.listen(3001, () => {
console.log('WebSocket server running on port 3001');
});
Implementing Server-Sent Events Alternative
For scenarios where WebSockets aren't suitable, here's an SSE implementation:
// src/sse-handler.ts
import { Request, Response } from 'express';
import { ChatOpenAI } from '@langchain/openai';
import { StringOutputParser } from '@langchain/core/output_parsers';
import { ChatPromptTemplate } from '@langchain/core/prompts';
export async function handleSSEChat(req: Request, res: Response) {
// Set headers for SSE
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('Access-Control-Allow-Origin', '*');
// Send initial connection event
res.write(`event: connected\ndata: ${JSON.stringify({ status: 'connected' })}\n\n`);
const { message } = req.body;
try {
const model = new ChatOpenAI({
modelName: 'gpt-3.5-turbo',
temperature: 0.7,
streaming: true,
});
const prompt = ChatPromptTemplate.fromTemplate(
"You are a helpful AI assistant. Respond to: {input}"
);
const chain = prompt.pipe(model).pipe(new StringOutputParser());
// Send stream start event
res.write(`event: stream_start\ndata: ${JSON.stringify({ timestamp: new Date().toISOString() })}\n\n`);
const stream = await chain.stream({ input: message });
for await (const chunk of stream) {
// Send each chunk as an SSE event
res.write(`event: stream_chunk\ndata: ${JSON.stringify({ content: chunk })}\n\n`);
}
// Send stream end event
res.write(`event: stream_end\ndata: ${JSON.stringify({ timestamp: new Date().toISOString() })}\n\n`);
res.end();
} catch (error) {
res.write(`event: error\ndata: ${JSON.stringify({ error: 'Failed to generate response' })}\n\n`);
res.end();
}
}
// Add route to Express app
app.post('/api/chat/sse', express.json(), handleSSEChat);
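Because this route accepts a POST body, the browser's built-in `EventSource` (which only issues GET requests) can't consume it directly. One option is to read the response with `fetch` and parse the SSE frames manually, as in this sketch; if you expose a GET variant instead, plain `EventSource` gives you automatic reconnection for free:

```typescript
// Browser-side sketch for consuming the POST-based SSE endpoint above
async function streamChat(message: string, onChunk: (text: string) => void) {
  const res = await fetch('/api/chat/sse', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message })
  });
  const reader = res.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    // SSE events are separated by a blank line
    const events = buffer.split('\n\n');
    buffer = events.pop() ?? '';
    for (const event of events) {
      const dataLine = event.split('\n').find(line => line.startsWith('data: '));
      if (!dataLine) continue;
      const payload = JSON.parse(dataLine.slice('data: '.length));
      if (event.includes('event: stream_chunk')) onChunk(payload.content);
    }
  }
}
```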
Advanced Connection Management
Robust connection management is crucial for production applications. Let's implement reconnection logic, heartbeat monitoring, and connection pooling:
// src/connection-manager.ts
import WebSocket from 'ws';
import { EventEmitter } from 'events';
import winston from 'winston';
interface ConnectionInfo {
id: string;
ws: WebSocket;
userId?: string;
lastActivity: Date;
reconnectAttempts: number;
}
export class ConnectionManager extends EventEmitter {
private connections: Map<string, ConnectionInfo> = new Map();
private heartbeatInterval: NodeJS.Timeout;
private logger: winston.Logger;
constructor() {
super();
this.logger = winston.createLogger({
level: 'info',
format: winston.format.json(),
transports: [
new winston.transports.File({ filename: 'connections.log' }),
new winston.transports.Console()
]
});
// Start heartbeat monitoring
this.heartbeatInterval = setInterval(() => {
this.checkHeartbeats();
}, 30000); // Check every 30 seconds
}
addConnection(id: string, ws: WebSocket, userId?: string): void {
const connectionInfo: ConnectionInfo = {
id,
ws,
userId,
lastActivity: new Date(),
reconnectAttempts: 0
};
this.connections.set(id, connectionInfo);
// Setup ping/pong for connection health
ws.on('pong', () => {
const conn = this.connections.get(id);
if (conn) {
conn.lastActivity = new Date();
}
});
this.logger.info('Connection added', { id, userId });
this.emit('connection:added', connectionInfo);
}
removeConnection(id: string): void {
const conn = this.connections.get(id);
if (conn) {
this.connections.delete(id);
this.logger.info('Connection removed', { id, userId: conn.userId });
this.emit('connection:removed', conn);
}
}
private checkHeartbeats(): void {
const now = new Date();
const timeout = 60000; // 60 seconds timeout
this.connections.forEach((conn, id) => {
const timeSinceLastActivity = now.getTime() - conn.lastActivity.getTime();
if (timeSinceLastActivity > timeout) {
if (conn.ws.readyState === WebSocket.OPEN) {
// Try to ping the connection
conn.ws.ping();
} else {
// Connection is dead, remove it
this.removeConnection(id);
}
}
});
}
getConnectionsByUserId(userId: string): ConnectionInfo[] {
return Array.from(this.connections.values())
.filter(conn => conn.userId === userId);
}
broadcastToUser(userId: string, message: any): void {
const userConnections = this.getConnectionsByUserId(userId);
const messageStr = JSON.stringify(message);
userConnections.forEach(conn => {
if (conn.ws.readyState === WebSocket.OPEN) {
conn.ws.send(messageStr);
}
});
}
getMetrics() {
return {
totalConnections: this.connections.size,
activeConnections: Array.from(this.connections.values())
.filter(conn => conn.ws.readyState === WebSocket.OPEN).length,
userCount: new Set(Array.from(this.connections.values())
.map(conn => conn.userId).filter(Boolean)).size
};
}
destroy(): void {
clearInterval(this.heartbeatInterval);
this.connections.forEach(conn => {
if (conn.ws.readyState === WebSocket.OPEN) {
conn.ws.close();
}
});
this.connections.clear();
}
}
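Here is a rough sketch of how the earlier server could be wired to use `ConnectionManager` instead of the bare `connections` map. The `/health` endpoint and the authenticated `userId` are assumptions added for illustration, not part of the class above:

```typescript
// Sketch: plugging ConnectionManager into the WebSocket server from earlier
import { ConnectionManager } from './connection-manager';

const connectionManager = new ConnectionManager();

wss.on('connection', (ws) => {
  const connectionId = uuidv4();
  // In a real app the userId would come from an authenticated handshake
  connectionManager.addConnection(connectionId, ws /*, userId */);

  ws.on('close', () => connectionManager.removeConnection(connectionId));
});

// Hypothetical health endpoint exposing live connection metrics
app.get('/health', (_req, res) => {
  res.json({ status: 'ok', ...connectionManager.getMetrics() });
});
```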
Queue Management for Concurrent Requests
When building real-time AI applications at scale, managing concurrent requests becomes critical. Let's implement a robust queue system using Bull and Redis:
// src/queue-manager.ts
import Bull from 'bull';
import { createClient } from 'redis';
import WebSocket from 'ws';
import { ConnectionManager } from './connection-manager';
import { ChatOpenAI } from '@langchain/openai';
import { StringOutputParser } from '@langchain/core/output_parsers';
import { ChatPromptTemplate } from '@langchain/core/prompts';
interface ChatJob {
connectionId: string;
message: string;
conversationId: string;
userId?: string;
priority?: number;
}
export class QueueManager {
private chatQueue: Bull.Queue<ChatJob>;
private redis: ReturnType<typeof createClient>;
private connectionManager: ConnectionManager;
constructor(connectionManager: ConnectionManager) {
this.connectionManager = connectionManager;
// Initialize Redis (v4 clients must be connected explicitly before use)
this.redis = createClient({
url: `redis://${process.env.REDIS_HOST || 'localhost'}:${process.env.REDIS_PORT || '6379'}`
});
this.redis.connect().catch(err => console.error('Redis connection error:', err));
// Initialize Bull queue
this.chatQueue = new Bull<ChatJob>('chat-processing', {
redis: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379')
},
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: false
}
});
// Process jobs
this.setupQueueProcessing();
// Monitor queue events
this.setupQueueMonitoring();
}
private setupQueueProcessing(): void {
// Process with concurrency based on available resources
const concurrency = parseInt(process.env.QUEUE_CONCURRENCY || '5');
this.chatQueue.process(concurrency, async (job) => {
const { connectionId, message, conversationId, userId } = job.data;
try {
// Update job progress
await job.progress(10);
const model = new ChatOpenAI({
modelName: 'gpt-3.5-turbo',
temperature: 0.7,
streaming: true,
});
const prompt = ChatPromptTemplate.fromTemplate(
"You are a helpful AI assistant. Respond to: {input}"
);
const chain = prompt.pipe(model).pipe(new StringOutputParser());
// Get WebSocket connection
const connections = userId
? this.connectionManager.getConnectionsByUserId(userId)
: [];
if (connections.length === 0) {
throw new Error('No active connections for user');
}
await job.progress(30);
// Send to all user connections
connections.forEach(conn => {
conn.ws.send(JSON.stringify({
type: 'stream_start',
conversationId,
jobId: job.id
}));
});
const stream = await chain.stream({ input: message });
let chunkCount = 0;
for await (const chunk of stream) {
connections.forEach(conn => {
if (conn.ws.readyState === WebSocket.OPEN) {
conn.ws.send(JSON.stringify({
type: 'stream_chunk',
conversationId,
content: chunk
}));
}
});
chunkCount++;
if (chunkCount % 10 === 0) {
await job.progress(30 + Math.min(60, chunkCount));
}
}
// Send completion
connections.forEach(conn => {
conn.ws.send(JSON.stringify({
type: 'stream_end',
conversationId,
jobId: job.id
}));
});
await job.progress(100);
return { success: true, conversationId };
} catch (error) {
console.error(`Job ${job.id} failed:`, error);
throw error;
}
});
}
private setupQueueMonitoring(): void {
this.chatQueue.on('completed', (job, result) => {
console.log(`Job ${job.id} completed`, result);
});
this.chatQueue.on('failed', (job, err) => {
console.error(`Job ${job.id} failed`, err);
// Notify user of failure
const { userId, conversationId } = job.data;
if (userId) {
this.connectionManager.broadcastToUser(userId, {
type: 'error',
conversationId,
error: 'Failed to process message',
jobId: job.id
});
}
});
this.chatQueue.on('stalled', (job) => {
console.warn(`Job ${job.id} stalled`);
});
}
async addChatJob(data: ChatJob): Promise<Bull.Job<ChatJob>> {
// Add priority queue support
const options: Bull.JobOptions = {};
if (data.priority) {
options.priority = data.priority;
}
// Rate limiting per user
if (data.userId) {
const userKey = `rate:${data.userId}`;
const count = await this.redis.incr(userKey);
if (count === 1) {
await this.redis.expire(userKey, 60); // 60 second window
}
if (count > 10) { // Max 10 requests per minute
throw new Error('Rate limit exceeded');
}
}
return this.chatQueue.add(data, options);
}
async getQueueMetrics() {
const [waiting, active, completed, failed, delayed] = await Promise.all([
this.chatQueue.getWaitingCount(),
this.chatQueue.getActiveCount(),
this.chatQueue.getCompletedCount(),
this.chatQueue.getFailedCount(),
this.chatQueue.getDelayedCount()
]);
return {
waiting,
active,
completed,
failed,
delayed,
total: waiting + active + delayed
};
}
async clearQueue(): Promise<void> {
await this.chatQueue.empty();
}
async close(): Promise<void> {
await this.chatQueue.close();
this.redis.quit();
}
}
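With the queue in place, the WebSocket message handler can enqueue work instead of calling the model inline. The sketch below assumes the `connectionManager`, `connectionId`, and `ws` from the earlier connection handler; the `queued` acknowledgement message type is an assumption added so the client could show a pending state:

```typescript
// Sketch: enqueue chat work instead of calling the model inside the handler
const queueManager = new QueueManager(connectionManager);

ws.on('message', async (raw) => {
  const data = JSON.parse(raw.toString());
  if (data.type !== 'chat') return;
  try {
    const job = await queueManager.addChatJob({
      connectionId,
      message: data.message,
      conversationId: data.conversationId,
      userId: data.userId
    });
    // Hypothetical acknowledgement so the client can render a "queued" state
    ws.send(JSON.stringify({ type: 'queued', conversationId: data.conversationId, jobId: job.id }));
  } catch (error) {
    // addChatJob throws when the per-user rate limit (10 requests/minute) is hit
    ws.send(JSON.stringify({ type: 'error', conversationId: data.conversationId, error: 'Rate limit exceeded' }));
  }
});
```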
Frontend Integration with React
Now let's build a React frontend that integrates with our LangChain WebSocket backend:
// src/hooks/useWebSocket.ts
import { useEffect, useRef, useState, useCallback } from 'react';
interface WebSocketMessage {
type: string;
[key: string]: any;
}
interface UseWebSocketOptions {
url: string;
reconnect?: boolean;
reconnectInterval?: number;
maxReconnectAttempts?: number;
onMessage?: (message: WebSocketMessage) => void;
onConnect?: () => void;
onDisconnect?: () => void;
onError?: (error: Event) => void;
}
export function useWebSocket({
url,
reconnect = true,
reconnectInterval = 5000,
maxReconnectAttempts = 10,
onMessage,
onConnect,
onDisconnect,
onError
}: UseWebSocketOptions) {
const ws = useRef<WebSocket | null>(null);
const reconnectCount = useRef(0);
const reconnectTimeout = useRef<ReturnType<typeof setTimeout>>();
const [isConnected, setIsConnected] = useState(false);
const [connectionId, setConnectionId] = useState<string | null>(null);
// Keep the latest callbacks in a ref so re-renders of the consuming component
// don't recreate `connect` and tear the socket down mid-stream
const handlersRef = useRef({ onMessage, onConnect, onDisconnect, onError });
useEffect(() => {
handlersRef.current = { onMessage, onConnect, onDisconnect, onError };
});
const connect = useCallback(() => {
try {
ws.current = new WebSocket(url);
ws.current.onopen = () => {
console.log('WebSocket connected');
setIsConnected(true);
reconnectCount.current = 0;
handlersRef.current.onConnect?.();
};
ws.current.onmessage = (event) => {
try {
const message = JSON.parse(event.data) as WebSocketMessage;
if (message.type === 'connection') {
setConnectionId(message.connectionId);
}
handlersRef.current.onMessage?.(message);
} catch (error) {
console.error('Failed to parse WebSocket message:', error);
}
};
ws.current.onclose = () => {
console.log('WebSocket disconnected');
setIsConnected(false);
setConnectionId(null);
handlersRef.current.onDisconnect?.();
// Attempt reconnection
if (reconnect && reconnectCount.current < maxReconnectAttempts) {
reconnectTimeout.current = setTimeout(() => {
reconnectCount.current++;
console.log(`Reconnecting... Attempt ${reconnectCount.current}`);
connect();
}, reconnectInterval);
}
};
ws.current.onerror = (error) => {
console.error('WebSocket error:', error);
handlersRef.current.onError?.(error);
};
} catch (error) {
console.error('Failed to create WebSocket:', error);
}
}, [url, reconnect, reconnectInterval, maxReconnectAttempts]);
const disconnect = useCallback(() => {
if (reconnectTimeout.current) {
clearTimeout(reconnectTimeout.current);
}
if (ws.current) {
ws.current.close();
ws.current = null;
}
}, []);
const sendMessage = useCallback((message: WebSocketMessage) => {
if (ws.current && ws.current.readyState === WebSocket.OPEN) {
ws.current.send(JSON.stringify(message));
} else {
console.error('WebSocket is not connected');
}
}, []);
useEffect(() => {
connect();
// Cleanup on unmount
return () => {
disconnect();
};
}, [connect, disconnect]);
// Heartbeat to maintain connection
useEffect(() => {
const heartbeatInterval = setInterval(() => {
if (isConnected) {
sendMessage({ type: 'ping' });
}
}, 30000); // Ping every 30 seconds
return () => clearInterval(heartbeatInterval);
}, [isConnected, sendMessage]);
return {
isConnected,
connectionId,
sendMessage,
disconnect,
reconnect: connect
};
}
// src/components/RealtimeChat.tsx
import React, { useState, useEffect, useRef } from 'react';
import { useWebSocket } from '../hooks/useWebSocket';
import { v4 as uuidv4 } from 'uuid';
interface Message {
id: string;
content: string;
role: 'user' | 'assistant';
timestamp: Date;
streaming?: boolean;
}
export function RealtimeChat() {
const [messages, setMessages] = useState<Message[]>([]);
const [input, setInput] = useState('');
const [isStreaming, setIsStreaming] = useState(false);
const messagesEndRef = useRef<HTMLDivElement>(null);
const currentStreamRef = useRef<string>('');
const conversationIdRef = useRef<string>(uuidv4());
const { isConnected, sendMessage } = useWebSocket({
url: 'ws://localhost:3001',
onMessage: (message) => {
handleWebSocketMessage(message);
},
onConnect: () => {
console.log('Connected to AI assistant');
},
onDisconnect: () => {
console.log('Disconnected from AI assistant');
}
});
const handleWebSocketMessage = (message: any) => {
switch (message.type) {
case 'stream_start':
setIsStreaming(true);
currentStreamRef.current = '';
// Add assistant message placeholder
setMessages(prev => [...prev, {
id: message.conversationId,
content: '',
role: 'assistant',
timestamp: new Date(),
streaming: true
}]);
break;
case 'stream_chunk':
currentStreamRef.current += message.content;
// Update streaming message
setMessages(prev => prev.map(msg =>
msg.id === message.conversationId
? { ...msg, content: currentStreamRef.current }
: msg
));
break;
case 'stream_end':
setIsStreaming(false);
// Mark message as complete
setMessages(prev => prev.map(msg =>
msg.id === message.conversationId
? { ...msg, streaming: false }
: msg
));
break;
case 'error':
setIsStreaming(false);
console.error('Chat error:', message.error);
// Add error message
setMessages(prev => [...prev, {
id: uuidv4(),
content: `Error: ${message.error}`,
role: 'assistant',
timestamp: new Date(),
streaming: false
}]);
break;
}
};
const sendChatMessage = () => {
if (!input.trim() || !isConnected || isStreaming) return;
const userMessage: Message = {
id: uuidv4(),
content: input,
role: 'user',
timestamp: new Date()
};
setMessages(prev => [...prev, userMessage]);
sendMessage({
type: 'chat',
message: input,
conversationId: conversationIdRef.current
});
setInput('');
conversationIdRef.current = uuidv4(); // New ID for next conversation
};
useEffect(() => {
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
}, [messages]);
return (
<div className="chat-container">
<div className="connection-status">
{isConnected ? (
<span className="connected">Connected</span>
) : (
<span className="disconnected">Disconnected</span>
)}
</div>
<div className="messages">
{messages.map((message) => (
<div
key={message.id}
className={`message ${message.role}`}
>
<div className="message-content">
{message.content}
{message.streaming && <span className="cursor-blink">▊</span>}
</div>
<div className="message-time">
{message.timestamp.toLocaleTimeString()}
</div>
</div>
))}
<div ref={messagesEndRef} />
</div>
<div className="input-area">
<input
type="text"
value={input}
onChange={(e) => setInput(e.target.value)}
onKeyDown={(e) => e.key === 'Enter' && sendChatMessage()}
placeholder="Type your message..."
disabled={!isConnected || isStreaming}
/>
<button
onClick={sendChatMessage}
disabled={!isConnected || isStreaming || !input.trim()}
>
{isStreaming ? 'Generating...' : 'Send'}
</button>
</div>
</div>
);
}
Performance Metrics and Monitoring
Monitoring is crucial for maintaining reliable real-time AI applications. Let's implement comprehensive metrics collection:
// src/metrics-collector.ts
import { EventEmitter } from 'events';
import winston from 'winston';
import { StatsD } from 'node-statsd';
interface MetricEvent {
name: string;
value: number;
tags?: Record<string, string>;
timestamp: Date;
}
export class MetricsCollector extends EventEmitter {
private statsd: StatsD;
private logger: winston.Logger;
private metrics: Map<string, MetricEvent[]> = new Map();
constructor() {
super();
// Initialize StatsD client
this.statsd = new StatsD({
host: process.env.STATSD_HOST || 'localhost',
port: parseInt(process.env.STATSD_PORT || '8125'),
prefix: 'realtime_ai.'
});
// Initialize logger
this.logger = winston.createLogger({
level: 'info',
format: winston.format.combine(
winston.format.timestamp(),
winston.format.json()
),
transports: [
new winston.transports.File({ filename: 'metrics.log' }),
new winston.transports.Console()
]
});
// Start periodic aggregation
setInterval(() => this.aggregateMetrics(), 60000); // Every minute
}
// WebSocket metrics
trackConnection(event: 'connect' | 'disconnect', metadata?: any): void {
this.statsd.increment(`websocket.${event}`);
this.logger.info(`WebSocket ${event}`, metadata);
}
trackMessageLatency(latency: number, messageType: string): void {
this.statsd.timing('websocket.message.latency', latency, [`type:${messageType}`]);
const metric: MetricEvent = {
name: 'message_latency',
value: latency,
tags: { type: messageType },
timestamp: new Date()
};
this.addMetric('latency', metric);
}
// LangChain streaming metrics
trackStreamingPerformance(data: {
conversationId: string;
duration: number;
tokenCount: number;
chunkCount: number;
model: string;
}): void {
const { duration, tokenCount, chunkCount, model } = data;
// Track various streaming metrics
this.statsd.timing('langchain.stream.duration', duration, [`model:${model}`]);
this.statsd.gauge('langchain.stream.tokens', tokenCount, [`model:${model}`]);
this.statsd.gauge('langchain.stream.chunks', chunkCount, [`model:${model}`]);
// Calculate tokens per second
const tokensPerSecond = tokenCount / (duration / 1000);
this.statsd.gauge('langchain.stream.tokens_per_second', tokensPerSecond, [`model:${model}`]);
this.logger.info('Streaming performance', data);
}
// Queue metrics
trackQueueMetrics(metrics: any): void {
Object.entries(metrics).forEach(([key, value]) => {
this.statsd.gauge(`queue.${key}`, value as number);
});
}
// Error tracking
trackError(error: Error, context: Record<string, any>): void {
this.statsd.increment('errors.total', [`type:${error.name}`]);
this.logger.error('Application error', {
error: {
name: error.name,
message: error.message,
stack: error.stack
},
context
});
}
// Performance metrics
trackResponseTime(duration: number, endpoint: string): void {
this.statsd.timing('api.response_time', duration, [`endpoint:${endpoint}`]);
}
// Memory and resource metrics
collectSystemMetrics(): void {
const memoryUsage = process.memoryUsage();
this.statsd.gauge('system.memory.rss', memoryUsage.rss);
this.statsd.gauge('system.memory.heap_total', memoryUsage.heapTotal);
this.statsd.gauge('system.memory.heap_used', memoryUsage.heapUsed);
this.statsd.gauge('system.memory.external', memoryUsage.external);
// CPU usage
const cpuUsage = process.cpuUsage();
this.statsd.gauge('system.cpu.user', cpuUsage.user);
this.statsd.gauge('system.cpu.system', cpuUsage.system);
}
// Custom metric tracking
private addMetric(category: string, metric: MetricEvent): void {
if (!this.metrics.has(category)) {
this.metrics.set(category, []);
}
const categoryMetrics = this.metrics.get(category)!;
categoryMetrics.push(metric);
// Keep only last hour of metrics
const oneHourAgo = new Date(Date.now() - 3600000);
this.metrics.set(
category,
categoryMetrics.filter(m => m.timestamp > oneHourAgo)
);
}
// Aggregate and report metrics
private aggregateMetrics(): void {
this.metrics.forEach((metrics, category) => {
if (metrics.length === 0) return;
const values = metrics.map(m => m.value);
const avg = values.reduce((a, b) => a + b, 0) / values.length;
const min = Math.min(...values);
const max = Math.max(...values);
this.statsd.gauge(`aggregate.${category}.avg`, avg);
this.statsd.gauge(`aggregate.${category}.min`, min);
this.statsd.gauge(`aggregate.${category}.max`, max);
this.logger.info(`Metric aggregation: ${category}`, {
count: metrics.length,
avg,
min,
max
});
});
// Collect system metrics
this.collectSystemMetrics();
}
// Get current metrics snapshot
getSnapshot(): Record<string, any> {
const snapshot: Record<string, any> = {};
this.metrics.forEach((metrics, category) => {
const values = metrics.map(m => m.value);
snapshot[category] = {
count: metrics.length,
average: values.length > 0
? values.reduce((a, b) => a + b, 0) / values.length
: 0,
min: values.length > 0 ? Math.min(...values) : 0,
max: values.length > 0 ? Math.max(...values) : 0
};
});
return snapshot;
}
close(): void {
this.statsd.close();
}
}
// Integration with main server
const metricsCollector = new MetricsCollector();
// Track WebSocket connections
wss.on('connection', (ws) => {
metricsCollector.trackConnection('connect');
let lastMessageAt = Date.now();
ws.on('message', () => {
const now = Date.now();
// Track the gap between consecutive client messages; end-to-end response
// latency is recorded where the stream finishes (see the sketch below)
metricsCollector.trackMessageLatency(now - lastMessageAt, 'message_interval');
lastMessageAt = now;
});
ws.on('close', () => {
metricsCollector.trackConnection('disconnect');
});
});
// Expose metrics endpoint
app.get('/metrics', async (req, res) => {
res.json({
snapshot: metricsCollector.getSnapshot(),
connections: connectionManager.getMetrics(),
queue: await queueManager.getQueueMetrics()
});
});
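To feed `trackStreamingPerformance`, the streaming loop inside `handleChatMessage` can be instrumented roughly as follows. The string stream doesn't expose token usage, so character and chunk counts are used here as rough proxies for `tokenCount`:

```typescript
// Sketch: inside handleChatMessage, time the stream and report it afterwards
const streamStart = Date.now();
let chunkCount = 0;
let charCount = 0;

for await (const chunk of stream) {
  chunkCount++;
  charCount += chunk.length;
  ws.send(JSON.stringify({ type: 'stream_chunk', conversationId, content: chunk }));
}

metricsCollector.trackStreamingPerformance({
  conversationId,
  duration: Date.now() - streamStart,
  // character count stands in for token count, which the string stream doesn't expose
  tokenCount: charCount,
  chunkCount,
  model: 'gpt-3.5-turbo'
});
```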
Scaling Considerations
When building production-ready real-time AI applications, consider these scaling strategies:
Horizontal Scaling with Redis Pub/Sub
// src/scaled-websocket.ts
import { createClient } from 'redis';
import { v4 as uuidv4 } from 'uuid';
import { ConnectionManager } from './connection-manager';
// For multiple server instances: share broadcasts through Redis pub/sub
const redisUrl = `redis://${process.env.REDIS_HOST || 'localhost'}:${process.env.REDIS_PORT || '6379'}`;
const pubClient = createClient({ url: redisUrl });
const subClient = pubClient.duplicate();
// Implement cross-server message broadcasting
export class ScaledWebSocketManager {
private serverId: string = uuidv4();
constructor(private connectionManager: ConnectionManager) {}
async broadcastToAllServers(message: any): Promise<void> {
// Publish to the shared Redis channel; every server instance is subscribed
await pubClient.publish('websocket:broadcast', JSON.stringify({
serverId: this.serverId,
message,
timestamp: new Date().toISOString()
}));
}
async setupSubscriptions(): Promise<void> {
// redis v4: the message listener is passed directly to subscribe()
await subClient.subscribe('websocket:broadcast', (data) => {
const { serverId, message } = JSON.parse(data);
// Don't process our own messages
if (serverId === this.serverId) return;
// Broadcast to local connections (assumes a broadcastToAll helper on ConnectionManager)
this.connectionManager.broadcastToAll(message);
});
}
}
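One possible startup sequence, assuming `pubClient` and `subClient` are exported from the module above and the `connectionManager` and `server` from earlier are in scope:

```typescript
// Sketch: bootstrapping cross-server broadcasting at startup
const scaledManager = new ScaledWebSocketManager(connectionManager);

async function start() {
  // redis v4 clients must be connected before publish/subscribe is used
  await Promise.all([pubClient.connect(), subClient.connect()]);
  await scaledManager.setupSubscriptions();
  server.listen(3001, () => console.log('WebSocket server running on port 3001'));
}

start().catch(console.error);
```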
Load Balancing Strategies
- Sticky Sessions: Ensure WebSocket connections stay with the same server
- Connection State in Redis: Store connection metadata centrally
- Queue-based distribution: Use message queues for work distribution
Resource Optimization
// Implement connection pooling for LangChain model instances
import { ChatOpenAI } from '@langchain/openai';
class LangChainPool {
private pool: ChatOpenAI[] = [];
private maxSize: number = 10;
async acquire(): Promise<ChatOpenAI> {
if (this.pool.length > 0) {
return this.pool.pop()!;
}
return new ChatOpenAI({
modelName: 'gpt-3.5-turbo',
temperature: 0.7,
streaming: true,
});
}
release(model: ChatOpenAI): void {
if (this.pool.length < this.maxSize) {
this.pool.push(model);
}
}
}
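A sketch of how the pool might be used around a single streaming request; the acquire/release pair sits in a `try/finally` so instances are always returned, and the shared `langChainPool` instance is an assumption for illustration:

```typescript
import { StringOutputParser } from '@langchain/core/output_parsers';

const langChainPool = new LangChainPool();

// Sketch: acquire a pooled model, stream a response, then return the model
async function streamWithPooledModel(input: string, onChunk: (text: string) => void) {
  const model = await langChainPool.acquire();
  try {
    const stream = await model.pipe(new StringOutputParser()).stream(input);
    for await (const chunk of stream) {
      onChunk(chunk);
    }
  } finally {
    // Always return the instance so the pool does not leak model objects
    langChainPool.release(model);
  }
}
```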
Deployment Best Practices
When deploying your LangChain WebSocket application:
- Use a reverse proxy (Nginx) for WebSocket support (a sample config follows this list)
- Enable SSL/TLS for secure WebSocket connections (wss://)
- Configure proper CORS headers for cross-origin requests
- Implement rate limiting to prevent abuse
- Set up monitoring with tools like Prometheus and Grafana
- Use container orchestration (Kubernetes) for scaling
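As a starting point for the reverse-proxy, TLS, and sticky-session items above, a minimal Nginx sketch might look like this; server names, ports, certificate paths, and the `ip_hash` strategy are placeholders to adapt to your environment:

```nginx
# Minimal reverse-proxy sketch for wss:// traffic (names and paths are placeholders)
upstream websocket_backend {
    ip_hash;  # simple sticky sessions: route a client IP to the same instance
    server 127.0.0.1:3001;
    server 127.0.0.1:3002;
}

server {
    listen 443 ssl;
    server_name example.com;

    ssl_certificate     /etc/ssl/certs/example.com.pem;   # placeholder paths
    ssl_certificate_key /etc/ssl/private/example.com.key;

    location / {
        proxy_pass http://websocket_backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_read_timeout 300s;  # keep long-lived streams open
    }
}
```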
For detailed deployment guidance, check out our guides on deploying Node.js applications and Kubernetes best practices.
Conclusion
Building real-time AI applications with LangChain streaming and WebSockets opens up exciting possibilities for creating responsive, engaging user experiences. By implementing proper connection management, queue systems, and monitoring, you can build scalable applications that handle thousands of concurrent users.
Key takeaways:
- Use WebSockets for bidirectional real-time communication
- Implement robust reconnection logic and heartbeat monitoring
- Queue concurrent requests to manage load effectively
- Monitor performance metrics for optimization
- Plan for horizontal scaling from the start
The combination of LangChain's streaming capabilities with WebSocket technology enables you to build the next generation of AI-powered applications that feel instantaneous and responsive to users.