title: "Real-Time AI Applications with LangChain and WebSockets" description: "Learn how to build scalable real-time AI applications using LangChain streaming, WebSocket integration, and Server-Sent Events for responsive user experiences" date: "2024-01-15" author: "Fenil Sonani" tags: ["langchain", "websockets", "real-time", "ai", "streaming"] keywords: ["langchain streaming", "real-time ai", "langchain websocket", "server-sent events", "langchain real-time", "websocket ai integration", "streaming llm responses"]

Building real-time AI applications has become crucial for creating responsive and engaging user experiences. This comprehensive guide explores how to implement LangChain streaming with WebSockets and Server-Sent Events (SSE) to build a scalable real-time AI assistant. We'll cover everything from basic streaming responses to advanced topics like connection management, queue handling, and production-ready scaling considerations.

Why Real-Time AI Matters

Traditional request-response patterns for AI applications can lead to poor user experiences, especially when dealing with large language models that take seconds to generate complete responses. Real-time streaming solves this by:

  • Immediate feedback: Users see responses as they're generated
  • Better perceived performance: Even slower models feel responsive
  • Enhanced interactivity: Enable features like real-time collaboration
  • Resource efficiency: Stream processing reduces memory overhead

Understanding Streaming Technologies

Before diving into implementation, let's understand the three main approaches for real-time communication in web applications:

WebSockets

WebSockets provide full-duplex communication channels over a single TCP connection, perfect for bidirectional real-time data flow.

Advantages:

  • Bidirectional communication
  • Low latency
  • Efficient for frequent small messages
  • Persistent connections

Use cases:

  • Collaborative features
  • Real-time notifications
  • Interactive AI assistants

Server-Sent Events (SSE)

SSE enables servers to push data to web clients over HTTP, ideal for unidirectional streaming from server to client.

Advantages:

  • Simple implementation
  • Automatic reconnection
  • Works over standard HTTP
  • Built-in event IDs for message ordering

Use cases:

  • Streaming AI responses
  • Live updates
  • Progress notifications

Long Polling

While not true real-time, long polling can be a fallback option for environments where WebSockets and SSE aren't available.

Setting Up the Backend Infrastructure

Let's build a complete real-time AI assistant using LangChain, WebSockets, and proper connection management. We'll use Node.js with Express for the backend and React for the frontend.

Installing Dependencies

# Backend dependencies
npm install express ws langchain @langchain/openai dotenv cors
npm install --save-dev @types/ws @types/express typescript nodemon

# Additional utilities
npm install uuid winston bull redis

Basic WebSocket Server with LangChain

Here's our foundational WebSocket server with LangChain integration:

// src/server.ts
import express from 'express';
import { createServer } from 'http';
import WebSocket from 'ws';
import { ChatOpenAI } from '@langchain/openai';
import { StringOutputParser } from '@langchain/core/output_parsers';
import { ChatPromptTemplate } from '@langchain/core/prompts';
import dotenv from 'dotenv';
import { v4 as uuidv4 } from 'uuid';

dotenv.config();

const app = express();
const server = createServer(app);
const wss = new WebSocket.Server({ server });

// Initialize LangChain with streaming
const model = new ChatOpenAI({
  modelName: 'gpt-3.5-turbo',
  temperature: 0.7,
  streaming: true,
  openAIApiKey: process.env.OPENAI_API_KEY,
});

const outputParser = new StringOutputParser();

// Connection tracking
const connections = new Map<string, WebSocket>();

wss.on('connection', (ws: WebSocket) => {
  const connectionId = uuidv4();
  connections.set(connectionId, ws);
  
  console.log(`New connection: ${connectionId}`);
  
  // Send connection established message
  ws.send(JSON.stringify({
    type: 'connection',
    connectionId,
    status: 'connected'
  }));
  
  ws.on('message', async (message: string) => {
    try {
      const data = JSON.parse(message.toString());
      
      if (data.type === 'chat') {
        await handleChatMessage(ws, data);
      } else if (data.type === 'ping') {
        ws.send(JSON.stringify({ type: 'pong' }));
      }
    } catch (error) {
      console.error('Error processing message:', error);
      ws.send(JSON.stringify({
        type: 'error',
        error: 'Failed to process message'
      }));
    }
  });
  
  ws.on('close', () => {
    connections.delete(connectionId);
    console.log(`Connection closed: ${connectionId}`);
  });
  
  ws.on('error', (error) => {
    console.error(`WebSocket error for ${connectionId}:`, error);
  });
});

