title: "LangChain Agents: Building Autonomous AI Systems" date: "2024-01-11" description: "Master the art of building autonomous AI systems with LangChain agents. Learn about ReAct pattern, tool creation, agent executors, and multi-agent architectures for intelligent decision-making." tags: ["langchain", "agents", "ai", "autonomous-systems", "react-pattern"] published: true
# LangChain Agents: Building Autonomous AI Systems
In the rapidly evolving landscape of AI development, LangChain agents represent a paradigm shift in how we build intelligent systems. These autonomous AI agents can reason, plan, and execute complex tasks by leveraging tools and making decisions based on intermediate results. This comprehensive guide will walk you through building sophisticated agent systems that can adapt to various scenarios and solve real-world problems.
## Understanding LangChain Agents
LangChain agents are autonomous systems that combine the reasoning capabilities of large language models (LLMs) with the ability to take actions through tools. Unlike traditional chatbots that simply respond to queries, agents can:
- Reason about problems and determine the best approach
- Plan multi-step solutions to complex tasks
- Execute actions using various tools and APIs
- Adapt their strategy based on intermediate results
- Learn from context and maintain memory across interactions
The power of autonomous AI lies in this ability to make decisions and take actions without constant human intervention.
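Before unpacking how this works internally, here is a minimal sketch of an agent in code. It uses LangChain's classic `initialize_agent` API (the same style used throughout this guide) with the built-in `llm-math` tool; the model, tool, and question are illustrative, and an `OPENAI_API_KEY` is assumed to be set in the environment.

```python
from langchain.agents import AgentType, initialize_agent, load_tools
from langchain.llms import OpenAI

llm = OpenAI(temperature=0)

# load_tools gives us a ready-made calculator tool backed by the LLM
tools = load_tools(["llm-math"], llm=llm)

agent = initialize_agent(
    tools,
    llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True,  # print each reasoning/acting step as it happens
)

agent.run("What is 15% of 240?")
```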
## The ReAct Pattern: Foundation of Agent Intelligence
The ReAct (Reasoning and Acting) pattern is the cornerstone of modern agent architectures. It enables agents to interleave reasoning traces with action execution, creating a powerful problem-solving loop.
### How ReAct Works
from langchain.agents import AgentType, initialize_agent
from langchain.agents.react.base import DocstoreExplorer
from langchain.tools import Tool
from langchain.llms import OpenAI
# ReAct pattern in action
class ReActAgent:
def __init__(self, llm):
self.llm = llm
self.thought_history = []
self.action_history = []
def think(self, observation):
"""Generate a thought based on current observation"""
thought = self.llm.predict(
f"Based on: {observation}\nWhat should I think about next?"
)
self.thought_history.append(thought)
return thought
def act(self, thought):
"""Decide and execute an action based on thought"""
action = self.llm.predict(
f"Based on thought: {thought}\nWhat action should I take?"
)
self.action_history.append(action)
return self.execute_action(action)
def execute_action(self, action):
"""Execute the decided action"""
# Action execution logic here
pass
The ReAct pattern creates a synergy between reasoning and acting, allowing agents to:
- Observe the current state
- Think about what needs to be done
- Act by selecting and using appropriate tools
- Observe the results
- Repeat until the task is complete
## Building Your First Agent with Tools
Let's create a practical agent that can perform web searches, run calculations, and interact with custom APIs. This example demonstrates the core concepts of agent development.
from langchain.agents import initialize_agent, Tool
from langchain.agents import AgentType
from langchain.llms import OpenAI
from langchain.utilities import GoogleSearchAPIWrapper
from langchain.tools import BaseTool
from typing import Optional, Type
from pydantic import BaseModel, Field
import requests
import json
# Custom tool for mathematical calculations
class CalculatorInput(BaseModel):
expression: str = Field(description="Mathematical expression to evaluate")
class CalculatorTool(BaseTool):
name = "calculator"
description = "Useful for performing mathematical calculations"
args_schema: Type[BaseModel] = CalculatorInput
def _run(self, expression: str) -> str:
try:
# Safe evaluation of mathematical expressions
import ast
import operator as op
operators = {
ast.Add: op.add, ast.Sub: op.sub,
ast.Mult: op.mul, ast.Div: op.truediv,
ast.Pow: op.pow, ast.USub: op.neg
}
def eval_expr(node):
if isinstance(node, ast.Constant):  # numeric literal (ast.Num is deprecated)
return node.value
elif isinstance(node, ast.BinOp):
return operators[type(node.op)](
eval_expr(node.left),
eval_expr(node.right)
)
elif isinstance(node, ast.UnaryOp):
return operators[type(node.op)](eval_expr(node.operand))
else:
raise TypeError(f"Unsupported type {type(node)}")
tree = ast.parse(expression, mode='eval')
result = eval_expr(tree.body)
return f"The result of {expression} is {result}"
except Exception as e:
return f"Error calculating {expression}: {str(e)}"
async def _arun(self, expression: str) -> str:
return self._run(expression)
# Custom API tool
class WeatherAPITool(BaseTool):
name = "weather_api"
description = "Get current weather information for a city"
def _run(self, city: str) -> str:
try:
# Mock API call - replace with actual API
api_key = "your_api_key"
url = f"http://api.openweathermap.org/data/2.5/weather?q={city}&appid={api_key}"
# For demonstration, return mock data
mock_weather = {
"temperature": "22°C",
"condition": "Partly cloudy",
"humidity": "65%",
"wind": "10 km/h"
}
return f"Weather in {city}: {json.dumps(mock_weather, indent=2)}"
except Exception as e:
return f"Error fetching weather for {city}: {str(e)}"
async def _arun(self, city: str) -> str:
return self._run(city)
# Initialize tools
search = GoogleSearchAPIWrapper()
calculator = CalculatorTool()
weather = WeatherAPITool()
tools = [
Tool(
name="google_search",
description="Search Google for recent information",
func=search.run
),
calculator,
weather
]
# Initialize the agent
llm = OpenAI(temperature=0)
agent = initialize_agent(
tools,
llm,
agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
verbose=True,
handle_parsing_errors=True
)
# Example usage
response = agent.run(
"What's the weather in San Francisco? Also, if the temperature "
"is 22°C, what is that in Fahrenheit? Search for the formula if needed."
)
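With `verbose=True`, the agent prints its reasoning loop as it runs. An illustrative (not verbatim) excerpt for the temperature-conversion part of this request might look like:

```text
Thought: I need to convert 22°C to Fahrenheit using F = C * 9/5 + 32.
Action: calculator
Action Input: 22 * 9 / 5 + 32
Observation: The result of 22 * 9 / 5 + 32 is 71.6
Thought: I now know the final answer.
Final Answer: 22°C is 71.6°F.
```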
## Advanced Agent Executors
Agent executors are the engines that drive agent behavior. They manage the agent's decision-making process, tool execution, and error handling. Let's build a custom executor with advanced features.
from langchain.agents import AgentExecutor
from langchain.agents.agent import Agent
from langchain.callbacks.base import BaseCallbackHandler
from typing import Dict, List, Any
import time
class AdvancedAgentExecutor(AgentExecutor):
"""Enhanced agent executor with monitoring and fallback strategies"""
def __init__(self, agent: Agent, tools: List, **kwargs):
super().__init__(agent=agent, tools=tools, **kwargs)
self.execution_stats = {
"total_steps": 0,
"tool_calls": {},
"errors": [],
"execution_time": 0
}
self.fallback_strategies = []
def add_fallback_strategy(self, strategy):
"""Add a fallback strategy for error handling"""
self.fallback_strategies.append(strategy)
def _call(self, inputs: Dict[str, str]) -> Dict[str, Any]:
"""Execute agent with enhanced monitoring"""
start_time = time.time()
try:
# Track execution
self.execution_stats["total_steps"] += 1
# Execute with timeout
result = self._execute_with_timeout(inputs)
# Update stats
self.execution_stats["execution_time"] = time.time() - start_time
return result
except Exception as e:
# Log error
self.execution_stats["errors"].append({
"error": str(e),
"timestamp": time.time(),
"inputs": inputs
})
# Try fallback strategies
for strategy in self.fallback_strategies:
try:
return strategy(inputs, e)
except:
continue
# If all fallbacks fail, raise original error
raise e
def _execute_with_timeout(self, inputs: Dict[str, str], timeout: int = 60):
"""Execute with timeout protection"""
import signal
def timeout_handler(signum, frame):
raise TimeoutError("Agent execution timed out")
# Set timeout
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(timeout)
try:
result = super()._call(inputs)
signal.alarm(0) # Cancel timeout
return result
except TimeoutError:
signal.alarm(0)
raise
def get_execution_report(self) -> Dict[str, Any]:
"""Generate execution statistics report"""
return {
"total_steps": self.execution_stats["total_steps"],
"average_execution_time": self.execution_stats["execution_time"] / max(self.execution_stats["total_steps"], 1),
"tool_usage": self.execution_stats["tool_calls"],
"error_rate": len(self.execution_stats["errors"]) / max(self.execution_stats["total_steps"], 1),
"errors": self.execution_stats["errors"]
}
# Fallback strategy example
def simple_fallback(inputs: Dict[str, str], error: Exception) -> Dict[str, Any]:
"""Simple fallback that returns a helpful error message"""
return {
"output": f"I encountered an error: {str(error)}. "
f"Let me try a simpler approach to help you with: {inputs.get('input', 'your request')}",
"intermediate_steps": []
}
# Usage
executor = AdvancedAgentExecutor(agent=agent.agent, tools=tools)  # initialize_agent returns an AgentExecutor; pass its underlying agent
executor.add_fallback_strategy(simple_fallback)
## Error Handling and Fallback Strategies
Robust error handling is crucial for building reliable autonomous AI systems. Here's a comprehensive approach to handling various failure scenarios:
from enum import Enum
from typing import Any, Callable, Dict, Optional
import logging
class ErrorType(Enum):
TOOL_ERROR = "tool_error"
PARSING_ERROR = "parsing_error"
LLM_ERROR = "llm_error"
TIMEOUT_ERROR = "timeout_error"
VALIDATION_ERROR = "validation_error"
class ErrorHandler:
"""Sophisticated error handling for agents"""
def __init__(self):
self.handlers = {}
self.logger = logging.getLogger(__name__)
self.retry_config = {
"max_retries": 3,
"backoff_factor": 2,
"initial_delay": 1
}
def register_handler(self, error_type: ErrorType, handler: Callable):
"""Register a handler for specific error type"""
self.handlers[error_type] = handler
def handle_error(self, error: Exception, context: Dict[str, Any]) -> Optional[Any]:
"""Handle error with appropriate strategy"""
error_type = self._classify_error(error)
# Log error details
self.logger.error(f"Error type: {error_type}, Message: {str(error)}")
# Try specific handler
if error_type in self.handlers:
return self.handlers[error_type](error, context)
# Default handling
return self._default_handler(error, context)
def _classify_error(self, error: Exception) -> ErrorType:
"""Classify error type for appropriate handling"""
error_str = str(error).lower()
if "timeout" in error_str:
return ErrorType.TIMEOUT_ERROR
elif "parsing" in error_str or "format" in error_str:
return ErrorType.PARSING_ERROR
elif "tool" in error_str:
return ErrorType.TOOL_ERROR
elif "validation" in error_str:
return ErrorType.VALIDATION_ERROR
else:
return ErrorType.LLM_ERROR
def _default_handler(self, error: Exception, context: Dict[str, Any]) -> Dict[str, Any]:
"""Default error handling strategy"""
return {
"success": False,
"error": str(error),
"suggestion": "Try rephrasing your request or breaking it into smaller steps.",
"context": context
}
def with_retry(self, func: Callable, *args, **kwargs) -> Any:
"""Execute function with exponential backoff retry"""
import time
for attempt in range(self.retry_config["max_retries"]):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == self.retry_config["max_retries"] - 1:
raise e
delay = self.retry_config["initial_delay"] * (
self.retry_config["backoff_factor"] ** attempt
)
self.logger.warning(f"Attempt {attempt + 1} failed, retrying in {delay}s")
time.sleep(delay)
# Specific error handlers
def handle_tool_error(error: Exception, context: Dict[str, Any]) -> Dict[str, Any]:
"""Handle tool-specific errors"""
tool_name = context.get("tool_name", "unknown")
# Try alternative tool if available
alternative_tools = context.get("alternative_tools", [])
if alternative_tools:
return {
"action": "use_alternative",
"alternative": alternative_tools[0],
"reason": f"Tool {tool_name} failed: {str(error)}"
}
# Provide guidance
return {
"action": "skip_tool",
"message": f"The {tool_name} tool is temporarily unavailable. "
f"I'll proceed without it and find an alternative approach."
}
def handle_parsing_error(error: Exception, context: Dict[str, Any]) -> Dict[str, Any]:
"""Handle parsing errors with reformatting"""
return {
"action": "reformat",
"instruction": "Let me reformulate that response in a clearer format.",
"retry": True
}
# Setup error handling
error_handler = ErrorHandler()
error_handler.register_handler(ErrorType.TOOL_ERROR, handle_tool_error)
error_handler.register_handler(ErrorType.PARSING_ERROR, handle_parsing_error)
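A brief usage sketch, assuming the `agent` built in the earlier example (the tool names in the context dictionary are illustrative): `with_retry` wraps a flaky call with exponential backoff, and `handle_error` produces a structured fallback once retries are exhausted.

```python
# Hypothetical wiring of the ErrorHandler around the agent defined earlier.
try:
    result = error_handler.with_retry(agent.run, "Summarize today's renewable energy headlines")
except Exception as exc:
    # All retries failed: fall back to a structured error response
    result = error_handler.handle_error(
        exc,
        context={"tool_name": "google_search", "alternative_tools": ["duckduckgo_search"]},
    )

print(result)
```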
## Agent Memory and Context Management
Memory is what transforms a simple agent into an intelligent system that can learn and adapt. Let's implement sophisticated memory management for our agents:
from langchain.memory import ConversationSummaryBufferMemory
from typing import Dict, List, Any
import json
from datetime import datetime
class EnhancedAgentMemory:  # plain class: this custom store doesn't need LangChain's BaseMemory contract
"""Advanced memory system for agents with multiple memory types"""
def __init__(self, llm):
self.llm = llm
self.short_term_memory = [] # Recent interactions
self.long_term_memory = {} # Persistent knowledge
self.working_memory = {} # Current task context
self.episodic_memory = [] # Specific experiences
self.memory_limit = 1000 # Max number of entries kept in short-term memory
def add_to_short_term(self, interaction: Dict[str, Any]):
"""Add interaction to short-term memory with pruning"""
self.short_term_memory.append({
"timestamp": datetime.now().isoformat(),
"content": interaction,
"importance": self._calculate_importance(interaction)
})
# Prune if exceeding limit
self._prune_short_term_memory()
def add_to_long_term(self, key: str, value: Any, metadata: Dict = None):
"""Store important information in long-term memory"""
self.long_term_memory[key] = {
"value": value,
"metadata": metadata or {},
"created_at": datetime.now().isoformat(),
"access_count": 0
}
def add_episodic(self, episode: Dict[str, Any]):
"""Store complete problem-solving episodes"""
self.episodic_memory.append({
"episode": episode,
"timestamp": datetime.now().isoformat(),
"success": episode.get("success", True),
"learnings": self._extract_learnings(episode)
})
def update_working_memory(self, key: str, value: Any):
"""Update current task context"""
self.working_memory[key] = value
def retrieve_relevant_memories(self, query: str, k: int = 5) -> List[Dict]:
"""Retrieve most relevant memories for current context"""
relevant_memories = []
# Search across all memory types
all_memories = (
self._search_short_term(query) +
self._search_long_term(query) +
self._search_episodic(query)
)
# Rank by relevance
ranked_memories = sorted(
all_memories,
key=lambda x: x.get("relevance_score", 0),
reverse=True
)
return ranked_memories[:k]
def _calculate_importance(self, interaction: Dict[str, Any]) -> float:
"""Calculate importance score for memory prioritization"""
# Factors: recency, frequency, emotional valence, task relevance
importance = 0.5 # Base score
# Check for important markers
content = str(interaction).lower()
if any(marker in content for marker in ["important", "remember", "critical"]):
importance += 0.3
# Check for learning opportunities
if "error" in content or "learned" in content:
importance += 0.2
return min(importance, 1.0)
def _prune_short_term_memory(self):
"""Intelligently prune short-term memory"""
if len(self.short_term_memory) > self.memory_limit:
# Sort by importance and recency
self.short_term_memory.sort(
key=lambda x: (x["importance"], x["timestamp"]),
reverse=True
)
# Keep most important/recent
self.short_term_memory = self.short_term_memory[:self.memory_limit // 2]
def _extract_learnings(self, episode: Dict[str, Any]) -> List[str]:
"""Extract key learnings from episodes"""
prompt = f"""
Analyze this problem-solving episode and extract key learnings:
Episode: {json.dumps(episode, indent=2)}
List 3-5 key learnings that would help in similar future tasks:
"""
response = self.llm.predict(prompt)
return response.split("\n")
def get_context_summary(self) -> str:
"""Generate a summary of current context from all memory types"""
summary_parts = []
# Working memory summary
if self.working_memory:
summary_parts.append(f"Current task context: {json.dumps(self.working_memory)}")
# Recent interactions
recent = self.short_term_memory[-5:] if self.short_term_memory else []
if recent:
summary_parts.append(f"Recent interactions: {len(recent)} items")
# Relevant long-term knowledge
if self.long_term_memory:
summary_parts.append(f"Known facts: {len(self.long_term_memory)} items")
return "\n".join(summary_parts)
# Integration with agent
class MemoryAwareAgent:
"""Agent with sophisticated memory capabilities"""
def __init__(self, llm, tools):
self.llm = llm
self.tools = tools
self.memory = EnhancedAgentMemory(llm)
self.agent = self._initialize_agent()
def _initialize_agent(self):
"""Initialize agent with memory context"""
from langchain.agents import initialize_agent, AgentType
return initialize_agent(
self.tools,
self.llm,
agent=AgentType.CONVERSATIONAL_REACT_DESCRIPTION,
memory=ConversationSummaryBufferMemory(
llm=self.llm,
max_token_limit=2048
),
verbose=True
)
def run(self, task: str) -> str:
"""Execute task with memory enhancement"""
# Retrieve relevant memories
relevant_memories = self.memory.retrieve_relevant_memories(task)
# Enhance prompt with memory context
enhanced_prompt = self._enhance_with_memory(task, relevant_memories)
# Execute task
result = self.agent.run(enhanced_prompt)
# Update memories
self.memory.add_to_short_term({
"task": task,
"result": result,
"tools_used": self._extract_tools_used(result)
})
return result
def _enhance_with_memory(self, task: str, memories: List[Dict]) -> str:
"""Enhance task prompt with relevant memories"""
if not memories:
return task
memory_context = "\n".join([
f"- {mem.get('content', '')}" for mem in memories[:3]
])
return f"""
Task: {task}
Relevant context from memory:
{memory_context}
Use this context to inform your approach.
"""
def _extract_tools_used(self, result: str) -> List[str]:
"""Extract which tools were used in the execution"""
# Simple extraction - in practice, use agent callbacks
tools_used = []
for tool in self.tools:
if tool.name in result:
tools_used.append(tool.name)
return tools_used
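A short usage sketch, assuming the `llm` and `tools` from the earlier examples: the second call can draw on the interaction the first call stored.

```python
# Hypothetical usage of MemoryAwareAgent with the llm and tools defined earlier.
memory_agent = MemoryAwareAgent(llm=llm, tools=tools)

memory_agent.run("What is the current weather in San Francisco?")

# The follow-up benefits from the interaction just added to short-term memory.
answer = memory_agent.run("Convert the temperature you just reported to Fahrenheit.")
print(answer)
```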
## Multi-Agent Systems: Orchestrating Collaboration
Multi-agent systems represent the next frontier in autonomous AI. By coordinating multiple specialized agents, we can tackle complex problems that require diverse expertise:
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional
import asyncio
from concurrent.futures import ThreadPoolExecutor
class BaseAgent(ABC):
"""Abstract base class for specialized agents"""
def __init__(self, name: str, llm, tools: List):
self.name = name
self.llm = llm
self.tools = tools
self.expertise = []
@abstractmethod
def can_handle(self, task: str) -> float:
"""Return confidence score (0-1) for handling the task"""
pass
@abstractmethod
def execute(self, task: str, context: Dict = None) -> Dict[str, Any]:
"""Execute the task and return results"""
pass
class ResearchAgent(BaseAgent):
"""Agent specialized in research and information gathering"""
def __init__(self, llm, search_tools):
super().__init__("Research Agent", llm, search_tools)
self.expertise = ["research", "search", "information gathering", "fact checking"]
def can_handle(self, task: str) -> float:
task_lower = task.lower()
keywords = ["research", "find", "search", "look up", "investigate", "verify"]
confidence = sum(1 for keyword in keywords if keyword in task_lower) / len(keywords)
return min(confidence * 2, 1.0) # Scale up but cap at 1.0
def execute(self, task: str, context: Dict = None) -> Dict[str, Any]:
# Execute research task
from langchain.agents import initialize_agent, AgentType
agent = initialize_agent(
self.tools,
self.llm,
agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
verbose=True
)
result = agent.run(task)
return {
"agent": self.name,
"task": task,
"result": result,
"confidence": self.can_handle(task),
"tools_used": [tool.name for tool in self.tools]
}
class AnalysisAgent(BaseAgent):
"""Agent specialized in data analysis and reasoning"""
def __init__(self, llm, analysis_tools):
super().__init__("Analysis Agent", llm, analysis_tools)
self.expertise = ["analysis", "reasoning", "calculation", "data processing"]
def can_handle(self, task: str) -> float:
task_lower = task.lower()
keywords = ["analyze", "calculate", "compute", "reason", "evaluate", "assess"]
confidence = sum(1 for keyword in keywords if keyword in task_lower) / len(keywords)
return min(confidence * 2, 1.0)
def execute(self, task: str, context: Dict = None) -> Dict[str, Any]:
# Execute analysis task
result = f"Analysis complete for: {task}"
return {
"agent": self.name,
"task": task,
"result": result,
"confidence": self.can_handle(task)
}
class MultiAgentOrchestrator:
"""Orchestrates multiple agents to solve complex tasks"""
def __init__(self, agents: List[BaseAgent]):
self.agents = agents
self.executor = ThreadPoolExecutor(max_workers=len(agents))
self.task_history = []
def decompose_task(self, complex_task: str) -> List[Dict[str, Any]]:
"""Decompose complex task into subtasks"""
# Use LLM to decompose task
decomposition_prompt = f"""
Decompose this complex task into smaller, independent subtasks:
Task: {complex_task}
Return a list of subtasks with their type (research, analysis, execution).
Format each subtask as: "TYPE: description"
"""
# In practice, use LLM to generate this
# For demo, return mock decomposition
return [
{"type": "research", "description": "Gather relevant information"},
{"type": "analysis", "description": "Analyze the collected data"},
{"type": "execution", "description": "Execute the final solution"}
]
def assign_agent(self, subtask: Dict[str, Any]) -> Optional[BaseAgent]:
"""Assign best agent for subtask"""
task_description = subtask["description"]
# Get confidence scores from all agents
agent_scores = [
(agent, agent.can_handle(task_description))
for agent in self.agents
]
# Select agent with highest confidence
best_agent, best_score = max(agent_scores, key=lambda x: x[1])
if best_score > 0.3: # Minimum confidence threshold
return best_agent
return None
async def execute_parallel(self, tasks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Execute multiple tasks in parallel"""
async def run_task(task):
agent = self.assign_agent(task)
if agent:
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
self.executor,
agent.execute,
task["description"]
)
return result
return {"error": "No suitable agent found", "task": task}
results = await asyncio.gather(*[run_task(task) for task in tasks])
return results
def execute_sequential(self, tasks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Execute tasks sequentially with context passing"""
results = []
context = {}
for task in tasks:
agent = self.assign_agent(task)
if agent:
# Pass context from previous tasks
result = agent.execute(task["description"], context)
results.append(result)
# Update context with result
context[task["type"]] = result["result"]
else:
results.append({"error": "No suitable agent found", "task": task})
return results
def solve_complex_task(self, task: str, parallel: bool = True) -> Dict[str, Any]:
"""Solve complex task using multiple agents"""
# Decompose task
subtasks = self.decompose_task(task)
# Execute subtasks
if parallel and self._can_parallelize(subtasks):
results = asyncio.run(self.execute_parallel(subtasks))
else:
results = self.execute_sequential(subtasks)
# Synthesize results
synthesis = self._synthesize_results(results)
# Record in history
self.task_history.append({
"task": task,
"subtasks": subtasks,
"results": results,
"synthesis": synthesis
})
return synthesis
def _can_parallelize(self, subtasks: List[Dict[str, Any]]) -> bool:
"""Determine if subtasks can be executed in parallel"""
# Check for dependencies
# For demo, assume research must come before analysis
types = [task["type"] for task in subtasks]
return not ("research" in types and "analysis" in types)
def _synthesize_results(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Synthesize results from multiple agents"""
successful_results = [r for r in results if "error" not in r]
return {
"success": len(successful_results) == len(results),
"total_subtasks": len(results),
"successful_subtasks": len(successful_results),
"combined_result": "\n".join([
f"{r['agent']}: {r['result']}"
for r in successful_results
]),
"individual_results": results
}
# Example usage
def create_multi_agent_system():
"""Create a multi-agent system with specialized agents"""
from langchain.llms import OpenAI
from langchain.tools import DuckDuckGoSearchRun
from langchain_experimental.tools import PythonREPLTool  # PythonREPLTool ships in langchain-experimental in recent releases
llm = OpenAI(temperature=0)
# Create specialized agents
research_agent = ResearchAgent(
llm=llm,
search_tools=[DuckDuckGoSearchRun()]
)
analysis_agent = AnalysisAgent(
llm=llm,
analysis_tools=[PythonREPLTool()]
)
# Create orchestrator
orchestrator = MultiAgentOrchestrator([research_agent, analysis_agent])
return orchestrator
# Usage example
orchestrator = create_multi_agent_system()
result = orchestrator.solve_complex_task(
"Research the latest trends in renewable energy and analyze their potential impact on global carbon emissions"
)
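The orchestrator above returns a mocked decomposition. In practice you would ask the LLM for the `TYPE: description` lines requested in the decomposition prompt and parse them; here is a sketch of that step under the same format assumptions.

```python
from typing import Any, Dict, List

def decompose_task_with_llm(llm, complex_task: str) -> List[Dict[str, Any]]:
    """Ask the LLM for subtasks and parse 'TYPE: description' lines."""
    prompt = (
        "Decompose this complex task into smaller, independent subtasks.\n"
        f"Task: {complex_task}\n"
        "Return one subtask per line in the form 'TYPE: description', "
        "where TYPE is research, analysis, or execution."
    )
    response = llm.predict(prompt)

    subtasks = []
    for line in response.splitlines():
        if ":" not in line:
            continue  # skip anything that doesn't match the expected format
        task_type, description = line.split(":", 1)
        subtasks.append({"type": task_type.strip().lower(), "description": description.strip()})
    return subtasks
```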
## Debugging Techniques for Agents
Debugging autonomous agents requires specialized techniques due to their non-deterministic nature and complex decision-making processes:
from langchain.callbacks.base import BaseCallbackHandler
from typing import Any, Dict, List, Optional
import logging
import json
from datetime import datetime
class AgentDebugger(BaseCallbackHandler):
"""Comprehensive debugging system for agents"""
def __init__(self, log_file: str = "agent_debug.log"):
self.log_file = log_file
self.logger = logging.getLogger("AgentDebugger")
self.execution_trace = []
self.current_execution = {}
self.breakpoints = []
self.watch_expressions = []
# Setup logging
handler = logging.FileHandler(log_file)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.DEBUG)
def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs):
"""Log when LLM is called"""
self.current_execution["llm_start"] = {
"timestamp": datetime.now().isoformat(),
"prompts": prompts,
"model": serialized.get("name", "unknown")
}
self.logger.debug(f"LLM Start: {json.dumps(prompts, indent=2)}")
# Check breakpoints
self._check_breakpoints("llm_start", {"prompts": prompts})
def on_llm_end(self, response, **kwargs):
"""Log LLM response"""
self.current_execution["llm_response"] = {
"timestamp": datetime.now().isoformat(),
"response": str(response)
}
self.logger.debug(f"LLM Response: {response}")
def on_tool_start(self, serialized: Dict[str, Any], input_str: str, **kwargs):
"""Log tool execution start"""
tool_name = serialized.get("name", "unknown")
self.current_execution["tool_start"] = {
"timestamp": datetime.now().isoformat(),
"tool": tool_name,
"input": input_str
}
self.logger.info(f"Tool Start - {tool_name}: {input_str}")
# Check breakpoints
self._check_breakpoints("tool_start", {"tool": tool_name, "input": input_str})
def on_tool_end(self, output: str, **kwargs):
"""Log tool execution result"""
self.current_execution["tool_end"] = {
"timestamp": datetime.now().isoformat(),
"output": output
}
self.logger.info(f"Tool Output: {output}")
# Save execution trace
self.execution_trace.append(dict(self.current_execution))
self.current_execution = {}
def on_agent_action(self, action, **kwargs):
"""Log agent decisions"""
self.current_execution["agent_action"] = {
"timestamp": datetime.now().isoformat(),
"action": str(action.tool),
"action_input": str(action.tool_input),
"reasoning": str(action.log)
}
self.logger.info(f"Agent Action: {action.tool} with input: {action.tool_input}")
self.logger.debug(f"Reasoning: {action.log}")
# Evaluate watch expressions
self._evaluate_watches({"action": action})
def add_breakpoint(self, condition: str, callback = None):
"""Add conditional breakpoint"""
self.breakpoints.append({
"condition": condition,
"callback": callback or self._default_breakpoint_handler
})
def add_watch(self, expression: str):
"""Add expression to watch during execution"""
self.watch_expressions.append(expression)
def _check_breakpoints(self, event_type: str, context: Dict[str, Any]):
"""Check if any breakpoints should trigger"""
for bp in self.breakpoints:
try:
# Evaluate condition in context
if eval(bp["condition"], {"event": event_type, **context}):
bp["callback"](event_type, context, self.execution_trace)
except Exception as e:
self.logger.error(f"Breakpoint error: {e}")
def _evaluate_watches(self, context: Dict[str, Any]):
"""Evaluate watch expressions"""
for expr in self.watch_expressions:
try:
value = eval(expr, context)
self.logger.debug(f"Watch - {expr}: {value}")
except Exception as e:
self.logger.error(f"Watch error for '{expr}': {e}")
def _default_breakpoint_handler(self, event: str, context: Dict, trace: List):
"""Default breakpoint handler - logs detailed state"""
self.logger.warning(f"BREAKPOINT HIT at {event}")
self.logger.warning(f"Context: {json.dumps(context, indent=2)}")
self.logger.warning(f"Recent trace: {json.dumps(trace[-3:], indent=2)}")
def get_execution_summary(self) -> Dict[str, Any]:
"""Get summary of agent execution"""
tool_usage = {}
total_llm_calls = 0
errors = []
for execution in self.execution_trace:
# Count LLM calls
if "llm_start" in execution:
total_llm_calls += 1
# Track tool usage
if "tool_start" in execution:
tool = execution["tool_start"]["tool"]
tool_usage[tool] = tool_usage.get(tool, 0) + 1
# Collect errors
if "error" in execution:
errors.append(execution["error"])
return {
"total_executions": len(self.execution_trace),
"llm_calls": total_llm_calls,
"tool_usage": tool_usage,
"errors": errors,
"trace_file": self.log_file
}
def replay_execution(self, step: Optional[int] = None) -> List[Dict[str, Any]]:
"""Replay execution up to specific step"""
if step is None:
return self.execution_trace
return self.execution_trace[:step + 1]
def analyze_decision_path(self) -> List[Dict[str, Any]]:
"""Analyze the decision-making path of the agent"""
decisions = []
for i, execution in enumerate(self.execution_trace):
if "agent_action" in execution:
decision = {
"step": i,
"action": execution["agent_action"]["action"],
"reasoning": execution["agent_action"]["reasoning"],
"input": execution["agent_action"]["action_input"],
"outcome": self._get_outcome(i)
}
decisions.append(decision)
return decisions
def _get_outcome(self, step: int) -> Optional[str]:
"""Get outcome of a specific step"""
if step + 1 < len(self.execution_trace):
next_execution = self.execution_trace[step + 1]
if "tool_end" in next_execution:
return next_execution["tool_end"]["output"]
return None
# Visual debugging helper
class AgentVisualDebugger:
"""Generate visual representations of agent execution"""
def __init__(self, debugger: AgentDebugger):
self.debugger = debugger
def generate_execution_graph(self) -> str:
"""Generate a mermaid graph of execution flow"""
trace = self.debugger.execution_trace
graph_lines = ["graph TD"]
for i, execution in enumerate(trace):
if "agent_action" in execution:
action = execution["agent_action"]
node_id = f"action_{i}"
label = f"{action['action']}\\n{action['action_input'][:30]}..."
graph_lines.append(f" {node_id}[{label}]")
if i > 0:
graph_lines.append(f" action_{i-1} --> {node_id}")
return "\n".join(graph_lines)
def generate_timeline(self) -> List[Dict[str, Any]]:
"""Generate execution timeline"""
timeline = []
for execution in self.debugger.execution_trace:
for event_type, event_data in execution.items():
if isinstance(event_data, dict) and "timestamp" in event_data:
timeline.append({
"timestamp": event_data["timestamp"],
"event": event_type,
"details": event_data
})
return sorted(timeline, key=lambda x: x["timestamp"])
# Usage example
def debug_agent_execution():
"""Example of debugging an agent"""
from langchain.agents import initialize_agent, AgentType
from langchain.llms import OpenAI
# Create debugger
debugger = AgentDebugger("agent_debug.log")
# Add breakpoints
debugger.add_breakpoint(
"event == 'tool_start' and context['tool'] == 'calculator'",
lambda e, c, t: print(f"Calculator called with: {c['input']}")
)
# Add watches
debugger.add_watch("action.tool if 'action' in locals() else None")
# Create agent with debugger
llm = OpenAI(temperature=0, callbacks=[debugger])
agent = initialize_agent(
tools,
llm,
agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
callbacks=[debugger],
verbose=True
)
# Execute with debugging
result = agent.run("What is 25 * 4 + 10?")
# Analyze execution
summary = debugger.get_execution_summary()
decisions = debugger.analyze_decision_path()
print(f"Execution Summary: {json.dumps(summary, indent=2)}")
print(f"Decision Path: {json.dumps(decisions, indent=2)}")
# Generate visual debug info
visual_debugger = AgentVisualDebugger(debugger)
graph = visual_debugger.generate_execution_graph()
timeline = visual_debugger.generate_timeline()
print(f"Execution Graph:\n{graph}")
print(f"Timeline: {json.dumps(timeline, indent=2)}")
return debugger
## Performance and Cost Optimization
Building efficient agents requires careful consideration of performance and cost. Here are strategies for optimizing your autonomous AI systems:
import hashlib
import time
from typing import Any, Dict, List, Optional
class AgentOptimizer:
"""Optimization strategies for agent performance and cost"""
def __init__(self):
self.cache = {}
self.metrics = {
"llm_calls": 0,
"tool_calls": 0,
"cache_hits": 0,
"total_tokens": 0,
"total_cost": 0.0
}
self.cost_per_1k_tokens = {
"gpt-4": {"input": 0.03, "output": 0.06},
"gpt-3.5-turbo": {"input": 0.001, "output": 0.002}
}
def cache_response(self, key: str, response: Any, ttl: int = 3600):
"""Cache responses with TTL"""
self.cache[key] = {
"response": response,
"timestamp": time.time(),
"ttl": ttl
}
def get_cached_response(self, key: str) -> Optional[Any]:
"""Retrieve cached response if valid"""
if key in self.cache:
entry = self.cache[key]
if time.time() - entry["timestamp"] < entry["ttl"]:
self.metrics["cache_hits"] += 1
return entry["response"]
else:
del self.cache[key]
return None
def generate_cache_key(self, prompt: str, tool: str = None) -> str:
"""Generate cache key from prompt and tool"""
content = f"{prompt}:{tool}" if tool else prompt
return hashlib.md5(content.encode()).hexdigest()
def estimate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
"""Estimate cost for LLM call"""
if model in self.cost_per_1k_tokens:
costs = self.cost_per_1k_tokens[model]
input_cost = (input_tokens / 1000) * costs["input"]
output_cost = (output_tokens / 1000) * costs["output"]
return input_cost + output_cost
return 0.0
def optimize_prompt(self, prompt: str, max_length: int = 1000) -> str:
"""Optimize prompt length while preserving meaning"""
if len(prompt) <= max_length:
return prompt
# Smart truncation preserving structure
lines = prompt.split('\n')
optimized_lines = []
current_length = 0
# Prioritize keeping instructions and questions
important_markers = ["?", "instruction:", "task:", "question:"]
# First pass: keep important lines
for line in lines:
if any(marker in line.lower() for marker in important_markers):
optimized_lines.append(line)
current_length += len(line)
# Second pass: add remaining lines if space
for line in lines:
if line not in optimized_lines and current_length + len(line) < max_length:
optimized_lines.append(line)
current_length += len(line)
return '\n'.join(optimized_lines)
def batch_operations(self, operations: List[Dict[str, Any]]) -> List[Any]:
"""Batch similar operations for efficiency"""
# Group by operation type
grouped = {}
for op in operations:
op_type = op.get("type", "default")
if op_type not in grouped:
grouped[op_type] = []
grouped[op_type].append(op)
results = []
for op_type, ops in grouped.items():
if op_type == "search":
# Batch search operations
results.extend(self._batch_search(ops))
elif op_type == "calculation":
# Batch calculations
results.extend(self._batch_calculate(ops))
else:
# Process individually
results.extend([self._process_single(op) for op in ops])
return results
def _batch_search(self, searches: List[Dict[str, Any]]) -> List[Any]:
"""Batch multiple search operations"""
# Combine queries when possible
combined_query = " OR ".join([s["query"] for s in searches])
# Execute single search and parse results
# Implementation depends on search tool
return [f"Result for: {s['query']}" for s in searches]
def _batch_calculate(self, calculations: List[Dict[str, Any]]) -> List[Any]:
"""Batch mathematical calculations"""
results = []
for calc in calculations:
try:
# eval() is acceptable only for trusted demo input; prefer the AST-based calculator tool above
result = eval(calc["expression"])
results.append(result)
except Exception:
results.append(None)
return results
def _process_single(self, operation: Dict[str, Any]) -> Any:
"""Process single operation"""
return f"Processed: {operation}"
class OptimizedAgent:
"""Agent with built-in optimization"""
def __init__(self, llm, tools, optimizer: AgentOptimizer):
self.llm = llm
self.tools = tools
self.optimizer = optimizer
self.pending_operations = []
def run(self, task: str) -> str:
"""Run task with optimizations"""
# Check cache first
cache_key = self.optimizer.generate_cache_key(task)
cached_result = self.optimizer.get_cached_response(cache_key)
if cached_result:
return cached_result
# Optimize prompt
optimized_task = self.optimizer.optimize_prompt(task)
# Execute with monitoring
start_time = time.time()
result = self._execute_with_batching(optimized_task)
execution_time = time.time() - start_time
# Cache result
self.optimizer.cache_response(cache_key, result)
# Update metrics
self.optimizer.metrics["execution_time"] = execution_time
return result
def _execute_with_batching(self, task: str) -> str:
"""Execute task with operation batching"""
# Collect operations that can be batched
self.pending_operations = []
# Execute task (operations collected via callbacks)
result = self._execute_task(task)
# Batch execute pending operations
if self.pending_operations:
batch_results = self.optimizer.batch_operations(self.pending_operations)
# Integrate batch results
result = self._integrate_batch_results(result, batch_results)
return result
def _execute_task(self, task: str) -> str:
"""Execute the actual task"""
# Agent execution logic
return f"Completed: {task}"
def _integrate_batch_results(self, result: str, batch_results: List[Any]) -> str:
"""Integrate batched operation results"""
# Combine results
return f"{result}\nBatch results: {batch_results}"
# Cost-aware execution strategies
class CostAwareExecutor:
"""Execute agents with cost constraints"""
def __init__(self, budget: float = 10.0):
self.budget = budget
self.spent = 0.0
self.cost_history = []
def execute_with_budget(self, agent, task: str, model_preference: Dict[str, float]):
"""Execute task within budget constraints"""
# Estimate cost
estimated_cost = self._estimate_task_cost(task, model_preference)
if self.spent + estimated_cost > self.budget:
# Try cheaper alternatives
cheaper_model = self._find_cheaper_alternative(model_preference)
if cheaper_model:
return self._execute_with_model(agent, task, cheaper_model)
else:
raise ValueError(f"Budget exceeded. Spent: ${self.spent:.2f}, Budget: ${self.budget:.2f}")
# Execute with preferred model
result = self._execute_with_model(agent, task, list(model_preference.keys())[0])
return result
def _estimate_task_cost(self, task: str, model_preference: Dict[str, float]) -> float:
"""Estimate cost based on task complexity"""
# Simple estimation based on task length and complexity
base_tokens = len(task.split()) * 1.5 # Rough token estimate
# Adjust for complexity markers
if any(word in task.lower() for word in ["analyze", "research", "comprehensive"]):
base_tokens *= 2
# Calculate cost for preferred model
model = list(model_preference.keys())[0]
cost_per_token = model_preference[model]
return (base_tokens / 1000) * cost_per_token
def _find_cheaper_alternative(self, model_preference: Dict[str, float]) -> Optional[str]:
"""Find cheaper model that can handle the task"""
sorted_models = sorted(model_preference.items(), key=lambda x: x[1])
for model, cost in sorted_models:
if self.spent + cost < self.budget:
return model
return None
def _execute_with_model(self, agent, task: str, model: str) -> str:
"""Execute task with specific model"""
# Execute and track cost
result = f"Executed {task} with {model}"
# Update spending
actual_cost = 0.1 # Mock cost
self.spent += actual_cost
self.cost_history.append({
"task": task,
"model": model,
"cost": actual_cost,
"timestamp": time.time()
})
return result
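Putting these pieces together, here is a rough usage sketch; the `llm` and `tools` are assumed to be the ones from the earlier examples, and the per-1k-token prices are illustrative.

```python
# Hypothetical wiring of the optimization helpers defined above.
optimizer = AgentOptimizer()
optimized_agent = OptimizedAgent(llm=llm, tools=tools, optimizer=optimizer)

# The first call executes and caches; an identical second call is served from cache.
print(optimized_agent.run("Summarize the latest trends in renewable energy"))
print(optimized_agent.run("Summarize the latest trends in renewable energy"))
print(optimizer.metrics["cache_hits"])  # -> 1

# Budget-constrained execution with a rough cost-per-1k-token preference map
cost_executor = CostAwareExecutor(budget=5.0)
result = cost_executor.execute_with_budget(
    optimized_agent,
    "Analyze this quarter's energy consumption figures",
    model_preference={"gpt-4": 0.03, "gpt-3.5-turbo": 0.001},
)
print(result)
```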
For more advanced component integration with your agents, check out our LangChain Components Guide. It covers how to build reusable components that can enhance your agent systems.
## Best Practices and Common Pitfalls
When building autonomous AI systems with LangChain agents, following best practices can mean the difference between a reliable system and one that fails unpredictably:
### Best Practices

- Clear Tool Descriptions: Provide detailed, unambiguous descriptions for each tool. The agent's ability to select the right tool depends entirely on these descriptions (see the example after this list).
- Graceful Degradation: Always implement fallback strategies. When a tool fails or returns unexpected results, your agent should have alternative approaches.
- Context Preservation: Maintain relevant context across agent interactions. Use memory systems to ensure agents can build upon previous interactions.
- Cost Monitoring: Implement cost tracking from the start. Agent systems can quickly become expensive without proper monitoring.
- Testing and Validation: Create comprehensive test suites that cover various scenarios, including edge cases and failure modes.
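For the first point, compare a vague tool description with one that tells the agent exactly what the tool does, what input it expects, and when to use it (the tool names and the `fetch_sales_data` helper below are hypothetical):

```python
from langchain.agents import Tool

# Hypothetical helper used only to illustrate tool wiring
def fetch_sales_data(region: str) -> str:
    return f"Sales figures for {region}: ..."

# Too vague: the agent has little basis for choosing or invoking this tool correctly
bad_tool = Tool(
    name="data",
    description="Gets data",
    func=fetch_sales_data,
)

# Clear: states what it does, the expected input, and when it should be used
good_tool = Tool(
    name="sales_database",
    description=(
        "Look up monthly sales figures by region. "
        "Input should be a single region name such as 'EMEA' or 'North America'. "
        "Use this for any question about historical sales numbers."
    ),
    func=fetch_sales_data,
)
```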
### Common Pitfalls to Avoid

- Over-reliance on Single Tools: Don't create agents that depend too heavily on one tool. Diversify capabilities for robustness.
- Ignoring Rate Limits: Many APIs enforce rate limits. Implement proper throttling and queuing mechanisms (a minimal throttling sketch follows this list).
- Insufficient Error Context: When errors occur, ensure your agent captures enough context to understand and potentially recover from the failure.
- Memory Leaks: Long-running agents can accumulate memory. Implement proper cleanup and memory management strategies.
- Unclear Agent Boundaries: Define clear responsibilities for each agent in multi-agent systems to avoid overlap and confusion.
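For the rate-limit pitfall above, a minimal throttling wrapper is often enough. This is a sketch that isn't tied to any particular API client, and the limits shown are arbitrary:

```python
import time

class ThrottledFunction:
    """Wraps a callable so invocations respect a simple calls-per-minute limit."""

    def __init__(self, func, max_calls_per_minute: int = 30):
        self.func = func
        self.min_interval = 60.0 / max_calls_per_minute
        self._last_call = 0.0

    def __call__(self, *args, **kwargs):
        wait = self.min_interval - (time.time() - self._last_call)
        if wait > 0:
            time.sleep(wait)  # back off until the next call is allowed
        self._last_call = time.time()
        return self.func(*args, **kwargs)

# Hypothetical usage: throttle the search function before wrapping it in a Tool
# throttled_search = ThrottledFunction(search.run, max_calls_per_minute=10)
```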
## Conclusion
Building autonomous AI systems with LangChain agents opens up possibilities for creating intelligent applications that can reason, plan, and execute complex tasks. By mastering the ReAct pattern, implementing robust error handling, managing agent memory effectively, and optimizing for performance and cost, you can create agents that truly augment human capabilities.
The key to success lies in understanding that agents are not just about connecting LLMs to tools—they're about creating systems that can think, adapt, and learn. As you build your own agent systems, remember to start simple, test thoroughly, and gradually increase complexity as you gain confidence in your architecture.
Whether you're building a single agent for a specific task or orchestrating multiple agents for complex problem-solving, the principles and patterns covered in this guide will help you create more intelligent, reliable, and efficient autonomous AI systems.