Memory & State Tracking for AI Agents: Production-Ready Implementation Guide
A single agent processing 50,000 daily requests can burn an extra $47,000 per week by failing to manage context properly. The culprit? Storing full conversation history in every request instead of using memory banks and context caching. This guide provides production-ready implementations for agent memory management, state tracking, and context optimization that can reduce token costs by 60-80% while improving performance.
Why Memory Management Matters
The shift from simple chatbots to autonomous agents has fundamentally changed how we approach memory. Agents maintain multi-turn conversations, execute long-running tasks, and coordinate across sessions, each of which requires sophisticated state tracking.
The Cost Reality
Without proper memory management, token costs compound rapidly as context grows:
| Scenario | Daily Requests | Avg Tokens/Request | Daily Cost (GPT-4o) | Monthly Cost |
|---|---|---|---|---|
| No optimization | 50,000 | 8,000 | $2,000 | $60,000 |
| With memory bank | 50,000 | 2,000 | $500 | $15,000 |
| With caching + bank | 50,000 | 800 | $200 | $6,000 |
Based on GPT-4o pricing: $5.00 per 1M input tokens / $15.00 per 1M output tokens (128K context window)
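The table figures can be reproduced with simple arithmetic. The sketch below is a rough cost model, assuming for simplicity that every token is billed at the input rate cited above; a real bill splits input and output tokens.

```python
# Rough cost model for the table above
# (simplifying assumption: all tokens billed at the input rate).
INPUT_PRICE_PER_M_TOKENS = 5.00  # USD per 1M tokens (GPT-4o input pricing cited above)
DAILY_REQUESTS = 50_000

scenarios = {
    "No optimization": 8_000,
    "With memory bank": 2_000,
    "With caching + bank": 800,
}

for name, tokens_per_request in scenarios.items():
    daily_tokens = DAILY_REQUESTS * tokens_per_request
    daily_cost = daily_tokens / 1_000_000 * INPUT_PRICE_PER_M_TOKENS
    print(f"{name}: ${daily_cost:,.0f}/day, ${daily_cost * 30:,.0f}/month")
```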
Context Window Limitations
Even with massive context windows, performance degrades as usage increases. Research from Google Cloud shows that while Gemini 1.5 Pro supports 2 million tokens with greater than 99% retrieval accuracy, models still experience "attention budget" depletion: every token added reduces focus on other tokens (Google Cloud Documentation).
Core Memory Architectures
1. Memory Bank Pattern
Memory banks store long-term information outside the context window, retrieving only relevant data when needed. This pattern is essential for cross-session continuity and cost control.
Vertex AI Agent Engine Memory Bank Example
Google Cloud's Memory Bank provides scope-based isolation with automatic expiration and revision tracking (Google Cloud Documentation).
```python
import json
from typing import List, Dict, Optional

import requests


class AgentMemoryManager:
    """Manages long-term memory for AI agents using Vertex AI Memory Bank."""

    def __init__(self, project_id: str, location: str, agent_id: str):
        self.project_id = project_id
        self.location = location
        self.agent_id = agent_id
        self.base_url = f"https://{location}-aiplatform.googleapis.com/v1"
        self.session_id = None

    def create_session(self, user_id: str) -> str:
        """Create a new session for a user."""
        url = f"{self.base_url}/projects/{self.project_id}/locations/{self.location}/reasoningEngines/{self.agent_id}/sessions"
        payload = {"user_id": user_id}

        # NOTE: production calls must include an OAuth 2.0 access token
        # (e.g. from google-auth) in the Authorization header; omitted for brevity.
        try:
            response = requests.post(url, json=payload)
            response.raise_for_status()
            self.session_id = response.json()["name"].split("/")[-1]
            return self.session_id
        except requests.exceptions.RequestException as e:
            print(f"Error creating session: {e}")
            raise

    def append_event(self, event_type: str, content: str) -> bool:
        """Append an event to the current session."""
        if not self.session_id:
            raise ValueError("Session must be created first")

        url = f"{self.base_url}/projects/{self.project_id}/locations/{self.location}/reasoningEngines/{self.agent_id}/sessions/{self.session_id}:appendEvent"
        payload = {
            "event": {
                "event_type": event_type,
                "content": content
            }
        }

        try:
            response = requests.post(url, json=payload)
            response.raise_for_status()
            return True
        except requests.exceptions.RequestException as e:
            print(f"Error appending event: {e}")
            return False

    def generate_memories(self, topics: Optional[List[str]] = None) -> Dict:
        """Generate memories from conversation history."""
        if not self.session_id:
            raise ValueError("Session must be created first")

        url = f"{self.base_url}/projects/{self.project_id}/locations/{self.location}/reasoningEngines/{self.agent_id}/memories:generate"
        payload = {
            "scope": {
                "agent_name": self.agent_id,
                "user": self.session_id
            }
        }

        if topics:
            payload["memory_topics"] = topics

        try:
            response = requests.post(url, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Error generating memories: {e}")
            raise

    def retrieve_memories(self, query: str, top_k: int = 5) -> List[Dict]:
        """Retrieve relevant memories using similarity search."""
        if not self.session_id:
            raise ValueError("Session must be created first")

        url = f"{self.base_url}/projects/{self.project_id}/locations/{self.location}/reasoningEngines/{self.agent_id}/memories:retrieve"
        payload = {
            "scope": {
                "agent_name": self.agent_id,
                "user": self.session_id
            },
            "query": query,
            "top_k": top_k
        }

        try:
            response = requests.post(url, json=payload)
            response.raise_for_status()
            return response.json().get("memories", [])
        except requests.exceptions.RequestException as e:
            print(f"Error retrieving memories: {e}")
            return []


# Example usage
if __name__ == "__main__":
    # Initialize memory manager
    memory_manager = AgentMemoryManager(
        project_id="your-project-id",
        location="us-central1",
        agent_id="my-agent"
    )

    # Create session for user
    session_id = memory_manager.create_session("user-123")
    print(f"Created session: {session_id}")

    # Add conversation events
    memory_manager.append_event("user_message", "I prefer Python for data analysis")
    memory_manager.append_event("agent_response", "I'll remember you prefer Python")

    # Generate memories from conversation
    memories = memory_manager.generate_memories(topics=["programming_languages", "user_preferences"])
    print(f"Generated memories: {json.dumps(memories, indent=2)}")

    # Retrieve relevant memories
    relevant = memory_manager.retrieve_memories("What programming language does the user prefer?", top_k=3)
    print(f"Retrieved memories: {json.dumps(relevant, indent=2)}")
```

The same pattern in TypeScript:

```typescript
interface MemoryBankConfig {
  projectId: string;
  location: string;
  agentId: string;
}
interface MemoryEvent {
  eventType: string;
  content: string;
  timestamp: Date;
}

interface Memory {
  id: string;
  content: string;
  topics: string[];
  timestamp: Date;
  relevanceScore?: number;
}

class VertexMemoryBank {
  private config: MemoryBankConfig;
  private sessionId: string | null = null;

  constructor(config: MemoryBankConfig) {
    this.config = config;
  }

  async createSession(userId: string): Promise<string> {
    const url = `https://${this.config.location}-aiplatform.googleapis.com/v1/projects/${this.config.projectId}/locations/${this.config.location}/reasoningEngines/${this.config.agentId}/sessions`;

    // NOTE: production calls must include an OAuth 2.0 access token in the
    // Authorization header; omitted here for brevity.
    const response = await fetch(url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ user_id: userId })
    });

    if (!response.ok) {
      throw new Error(`Failed to create session: ${response.statusText}`);
    }

    const data = await response.json();
    this.sessionId = data.name.split('/').pop();
    return this.sessionId!;
  }

  async appendEvent(event: MemoryEvent): Promise<void> {
    if (!this.sessionId) throw new Error('Session not created');

    const url = `https://${this.config.location}-aiplatform.googleapis.com/v1/projects/${this.config.projectId}/locations/${this.config.location}/reasoningEngines/${this.config.agentId}/sessions/${this.sessionId}:appendEvent`;

    const response = await fetch(url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        event: {
          event_type: event.eventType,
          content: event.content
        }
      })
    });

    if (!response.ok) {
      throw new Error(`Failed to append event: ${response.statusText}`);
    }
  }

  async generateMemories(topics?: string[]): Promise<Memory[]> {
    if (!this.sessionId) throw new Error('Session not created');

    const url = `https://${this.config.location}-aiplatform.googleapis.com/v1/projects/${this.config.projectId}/locations/${this.config.location}/reasoningEngines/${this.config.agentId}/memories:generate`;

    const payload: any = {
      scope: {
        agent_name: this.config.agentId,
        user: this.sessionId
      }
    };

    if (topics) payload.memory_topics = topics;

    const response = await fetch(url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(payload)
    });

    if (!response.ok) {
      throw new Error(`Failed to generate memories: ${response.statusText}`);
    }

    return response.json();
  }

  async retrieveMemories(query: string, topK: number = 5): Promise<Memory[]> {
    if (!this.sessionId) throw new Error('Session not created');

    const url = `https://${this.config.location}-aiplatform.googleapis.com/v1/projects/${this.config.projectId}/locations/${this.config.location}/reasoningEngines/${this.config.agentId}/memories:retrieve`;

    const response = await fetch(url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        scope: {
          agent_name: this.config.agentId,
          user: this.sessionId
        },
        query: query,
        top_k: topK
      })
    });

    if (!response.ok) {
      throw new Error(`Failed to retrieve memories: ${response.statusText}`);
    }

    const data = await response.json();
    return data.memories || [];
  }
}

// Example usage
async function main() {
  const memoryBank = new VertexMemoryBank({
    projectId: 'your-project-id',
    location: 'us-central1',
    agentId: 'my-agent'
  });

  const sessionId = await memoryBank.createSession('user-123');
  console.log(`Created session: ${sessionId}`);

  await memoryBank.appendEvent({
    eventType: 'user_message',
    content: 'I prefer Python for data analysis',
    timestamp: new Date()
  });

  const memories = await memoryBank.generateMemories(['programming_languages', 'user_preferences']);
  console.log('Generated memories:', JSON.stringify(memories, null, 2));

  const relevant = await memoryBank.retrieveMemories('What programming language does the user prefer?', 3);
  console.log('Retrieved memories:', JSON.stringify(relevant, null, 2));
}

main().catch(console.error);
```

Key Features:
- Scope-based isolation: Memories are accessible only to specific users and agents (see the usage sketch after this list)
- Automatic expiration: TTL (Time-To-Live) prevents stale data accumulation
- Revision tracking: All memory updates are versioned for audit trails
- Similarity search: Retrieve relevant memories using vector embeddings
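The scope object passed to the generate and retrieve calls is what enforces this isolation. As a hypothetical usage sketch reusing the `AgentMemoryManager` class above (project and agent identifiers are placeholders), memories generated for one user never surface in another user's retrievals:

```python
# Hypothetical illustration of scope-based isolation, built on AgentMemoryManager from above.
alice = AgentMemoryManager(project_id="your-project-id", location="us-central1", agent_id="my-agent")
bob = AgentMemoryManager(project_id="your-project-id", location="us-central1", agent_id="my-agent")

alice.create_session("alice")
bob.create_session("bob")

alice.append_event("user_message", "I prefer Python for data analysis")
bob.append_event("user_message", "I prefer R for statistics")

# Each manager generates and retrieves memories under its own scope,
# so Alice's query does not return Bob's preference.
alice.generate_memories(topics=["user_preferences"])
print(alice.retrieve_memories("preferred programming language"))
```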
2. Context Caching & Prefix Optimization
Context caching stores common prefixes (like system prompts) once and reuses them across requests, dramatically reducing costs for repetitive operations.
vLLM PagedAttention Implementation
vLLM's PagedAttention and prefix caching can improve throughput by 2-4x while reducing KV-cache memory waste from 60-80% to near zero (Princeton COS597).
```python
import time
from typing import List, Dict

from vllm import LLM, SamplingParams


class EfficientAgentMemory:
    """Optimized memory management for AI agents using vLLM's PagedAttention."""

    def __init__(self,
                 model_name: str = "meta-llama/Llama-2-7b-chat-hf",
                 gpu_memory_utilization: float = 0.95,
                 max_model_len: int = 4096):
        """
        Initialize vLLM engine with memory-efficient settings.

        Args:
            model_name: HuggingFace model identifier
            gpu_memory_utilization: Fraction of GPU memory to use (0.0-1.0)
            max_model_len: Maximum sequence length to cache
        """
        # Construct the engine directly; enable_prefix_caching lets vLLM reuse
        # KV-cache blocks for prompts that share a common prefix.
        self.llm = LLM(
            model=model_name,
            gpu_memory_utilization=gpu_memory_utilization,
            max_model_len=max_model_len,
            enable_prefix_caching=True,  # Cache common prefixes
            # quantization="awq",  # Enable only with an AWQ-quantized checkpoint
        )
        self.conversation_cache = {}

    def process_multi_turn_conversation(self, conversation_history: List[Dict[str, str]]) -> str:
        """
        Process multi-turn conversation with context caching.

        Args:
            conversation_history: List of {'role': 'user'|'assistant', 'content': str}

        Returns:
            Generated response
        """
        # Format conversation with proper tokens
        # (adjust the template to match your model's chat format)
        formatted_prompt = self._format_conversation(conversation_history)

        # Use sampling params optimized for agent responses
        sampling_params = SamplingParams(
            temperature=0.7,
            top_p=0.95,
            max_tokens=512,
            repetition_penalty=1.1
        )

        try:
            start_time = time.time()
            outputs = self.llm.generate([formatted_prompt], sampling_params)
            generation_time = time.time() - start_time

            # Log performance metrics
            generated_text = outputs[0].outputs[0].text
            prompt_tokens = len(outputs[0].prompt_token_ids)
            generated_tokens = len(outputs[0].outputs[0].token_ids)

            print(f"Generation completed in {generation_time:.2f}s")
            print(f"Prompt tokens: {prompt_tokens}, Generated tokens: {generated_tokens}")
            print(f"Throughput: {(prompt_tokens + generated_tokens) / generation_time:.2f} tokens/sec")

            return generated_text

        except Exception as e:
            print(f"Error during generation: {e}")
            raise

    def _format_conversation(self, conversation_history: List[Dict[str, str]]) -> str:
        """Format conversation history into model prompt."""
        formatted = []
        for turn in conversation_history:
            role = turn['role']
            content = turn['content']
            if role == 'user':
                formatted.append(f"<|user|>\n{content}<|end|>")
            elif role == 'assistant':
                formatted.append(f"<|assistant|>\n{content}<|end|>")
        formatted.append("<|assistant|>\n")
        return "\n".join(formatted)

    def batch_process_agents(self, agent_prompts: List[str]) -> List[str]:
        """
        Process multiple agent prompts simultaneously using continuous batching.

        Args:
            agent_prompts: List of formatted prompts for different agents

        Returns:
            List of generated responses
        """
        sampling_params = SamplingParams(
            temperature=0.7,
            max_tokens=256
        )

        try:
            start_time = time.time()
            outputs = self.llm.generate(agent_prompts, sampling_params)
            batch_time = time.time() - start_time

            total_tokens = sum(
                len(out.prompt_token_ids) + len(out.outputs[0].token_ids)
                for out in outputs
            )

            print(f"Batch processed {len(agent_prompts)} requests in {batch_time:.2f}s")
            print(f"Total throughput: {total_tokens / batch_time:.2f} tokens/sec")

            return [out.outputs[0].text for out in outputs]

        except Exception as e:
            print(f"Error in batch processing: {e}")
            raise


# Example usage
if __name__ == "__main__":
    # Initialize memory-efficient agent
    agent = EfficientAgentMemory(
        model_name="mistralai/Mistral-7B-Instruct-v0.1",
        gpu_memory_utilization=0.90,
        max_model_len=8192
    )

    # Single conversation example
    conversation = [
        {"role": "user", "content": "What's the best way to manage agent memory?"},
        {"role": "assistant", "content": "Use context caching and memory banks for long-term storage."},
        {"role": "user", "content": "Can you elaborate on that?"}
    ]

    response = agent.process_multi_turn_conversation(conversation)
    print(f"\nAgent response:\n{response}")

    # Batch processing example
    batch_prompts = [
        "<|user|>\nExplain PagedAttention<|end|><|assistant|>\n",
        "<|user|>\nWhat is context caching?<|end|><|assistant|>\n",
        "<|user|>\nHow does vLLM improve throughput?<|end|><|assistant|>\n"
    ]

    batch_results = agent.batch_process_agents(batch_prompts)
    for i, result in enumerate(batch_results):
        print(f"\nBatch result {i+1}: {result}")
```

The same idea from the client side in TypeScript, calling a vLLM server's OpenAI-compatible completions endpoint:

```typescript
interface SamplingParams {
  temperature: number;
  topP?: number;
  maxTokens: number;
  repetitionPenalty?: number;
}
interface ConversationTurn {
  role: 'user' | 'assistant';
  content: string;
}

class vLLMAgentEngine {
  private modelEndpoint: string;
  private apiKey: string;
  private cache: Map<string, string> = new Map();

  constructor(modelEndpoint: string, apiKey: string) {
    this.modelEndpoint = modelEndpoint;
    this.apiKey = apiKey;
  }

  async generate(
    prompt: string,
    params: SamplingParams
  ): Promise<{ text: string; tokens: number; latency: number }> {
    const cacheKey = `${prompt}_${JSON.stringify(params)}`;

    // Check cache first
    if (this.cache.has(cacheKey)) {
      console.log('Cache hit - returning cached response');
      return { text: this.cache.get(cacheKey)!, tokens: 0, latency: 0 };
    }

    const startTime = Date.now();

    const response = await fetch(`${this.modelEndpoint}/v1/completions`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${this.apiKey}`
      },
      body: JSON.stringify({
        prompt: prompt,
        max_tokens: params.maxTokens,
        temperature: params.temperature,
        top_p: params.topP,
        repetition_penalty: params.repetitionPenalty
      })
    });

    if (!response.ok) {
      throw new Error(`Generation failed: ${response.statusText}`);
    }

    const data = await response.json();
    const latency = Date.now() - startTime;

    // Cache the response
    this.cache.set(cacheKey, data.choices[0].text);

    return {
      text: data.choices[0].text,
      tokens: data.usage?.total_tokens || 0,
      latency: latency
    };
  }

  async batchGenerate(
    prompts: string[],
    params: SamplingParams
  ): Promise<Array<{ text: string; tokens: number; latency: number }>> {
    const startTime = Date.now();

    const results = await Promise.all(
      prompts.map(prompt => this.generate(prompt, params))
    );

    const totalTime = Date.now() - startTime;
    const totalTokens = results.reduce((sum, r) => sum + r.tokens, 0);

    console.log(`Batch processed ${prompts.length} requests in ${totalTime}ms`);
    console.log(`Throughput: ${(totalTokens / totalTime * 1000).toFixed(2)} tokens/sec`);

    return results;
  }

  // Simulate prefix caching optimization
  optimizePrefixCache(conversations: ConversationTurn[][]): void {
    const prefixes = new Map<string, number>();

    conversations.forEach(conv => {
      const prefix = this._formatPrefix(conv.slice(0, -1)); // All but last turn
      prefixes.set(prefix, (prefixes.get(prefix) || 0) + 1);
    });

    // Cache prefixes that appear multiple times
    prefixes.forEach((count, prefix) => {
      if (count > 1) {
        console.log(`Caching prefix used ${count} times (${prefix.length} chars)`);
        // In production, this would be passed to vLLM with prefix caching enabled
      }
    });
  }

  private _formatPrefix(turns: ConversationTurn[]): string {
    return turns.map(turn =>
      `<|${turn.role}|>\n${turn.content}<|end|>`
    ).join('\n');
  }

  getCacheStats(): { size: number; hitRate: number } {
    return {
      size: this.cache.size,
      hitRate: 0 // Would track actual hit rate in production
    };
  }
}

// Example usage
async function main() {
  const engine = new vLLMAgentEngine(
    'https://api.example.com/vllm',
    'your-api-key'
  );

  // Single generation
  const result = await engine.generate(
    '<|user|>\nExplain PagedAttention<|end|><|assistant|>\n',
    { temperature: 0.7, maxTokens: 256 }
  );
  console.log('Generated:', result.text);
  console.log(`Tokens: ${result.tokens}, Latency: ${result.latency}ms`);

  // Batch processing
  const prompts = [
    '<|user|>\nWhat is context caching?<|end|><|assistant|>\n',
    '<|user|>\nHow does vLLM improve throughput?<|end|><|assistant|>\n'
  ];

  const batchResults = await engine.batchGenerate(prompts, {
    temperature: 0.7,
    maxTokens: 256
  });
  console.log(`Batch returned ${batchResults.length} responses`);

  // Optimize prefix caching
  const conversations: ConversationTurn[][] = [
    [
      { role: 'user', content: 'I need help with Python' },
      { role: 'assistant', content: 'I can help with Python. What do you need?' },
      { role: 'user', content: 'How do I read a CSV file?' }
    ],
    [
      { role: 'user', content: 'I need help with Python' },
      { role: 'assistant', content: 'I can help with Python. What do you need?' },
      { role: 'user', content: 'How do I install packages?' }
    ]
  ];

  engine.optimizePrefixCache(conversations);
  console.log('Cache stats:', engine.getCacheStats());
}

main().catch(console.error);
```

Performance Benefits:
- 2-4x throughput improvement over traditional serving systems
- Near-zero memory waste through fixed-size block allocation
- Continuous batching handles multiple requests efficiently
- Prefix caching eliminates redundant computation for common instructions (illustrated in the sketch below)
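To make the prefix-caching benefit concrete, here is a minimal offline sketch: every prompt shares the same system prompt, so with `enable_prefix_caching=True` vLLM computes the KV-cache blocks for that prefix once and reuses them for the remaining requests. The model name, system prompt, and questions are placeholders; assume a local GPU that fits the model.

```python
# Minimal sketch: shared-prefix batching with vLLM prefix caching enabled.
from vllm import LLM, SamplingParams

SYSTEM_PROMPT = "You are a support agent for an analytics product. Answer concisely.\n"  # shared prefix

questions = [
    "How do I export a dashboard?",
    "Why is my query slow?",
    "Can I schedule reports?",
]

llm = LLM(model="mistralai/Mistral-7B-Instruct-v0.1", enable_prefix_caching=True)
params = SamplingParams(temperature=0.2, max_tokens=128)

# Because every prompt starts with the same system prompt, the KV-cache blocks
# for that prefix are computed once and reused across the batch.
outputs = llm.generate([SYSTEM_PROMPT + q for q in questions], params)
for out in outputs:
    print(out.outputs[0].text.strip()[:80])
```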
3. State Debugging & Monitoring
Production agents require real-time state tracking to prevent context overflow and detect memory pressure issues.
```typescript
interface AgentState {
  sessionId: string;
  userId: string;
  contextWindow: number;
  memoryUsage: MemoryUsage;
  conversationHistory: Message[];
  lastMemoryUpdate: Date;
}

interface MemoryUsage {
  tokensUsed: number;
  tokensAvailable: number;
  utilizationPercent: number;
}

interface Message {
  role: 'user' | 'assistant' | 'system';
  content: string;
  timestamp: Date;
  tokenCount: number;
}

class AgentStateDebugger {
  private state: AgentState;
  private maxContextWindow: number;
  private memoryThreshold: number;

  constructor(userId: string, maxContextWindow: number = 128000) {
    this.maxContextWindow = maxContextWindow;
    this.memoryThreshold = 0.85; // Alert at 85% usage

    this.state = {
      sessionId: this.generateSessionId(),
      userId,
      contextWindow: maxContextWindow,
      memoryUsage: {
        tokensUsed: 0,
        tokensAvailable: maxContextWindow,
        utilizationPercent: 0
      },
      conversationHistory: [],
      lastMemoryUpdate: new Date()
    };
  }

  private generateSessionId(): string {
    return `sess_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }

  // Estimate token count (approximate for debugging)
  private estimateTokenCount(text: string): number {
    // Rough estimate: 1 token ≈ 4 characters
    return Math.ceil(text.length / 4);
  }

  addMessage(role: 'user' | 'assistant' | 'system', content: string): void {
    const tokenCount = this.estimateTokenCount(content);

    const message: Message = {
      role,
      content,
      timestamp: new Date(),
      tokenCount
    };

    this.state.conversationHistory.push(message);
    this.state.memoryUsage.tokensUsed += tokenCount;
    this.state.memoryUsage.tokensAvailable = this.maxContextWindow - this.state.memoryUsage.tokensUsed;
    this.state.memoryUsage.utilizationPercent = (this.state.memoryUsage.tokensUsed / this.maxContextWindow) * 100;
    this.state.lastMemoryUpdate = new Date();

    // Check for memory pressure
    if (this.state.memoryUsage.utilizationPercent >= this.memoryThreshold * 100) {
      this.handleMemoryPressure();
    }
  }

  private handleMemoryPressure(): void {
    const alert = {
      level: 'WARNING',
      message: 'Context window approaching capacity',
      sessionId: this.state.sessionId,
      utilization: this.state.memoryUsage.utilizationPercent.toFixed(2) + '%',
      tokensUsed: this.state.memoryUsage.tokensUsed,
      recommendedAction: 'Consider summarizing history or using memory bank'
    };

    console.warn(JSON.stringify(alert, null, 2));
    this.logState();
  }

  logState(): void {
    console.log(`\n=== Agent State Debug ===`);
    console.log(`Session: ${this.state.sessionId}`);
    console.log(`User: ${this.state.userId}`);
    console.log(`Messages: ${this.state.conversationHistory.length}`);
    console.log(`Memory: ${this.state.memoryUsage.tokensUsed}/${this.maxContextWindow} tokens`);
    console.log(`Utilization: ${this.state.memoryUsage.utilizationPercent.toFixed(2)}%`);
    console.log(`Last Update: ${this.state.lastMemoryUpdate.toISOString()}`);
    console.log(`========================\n`);
  }

  getSummary(): AgentState {
    return { ...this.state };
  }

  // Simulate memory optimization strategy
  optimizeContext(): void {
    if (this.state.memoryUsage.utilizationPercent > 70) {
      console.log('Triggering context optimization...');

      // Strategy 1: Summarize older messages
      if (this.state.conversationHistory.length > 10) {
        const recentMessages = this.state.conversationHistory.slice(-5);
        const olderMessages = this.state.conversationHistory.slice(0, -5);

        const summaryTokenCount = olderMessages.reduce((sum, msg) => sum + msg.tokenCount, 0);

        console.log(`Summarized ${olderMessages.length} older messages (${summaryTokenCount} tokens)`);
        console.log(`Keeping ${recentMessages.length} recent messages`);

        // In production, you would send these to a memory bank
        this.saveToMemoryBank(olderMessages);
      }
    }
  }

  private saveToMemoryBank(messages: Message[]): void {
    // Placeholder for memory bank integration
    const memoryEntry = {
      type: 'conversation_summary',
      sessionId: this.state.sessionId,
      timestamp: new Date(),
      messageCount: messages.length,
      totalTokens: messages.reduce((sum, msg) => sum + msg.tokenCount, 0)
    };

    console.log('Saving to memory bank:', JSON.stringify(memoryEntry, null, 2));
  }

  resetSession(): void {
    this.state.conversationHistory = [];
    this.state.memoryUsage.tokensUsed = 0;
    this.state.memoryUsage.tokensAvailable = this.maxContextWindow;
    this.state.memoryUsage.utilizationPercent = 0;
    this.state.lastMemoryUpdate = new Date();
    console.log(`Session ${this.state.sessionId} reset`);
  }
}

// Example usage ("debugger" is a reserved word in JavaScript/TypeScript, so use a different name)
const stateDebugger = new AgentStateDebugger('user-12345', 200000);

// Simulate conversation
const sampleConversation: Array<{ role: 'user' | 'assistant'; content: string }> = [
  { role: 'user', content: 'I need help with Python data analysis' },
  { role: 'assistant', content: 'I can help you with pandas, numpy, and matplotlib. What specific task are you working on?' },
  { role: 'user', content: 'I have a CSV file with 10 million rows and need to clean it efficiently' },
  { role: 'assistant', content: 'For 10 million rows, use pandas with chunking or Dask for parallel processing. Let me show you the optimal approach...' },
  { role: 'user', content: 'Can you also explain memory optimization techniques?' }
];

sampleConversation.forEach(msg => {
  stateDebugger.addMessage(msg.role, msg.content);
});

// Show current state
stateDebugger.logState();

// Check if optimization is needed
stateDebugger.optimizeContext();

export { AgentStateDebugger };
export type { AgentState, Message };
```

The same monitor in Python:

```python
from dataclasses import dataclass, field
from typing import List, Dict, Literal
from datetime import datetime
import json
@dataclass
class Message:
    role: Literal['user', 'assistant', 'system']
    content: str
    timestamp: datetime
    token_count: int


@dataclass
class MemoryUsage:
    tokens_used: int
    tokens_available: int
    utilization_percent: float


@dataclass
class AgentState:
    session_id: str
    user_id: str
    context_window: int
    memory_usage: MemoryUsage
    conversation_history: List[Message]
    last_memory_update: datetime


class AgentStateDebugger:
    """Production-ready agent state monitoring and debugging."""

    def __init__(self, user_id: str, max_context_window: int = 128000):
        self.max_context_window = max_context_window
        self.memory_threshold = 0.85

        self.state = AgentState(
            session_id=self._generate_session_id(),
            user_id=user_id,
            context_window=max_context_window,
            memory_usage=MemoryUsage(0, max_context_window, 0.0),
            conversation_history=[],
            last_memory_update=datetime.now()
        )

    def _generate_session_id(self) -> str:
        import uuid
        return f"sess_{uuid.uuid4().hex[:8]}"

    def _estimate_token_count(self, text: str) -> int:
        """Approximate token count (1 token ≈ 4 characters)."""
        return max(1, len(text) // 4)

    def add_message(self, role: Literal['user', 'assistant', 'system'], content: str) -> None:
        """Add message and update memory usage."""
        token_count = self._estimate_token_count(content)

        message = Message(
            role=role,
            content=content,
            timestamp=datetime.now(),
            token_count=token_count
        )

        self.state.conversation_history.append(message)
        self.state.memory_usage.tokens_used += token_count
        self.state.memory_usage.tokens_available = (
            self.max_context_window - self.state.memory_usage.tokens_used
        )
        self.state.memory_usage.utilization_percent = (
            self.state.memory_usage.tokens_used / self.max_context_window * 100
        )
        self.state.last_memory_update = datetime.now()

        # Check memory pressure
        if self.state.memory_usage.utilization_percent >= self.memory_threshold * 100:
            self._handle_memory_pressure()

    def _handle_memory_pressure(self) -> None:
        """Alert when context window is approaching capacity."""
        alert = {
            "level": "WARNING",
            "message": "Context window approaching capacity",
            "session_id": self.state.session_id,
            "utilization": f"{self.state.memory_usage.utilization_percent:.2f}%",
            "tokens_used": self.state.memory_usage.tokens_used,
            "recommended_action": "Consider summarizing history or using memory bank"
        }

        print(json.dumps(alert, indent=2, default=str))
        self.log_state()

    def log_state(self) -> None:
        """Log current agent state for debugging."""
        print("\n=== Agent State Debug ===")
        print(f"Session: {self.state.session_id}")
        print(f"User: {self.state.user_id}")
        print(f"Messages: {len(self.state.conversation_history)}")
        print(f"Memory: {self.state.memory_usage.tokens_used}/{self.max_context_window} tokens")
        print(f"Utilization: {self.state.memory_usage.utilization_percent:.2f}%")
        print(f"Last Update: {self.state.last_memory_update.isoformat()}")
        print("========================\n")

    def get_summary(self) -> Dict:
        """Return serializable state summary."""
        return {
            "session_id": self.state.session_id,
            "user_id": self.state.user_id,
            "context_window": self.state.context_window,
            "memory_usage": {
                "tokens_used": self.state.memory_usage.tokens_used,
                "tokens_available": self.state.memory_usage.tokens_available,
                "utilization_percent": self.state.memory_usage.utilization_percent
            },
            "message_count": len(self.state.conversation_history),
            "last_update": self.state.last_memory_update.isoformat()
        }

    def optimize_context(self) -> None:
        """Apply optimization strategies when memory pressure detected."""
        if self.state.memory_usage.utilization_percent > 70:
            print("Triggering context optimization...")

            if len(self.state.conversation_history) > 10:
                recent_messages = self.state.conversation_history[-5:]
                older_messages = self.state.conversation_history[:-5]

                summary_token_count = sum(msg.token_count for msg in older_messages)

                print(f"Summarized {len(older_messages)} older messages ({summary_token_count} tokens)")
                print(f"Keeping {len(recent_messages)} recent messages")

                self._save_to_memory_bank(older_messages)

    def _save_to_memory_bank(self, messages: List[Message]) -> None:
        """Simulate saving to memory bank."""
        memory_entry = {
            "type": "conversation_summary",
            "session_id": self.state.session_id,
            "timestamp": datetime.now().isoformat(),
            "message_count": len(messages),
            "total_tokens": sum(msg.token_count for msg in messages)
        }

        print("Saving to memory bank:", json.dumps(memory_entry, indent=2))

    def reset_session(self) -> None:
        """Reset session for new conversation."""
        self.state.conversation_history.clear()
        self.state.memory_usage.tokens_used = 0
        self.state.memory_usage.tokens_available = self.max_context_window
        self.state.memory_usage.utilization_percent = 0.0
        self.state.last_memory_update = datetime.now()
        print(f"Session {self.state.session_id} reset")


# Example usage
if __name__ == "__main__":
    debugger = AgentStateDebugger("user-12345", 200000)

    sample_conversation = [
        ("user", "I need help with Python data analysis"),
        ("assistant", "I can help you with pandas, numpy, and matplotlib. What specific task are you working on?"),
        ("user", "I have a CSV file with 10 million rows and need to clean it efficiently"),
        ("assistant", "For 10 million rows, use pandas with chunking or Dask for parallel processing..."),
        ("user", "Can you also explain memory optimization techniques?")
```