async function handleChatMessage(ws: WebSocket, data: any) {
  const { message, conversationId } = data;
  
  try {
    // Create prompt template
    const prompt = ChatPromptTemplate.fromTemplate(
      "You are a helpful AI assistant. Respond to the following: {input}"
    );
    
    // Create streaming chain
    const chain = prompt.pipe(model).pipe(outputParser);
    
    // Send start of stream
    ws.send(JSON.stringify({
      type: 'stream_start',
      conversationId,
      timestamp: new Date().toISOString()
    }));
    
    // Stream the response
    const stream = await chain.stream({
      input: message
    });
    
    for await (const chunk of stream) {
      ws.send(JSON.stringify({
        type: 'stream_chunk',
        conversationId,
        content: chunk
      }));
    }
    
    // Send end of stream
    ws.send(JSON.stringify({
      type: 'stream_end',
      conversationId,
      timestamp: new Date().toISOString()
    }));
    
  } catch (error) {
    console.error('Error in chat handler:', error);
    ws.send(JSON.stringify({
      type: 'error',
      conversationId,
      error: 'Failed to generate response'
    }));
  }
}

server.listen(3001, () => {
  console.log('WebSocket server running on port 3001');
});

Implementing Server-Sent Events Alternative

For scenarios where WebSockets aren't suitable, here's an SSE implementation:

// src/sse-handler.ts
import { Response } from 'express';
import { ChatOpenAI } from '@langchain/openai';
import { StringOutputParser } from '@langchain/core/output_parsers';
import { ChatPromptTemplate } from '@langchain/core/prompts';

export async function handleSSEChat(req: any, res: Response) {
  // Set headers for SSE
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.setHeader('Access-Control-Allow-Origin', '*');
  
  // Send initial connection event
  res.write(`event: connected\ndata: ${JSON.stringify({ status: 'connected' })}\n\n`);
  
  const { message } = req.body;
  
  try {
    const model = new ChatOpenAI({
      modelName: 'gpt-3.5-turbo',
      temperature: 0.7,
      streaming: true,
    });
    
    const prompt = ChatPromptTemplate.fromTemplate(
      "You are a helpful AI assistant. Respond to: {input}"
    );
    
    const chain = prompt.pipe(model).pipe(new StringOutputParser());
    
    // Send stream start event
    res.write(`event: stream_start\ndata: ${JSON.stringify({ timestamp: new Date().toISOString() })}\n\n`);
    
    const stream = await chain.stream({ input: message });
    
    for await (const chunk of stream) {
      // Send each chunk as an SSE event
      res.write(`event: stream_chunk\ndata: ${JSON.stringify({ content: chunk })}\n\n`);
    }
    
    // Send stream end event
    res.write(`event: stream_end\ndata: ${JSON.stringify({ timestamp: new Date().toISOString() })}\n\n`);
    
    res.end();
  } catch (error) {
    res.write(`event: error\ndata: ${JSON.stringify({ error: 'Failed to generate response' })}\n\n`);
    res.end();
  }
}

// Add route to Express app
app.post('/api/chat/sse', express.json(), handleSSEChat);

Advanced Connection Management

Robust connection management is crucial for production applications. Let's implement reconnection logic, heartbeat monitoring, and connection pooling:

// src/connection-manager.ts
import WebSocket from 'ws';
import { EventEmitter } from 'events';
import winston from 'winston';

interface ConnectionInfo {
  id: string;
  ws: WebSocket;
  userId?: string;
  lastActivity: Date;
  reconnectAttempts: number;
}

export class ConnectionManager extends EventEmitter {
  private connections: Map<string, ConnectionInfo> = new Map();
  private heartbeatInterval: NodeJS.Timer;
  private logger: winston.Logger;
  
  constructor() {
    super();
    
    this.logger = winston.createLogger({
      level: 'info',
      format: winston.format.json(),
      transports: [
        new winston.transports.File({ filename: 'connections.log' }),
        new winston.transports.Console()
      ]
    });
    
    // Start heartbeat monitoring
    this.heartbeatInterval = setInterval(() => {
      this.checkHeartbeats();
    }, 30000); // Check every 30 seconds
  }
  
  addConnection(id: string, ws: WebSocket, userId?: string): void {
    const connectionInfo: ConnectionInfo = {
      id,
      ws,
      userId,
      lastActivity: new Date(),
      reconnectAttempts: 0
    };
    
    this.connections.set(id, connectionInfo);
    
    // Setup ping/pong for connection health
    ws.on('pong', () => {
      const conn = this.connections.get(id);
      if (conn) {
        conn.lastActivity = new Date();
      }
    });
    
    this.logger.info('Connection added', { id, userId });
    this.emit('connection:added', connectionInfo);
  }
  
  removeConnection(id: string): void {
    const conn = this.connections.get(id);
    if (conn) {
      this.connections.delete(id);
      this.logger.info('Connection removed', { id, userId: conn.userId });
      this.emit('connection:removed', conn);
    }
  }
  
  private checkHeartbeats(): void {
    const now = new Date();
    const timeout = 60000; // 60 seconds timeout
    
    this.connections.forEach((conn, id) => {
      const timeSinceLastActivity = now.getTime() - conn.lastActivity.getTime();
      
      if (timeSinceLastActivity > timeout) {
        if (conn.ws.readyState === WebSocket.OPEN) {
          // Try to ping the connection
          conn.ws.ping();
        } else {
          // Connection is dead, remove it
          this.removeConnection(id);
        }
      }
    });
  }
  
  getConnectionsByUserId(userId: string): ConnectionInfo[] {
    return Array.from(this.connections.values())
      .filter(conn => conn.userId === userId);
  }
  
  broadcastToUser(userId: string, message: any): void {
    const userConnections = this.getConnectionsByUserId(userId);
    const messageStr = JSON.stringify(message);
    
    userConnections.forEach(conn => {
      if (conn.ws.readyState === WebSocket.OPEN) {
        conn.ws.send(messageStr);
      }
    });
  }
  
  getMetrics() {
    return {
      totalConnections: this.connections.size,
      activeConnections: Array.from(this.connections.values())
        .filter(conn => conn.ws.readyState === WebSocket.OPEN).length,
      userCount: new Set(Array.from(this.connections.values())
        .map(conn => conn.userId).filter(Boolean)).size
    };
  }
  
  destroy(): void {
    clearInterval(this.heartbeatInterval);
    this.connections.forEach(conn => {
      if (conn.ws.readyState === WebSocket.OPEN) {
        conn.ws.close();
      }
    });
    this.connections.clear();
  }
}

Queue Management for Concurrent Requests

When building real-time AI applications at scale, managing concurrent requests becomes critical. Let's implement a robust queue system using Bull and Redis:

// src/queue-manager.ts
import Bull from 'bull';
import Redis from 'redis';
import { ChatOpenAI } from '@langchain/openai';
import { StringOutputParser } from '@langchain/core/output_parsers';
import { ChatPromptTemplate } from '@langchain/core/prompts';

interface ChatJob {
  connectionId: string;
  message: string;
  conversationId: string;
  userId?: string;
  priority?: number;
}

export class QueueManager {
  private chatQueue: Bull.Queue<ChatJob>;
  private redis: Redis.RedisClient;
  private connectionManager: ConnectionManager;
  
  constructor(connectionManager: ConnectionManager) {
    this.connectionManager = connectionManager;
    
    // Initialize Redis
    this.redis = Redis.createClient({
      host: process.env.REDIS_HOST || 'localhost',
      port: parseInt(process.env.REDIS_PORT || '6379')
    });
    
    // Initialize Bull queue
    this.chatQueue = new Bull<ChatJob>('chat-processing', {
      redis: {
        host: process.env.REDIS_HOST || 'localhost',
        port: parseInt(process.env.REDIS_PORT || '6379')
      },
      defaultJobOptions: {
        removeOnComplete: true,
        removeOnFail: false
      }
    });
    
    // Process jobs
    this.setupQueueProcessing();
    
    // Monitor queue events
    this.setupQueueMonitoring();
  }
  
  private setupQueueProcessing(): void {
    // Process with concurrency based on available resources
    const concurrency = parseInt(process.env.QUEUE_CONCURRENCY || '5');
    
    this.chatQueue.process(concurrency, async (job) => {
      const { connectionId, message, conversationId, userId } = job.data;
      
      try {
        // Update job progress
        await job.progress(10);
        
        const model = new ChatOpenAI({
          modelName: 'gpt-3.5-turbo',
          temperature: 0.7,
          streaming: true,
        });
        
        const prompt = ChatPromptTemplate.fromTemplate(
          "You are a helpful AI assistant. Respond to: {input}"
        );
        
        const chain = prompt.pipe(model).pipe(new StringOutputParser());
        
        // Get WebSocket connection
        const connections = userId 
          ? this.connectionManager.getConnectionsByUserId(userId)
          : [];
        
        if (connections.length === 0) {
          throw new Error('No active connections for user');
        }
        
        await job.progress(30);
        
        // Send to all user connections
        connections.forEach(conn => {
          conn.ws.send(JSON.stringify({
            type: 'stream_start',
            conversationId,
            jobId: job.id
          }));
        });
        
        const stream = await chain.stream({ input: message });
        let chunkCount = 0;
        
        for await (const chunk of stream) {
          connections.forEach(conn => {
            if (conn.ws.readyState === WebSocket.OPEN) {
              conn.ws.send(JSON.stringify({
                type: 'stream_chunk',
                conversationId,
                content: chunk
              }));
            }
          });
          
          chunkCount++;
          if (chunkCount % 10 === 0) {
            await job.progress(30 + Math.min(60, chunkCount));
          }
        }
        
        // Send completion
        connections.forEach(conn => {
          conn.ws.send(JSON.stringify({
            type: 'stream_end',
            conversationId,
            jobId: job.id
          }));
        });
        
        await job.progress(100);
        
        return { success: true, conversationId };
        
      } catch (error) {
        console.error(`Job ${job.id} failed:`, error);
        throw error;
      }
    });
  }
  
  private setupQueueMonitoring(): void {
    this.chatQueue.on('completed', (job, result) => {
      console.log(`Job ${job.id} completed`, result);
    });
    
    this.chatQueue.on('failed', (job, err) => {
      console.error(`Job ${job.id} failed`, err);
      
      // Notify user of failure
      const { userId, conversationId } = job.data;
      if (userId) {
        this.connectionManager.broadcastToUser(userId, {
          type: 'error',
          conversationId,
          error: 'Failed to process message',
          jobId: job.id
        });
      }
    });
    
    this.chatQueue.on('stalled', (job) => {
      console.warn(`Job ${job.id} stalled`);
    });
  }
  
  async addChatJob(data: ChatJob): Promise<Bull.Job<ChatJob>> {
    // Add priority queue support
    const options: Bull.JobOptions = {};
    
    if (data.priority) {
      options.priority = data.priority;
    }
    
    // Rate limiting per user
    if (data.userId) {
      const userKey = `rate:${data.userId}`;
      const count = await this.redis.incr(userKey);
      
      if (count === 1) {
        await this.redis.expire(userKey, 60); // 60 second window
      }
      
      if (count > 10) { // Max 10 requests per minute
        throw new Error('Rate limit exceeded');
      }
    }
    
    return this.chatQueue.add(data, options);
  }
  
  async getQueueMetrics() {
    const [waiting, active, completed, failed, delayed] = await Promise.all([
      this.chatQueue.getWaitingCount(),
      this.chatQueue.getActiveCount(),
      this.chatQueue.getCompletedCount(),
      this.chatQueue.getFailedCount(),
      this.chatQueue.getDelayedCount()
    ]);
    
    return {
      waiting,
      active,
      completed,
      failed,
      delayed,
      total: waiting + active + delayed
    };
  }
  
  async clearQueue(): Promise<void> {
    await this.chatQueue.empty();
  }
  
  async close(): Promise<void> {
    await this.chatQueue.close();
    this.redis.quit();
  }
}

Frontend Integration with React

Now let's build a React frontend that integrates with our LangChain WebSocket backend:

// src/hooks/useWebSocket.ts
import { useEffect, useRef, useState, useCallback } from 'react';

interface WebSocketMessage {
  type: string;
  [key: string]: any;
}

interface UseWebSocketOptions {
  url: string;
  reconnect?: boolean;
  reconnectInterval?: number;
  maxReconnectAttempts?: number;
  onMessage?: (message: WebSocketMessage) => void;
  onConnect?: () => void;
  onDisconnect?: () => void;
  onError?: (error: Event) => void;
}

export function useWebSocket({
  url,
  reconnect = true,
  reconnectInterval = 5000,
  maxReconnectAttempts = 10,
  onMessage,
  onConnect,
  onDisconnect,
  onError
}: UseWebSocketOptions) {
  const ws = useRef<WebSocket | null>(null);
  const reconnectCount = useRef(0);
  const reconnectTimeout = useRef<NodeJS.Timeout>();
  const [isConnected, setIsConnected] = useState(false);
  const [connectionId, setConnectionId] = useState<string | null>(null);
  
  const connect = useCallback(() => {
    try {
      ws.current = new WebSocket(url);
      
      ws.current.onopen = () => {
        console.log('WebSocket connected');
        setIsConnected(true);
        reconnectCount.current = 0;
        onConnect?.();
      };
      
      ws.current.onmessage = (event) => {
        try {
          const message = JSON.parse(event.data) as WebSocketMessage;
          
          if (message.type === 'connection') {
            setConnectionId(message.connectionId);
          }
          
          onMessage?.(message);
        } catch (error) {
          console.error('Failed to parse WebSocket message:', error);
        }
      };
      
      ws.current.onclose = () => {
        console.log('WebSocket disconnected');
        setIsConnected(false);
        setConnectionId(null);
        onDisconnect?.();
        
        // Attempt reconnection
        if (reconnect && reconnectCount.current < maxReconnectAttempts) {
          reconnectTimeout.current = setTimeout(() => {
            reconnectCount.current++;
            console.log(`Reconnecting... Attempt ${reconnectCount.current}`);
            connect();
          }, reconnectInterval);
        }
      };
      
      ws.current.onerror = (error) => {
        console.error('WebSocket error:', error);
        onError?.(error);
      };
      
    } catch (error) {
      console.error('Failed to create WebSocket:', error);
    }
  }, [url, reconnect, reconnectInterval, maxReconnectAttempts, onMessage, onConnect, onDisconnect, onError]);
  
  const disconnect = useCallback(() => {
    if (reconnectTimeout.current) {
      clearTimeout(reconnectTimeout.current);
    }
    
    if (ws.current) {
      ws.current.close();
      ws.current = null;
    }
  }, []);
  
  const sendMessage = useCallback((message: WebSocketMessage) => {
    if (ws.current && ws.current.readyState === WebSocket.OPEN) {
      ws.current.send(JSON.stringify(message));
    } else {
      console.error('WebSocket is not connected');
    }
  }, []);
  
  useEffect(() => {
    connect();
    
    // Cleanup on unmount
    return () => {
      disconnect();
    };
  }, [connect, disconnect]);
  
  // Heartbeat to maintain connection
  useEffect(() => {
    const heartbeatInterval = setInterval(() => {
      if (isConnected) {
        sendMessage({ type: 'ping' });
      }
    }, 30000); // Ping every 30 seconds
    
    return () => clearInterval(heartbeatInterval);
  }, [isConnected, sendMessage]);
  
  return {
    isConnected,
    connectionId,
    sendMessage,
    disconnect,
    reconnect: connect
  };
}
// src/components/RealtimeChat.tsx
import React, { useState, useEffect, useRef } from 'react';
import { useWebSocket } from '../hooks/useWebSocket';
import { v4 as uuidv4 } from 'uuid';

interface Message {
  id: string;
  content: string;
  role: 'user' | 'assistant';
  timestamp: Date;
  streaming?: boolean;
}

export function RealtimeChat() {
  const [messages, setMessages] = useState<Message[]>([]);
  const [input, setInput] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);
  const messagesEndRef = useRef<HTMLDivElement>(null);
  const currentStreamRef = useRef<string>('');
  const conversationIdRef = useRef<string>(uuidv4());
  
  const { isConnected, sendMessage } = useWebSocket({
    url: 'ws://localhost:3001',
    onMessage: (message) => {
      handleWebSocketMessage(message);
    },
    onConnect: () => {
      console.log('Connected to AI assistant');
    },
    onDisconnect: () => {
      console.log('Disconnected from AI assistant');
    }
  });
  
  const handleWebSocketMessage = (message: any) => {
    switch (message.type) {
      case 'stream_start':
        setIsStreaming(true);
        currentStreamRef.current = '';
        
        // Add assistant message placeholder
        setMessages(prev => [...prev, {
          id: message.conversationId,
          content: '',
          role: 'assistant',
          timestamp: new Date(),
          streaming: true
        }]);
        break;
        
      case 'stream_chunk':
        currentStreamRef.current += message.content;
        
        // Update streaming message
        setMessages(prev => prev.map(msg => 
          msg.id === message.conversationId 
            ? { ...msg, content: currentStreamRef.current }
            : msg
        ));
        break;
        
      case 'stream_end':
        setIsStreaming(false);
        
        // Mark message as complete
        setMessages(prev => prev.map(msg => 
          msg.id === message.conversationId 
            ? { ...msg, streaming: false }
            : msg
        ));
        break;
        
      case 'error':
        setIsStreaming(false);
        console.error('Chat error:', message.error);
        
        // Add error message
        setMessages(prev => [...prev, {
          id: uuidv4(),
          content: `Error: ${message.error}`,
          role: 'assistant',
          timestamp: new Date(),
          streaming: false
        }]);
        break;
    }
  };
  
  const sendChatMessage = () => {
    if (!input.trim() || !isConnected || isStreaming) return;
    
    const userMessage: Message = {
      id: uuidv4(),
      content: input,
      role: 'user',
      timestamp: new Date()
    };
    
    setMessages(prev => [...prev, userMessage]);
    
    sendMessage({
      type: 'chat',
      message: input,
      conversationId: conversationIdRef.current
    });
    
    setInput('');
    conversationIdRef.current = uuidv4(); // New ID for next conversation
  };
  
  useEffect(() => {
    messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
  }, [messages]);
  
  return (
    <div className="chat-container">
      <div className="connection-status">
        {isConnected ? (
          <span className="connected">Connected</span>
        ) : (
          <span className="disconnected">Disconnected</span>
        )}
      </div>
      
      <div className="messages">
        {messages.map((message) => (
          <div 
            key={message.id} 
            className={`message ${message.role}`}
          >
            <div className="message-content">
              {message.content}
              {message.streaming && <span className="cursor-blink"></span>}
            </div>
            <div className="message-time">
              {message.timestamp.toLocaleTimeString()}
            </div>
          </div>
        ))}
        <div ref={messagesEndRef} />
      </div>
      
      <div className="input-area">
        <input
          type="text"
          value={input}
          onChange={(e) => setInput(e.target.value)}
          onKeyPress={(e) => e.key === 'Enter' && sendChatMessage()}
          placeholder="Type your message..."
          disabled={!isConnected || isStreaming}
        />
        <button 
          onClick={sendChatMessage}
          disabled={!isConnected || isStreaming || !input.trim()}
        >
          {isStreaming ? 'Generating...' : 'Send'}
        </button>
      </div>
    </div>
  );
}

Performance Metrics and Monitoring

Monitoring is crucial for maintaining reliable real-time AI applications. Let's implement comprehensive metrics collection:

// src/metrics-collector.ts
import { EventEmitter } from 'events';
import winston from 'winston';
import { StatsD } from 'node-statsd';

interface MetricEvent {
  name: string;
  value: number;
  tags?: Record<string, string>;
  timestamp: Date;
}

export class MetricsCollector extends EventEmitter {
  private statsd: StatsD;
  private logger: winston.Logger;
  private metrics: Map<string, MetricEvent[]> = new Map();
  
  constructor() {
    super();
    
    // Initialize StatsD client
    this.statsd = new StatsD({
      host: process.env.STATSD_HOST || 'localhost',
      port: parseInt(process.env.STATSD_PORT || '8125'),
      prefix: 'realtime_ai.'
    });
    
    // Initialize logger
    this.logger = winston.createLogger({
      level: 'info',
      format: winston.format.combine(
        winston.format.timestamp(),
        winston.format.json()
      ),
      transports: [
        new winston.transports.File({ filename: 'metrics.log' }),
        new winston.transports.Console()
      ]
    });
    
    // Start periodic aggregation
    setInterval(() => this.aggregateMetrics(), 60000); // Every minute
  }
  
  // WebSocket metrics
  trackConnection(event: 'connect' | 'disconnect', metadata?: any): void {
    this.statsd.increment(`websocket.${event}`);
    this.logger.info(`WebSocket ${event}`, metadata);
  }
  
  trackMessageLatency(latency: number, messageType: string): void {
    this.statsd.timing('websocket.message.latency', latency, [`type:${messageType}`]);
    
    const metric: MetricEvent = {
      name: 'message_latency',
      value: latency,
      tags: { type: messageType },
      timestamp: new Date()
    };
    
    this.addMetric('latency', metric);
  }
  
  // LangChain streaming metrics
  trackStreamingPerformance(data: {
    conversationId: string;
    duration: number;
    tokenCount: number;
    chunkCount: number;
    model: string;
  }): void {
    const { duration, tokenCount, chunkCount, model } = data;
    
    // Track various streaming metrics
    this.statsd.timing('langchain.stream.duration', duration, [`model:${model}`]);
    this.statsd.gauge('langchain.stream.tokens', tokenCount, [`model:${model}`]);
    this.statsd.gauge('langchain.stream.chunks', chunkCount, [`model:${model}`]);
    
    // Calculate tokens per second
    const tokensPerSecond = tokenCount / (duration / 1000);
    this.statsd.gauge('langchain.stream.tokens_per_second', tokensPerSecond, [`model:${model}`]);
    
    this.logger.info('Streaming performance', data);
  }
  
  // Queue metrics
  trackQueueMetrics(metrics: any): void {
    Object.entries(metrics).forEach(([key, value]) => {
      this.statsd.gauge(`queue.${key}`, value as number);
    });
  }
  
  // Error tracking
  trackError(error: Error, context: Record<string, any>): void {
    this.statsd.increment('errors.total', [`type:${error.name}`]);
    
    this.logger.error('Application error', {
      error: {
        name: error.name,
        message: error.message,
        stack: error.stack
      },
      context
    });
  }
  
  // Performance metrics
  trackResponseTime(duration: number, endpoint: string): void {
    this.statsd.timing('api.response_time', duration, [`endpoint:${endpoint}`]);
  }
  
  // Memory and resource metrics
  collectSystemMetrics(): void {
    const memoryUsage = process.memoryUsage();
    
    this.statsd.gauge('system.memory.rss', memoryUsage.rss);
    this.statsd.gauge('system.memory.heap_total', memoryUsage.heapTotal);
    this.statsd.gauge('system.memory.heap_used', memoryUsage.heapUsed);
    this.statsd.gauge('system.memory.external', memoryUsage.external);
    
    // CPU usage
    const cpuUsage = process.cpuUsage();
    this.statsd.gauge('system.cpu.user', cpuUsage.user);
    this.statsd.gauge('system.cpu.system', cpuUsage.system);
  }
  
  // Custom metric tracking
  private addMetric(category: string, metric: MetricEvent): void {
    if (!this.metrics.has(category)) {
      this.metrics.set(category, []);
    }
    
    const categoryMetrics = this.metrics.get(category)!;
    categoryMetrics.push(metric);
    
    // Keep only last hour of metrics
    const oneHourAgo = new Date(Date.now() - 3600000);
    this.metrics.set(
      category,
      categoryMetrics.filter(m => m.timestamp > oneHourAgo)
    );
  }
  
  // Aggregate and report metrics
  private aggregateMetrics(): void {
    this.metrics.forEach((metrics, category) => {
      if (metrics.length === 0) return;
      
      const values = metrics.map(m => m.value);
      const avg = values.reduce((a, b) => a + b, 0) / values.length;
      const min = Math.min(...values);
      const max = Math.max(...values);
      
      this.statsd.gauge(`aggregate.${category}.avg`, avg);
      this.statsd.gauge(`aggregate.${category}.min`, min);
      this.statsd.gauge(`aggregate.${category}.max`, max);
      
      this.logger.info(`Metric aggregation: ${category}`, {
        count: metrics.length,
        avg,
        min,
        max
      });
    });
    
    // Collect system metrics
    this.collectSystemMetrics();
  }
  
  // Get current metrics snapshot
  getSnapshot(): Record<string, any> {
    const snapshot: Record<string, any> = {};
    
    this.metrics.forEach((metrics, category) => {
      const values = metrics.map(m => m.value);
      snapshot[category] = {
        count: metrics.length,
        average: values.length > 0 
          ? values.reduce((a, b) => a + b, 0) / values.length 
          : 0,
        min: values.length > 0 ? Math.min(...values) : 0,
        max: values.length > 0 ? Math.max(...values) : 0
      };
    });
    
    return snapshot;
  }
  
  close(): void {
    this.statsd.close();
  }
}

// Integration with main server
const metricsCollector = new MetricsCollector();

// Track WebSocket connections
wss.on('connection', (ws) => {
  const startTime = Date.now();
  metricsCollector.trackConnection('connect');
  
  ws.on('message', (message) => {
    const latency = Date.now() - startTime;
    metricsCollector.trackMessageLatency(latency, 'chat');
  });
  
  ws.on('close', () => {
    metricsCollector.trackConnection('disconnect');
  });
});

// Expose metrics endpoint
app.get('/metrics', (req, res) => {
  res.json({
    snapshot: metricsCollector.getSnapshot(),
    connections: connectionManager.getMetrics(),
    queue: queueManager.getQueueMetrics()
  });
});

Scaling Considerations

When building production-ready real-time AI applications, consider these scaling strategies:

Horizontal Scaling with Redis Pub/Sub

// src/scaled-websocket.ts
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';

// For multiple server instances
const pubClient = createClient({ host: 'localhost', port: 6379 });
const subClient = pubClient.duplicate();

// Implement cross-server message broadcasting
export class ScaledWebSocketManager {
  private serverId: string = uuidv4();
  
  async broadcastToAllServers(message: any): Promise<void> {
    // Publish to Redis channel
    await pubClient.publish('websocket:broadcast', JSON.stringify({
      serverId: this.serverId,
      message,
      timestamp: new Date().toISOString()
    }));
  }
  
  setupSubscriptions(): void {
    subClient.subscribe('websocket:broadcast');
    
    subClient.on('message', (channel, data) => {
      const { serverId, message } = JSON.parse(data);
      
      // Don't process our own messages
      if (serverId === this.serverId) return;
      
      // Broadcast to local connections
      this.connectionManager.broadcastToAll(message);
    });
  }
}

Load Balancing Strategies

  1. Sticky Sessions: Ensure WebSocket connections stay with the same server
  2. Connection State in Redis: Store connection metadata centrally
  3. Queue-based distribution: Use message queues for work distribution

Resource Optimization

// Implement connection pooling for LangChain
class LangChainPool {
  private pool: ChatOpenAI[] = [];
  private maxSize: number = 10;
  
  async acquire(): Promise<ChatOpenAI> {
    if (this.pool.length > 0) {
      return this.pool.pop()!;
    }
    
    return new ChatOpenAI({
      modelName: 'gpt-3.5-turbo',
      temperature: 0.7,
      streaming: true,
    });
  }
  
  release(model: ChatOpenAI): void {
    if (this.pool.length < this.maxSize) {
      this.pool.push(model);
    }
  }
}

Deployment Best Practices

When deploying your LangChain WebSocket application:

  1. Use a reverse proxy (Nginx) for WebSocket support
  2. Enable SSL/TLS for secure WebSocket connections (wss://)
  3. Configure proper CORS headers for cross-origin requests
  4. Implement rate limiting to prevent abuse
  5. Set up monitoring with tools like Prometheus and Grafana
  6. Use container orchestration (Kubernetes) for scaling

For detailed deployment guidance, check out our guides on deploying Node.js applications and Kubernetes best practices.

Conclusion

Building real-time AI applications with LangChain streaming and WebSockets opens up exciting possibilities for creating responsive, engaging user experiences. By implementing proper connection management, queue systems, and monitoring, you can build scalable applications that handle thousands of concurrent users.

Key takeaways:

  • Use WebSockets for bidirectional real-time communication
  • Implement robust reconnection logic and heartbeat monitoring
  • Queue concurrent requests to manage load effectively
  • Monitor performance metrics for optimization
  • Plan for horizontal scaling from the start

The combination of LangChain's streaming capabilities with WebSocket technology enables you to build the next generation of AI-powered applications that feel instantaneous and responsive to users.

Additional Resources