Source code for tooluniverse.memory_manager

#!/usr/bin/env python3
"""
Memory Manager for Multi-Agent Systems
Manages session-based memory for multi-agent workflows
"""

import json
import uuid
from datetime import datetime
from typing import Dict, Any, Optional, List
import threading


[docs] class MemoryManager: """Manages session-based memory for multi-agent systems."""
[docs] def __init__(self, max_sessions=1000, session_timeout=3600): self.sessions = {} self.max_sessions = max_sessions self.session_timeout = session_timeout # seconds self.lock = threading.Lock()
[docs] def create_session(self, user_id: str = None, session_name: str = None) -> str: """Create a new session and return its ID.""" with self.lock: session_id = f"{user_id or 'anonymous'}_{uuid.uuid4().hex[:8]}_{int(datetime.now().timestamp())}" self.sessions[session_id] = { "session_id": session_id, "user_id": user_id or "anonymous", "session_name": session_name or f"Session_{datetime.now().strftime('%Y%m%d_%H%M%S')}", "created_at": datetime.now(), "last_accessed": datetime.now(), "context": { "user_query": "", "results": {}, "history": [], "current_phase": "initializing", }, "status": "active", } self._cleanup_expired_sessions() return session_id
[docs] def get_session(self, session_id: str) -> Optional[Dict[str, Any]]: """Retrieve session data by ID, updating last-accessed timestamp.""" with self.lock: if session_id in self.sessions: self.sessions[session_id]["last_accessed"] = datetime.now() return self.sessions[session_id] return None
[docs] def update_session_context(self, session_id: str, updates: Dict[str, Any]) -> bool: """Update the context dictionary for an existing session.""" with self.lock: if session_id in self.sessions: session = self.sessions[session_id] session["context"].update(updates) session["last_accessed"] = datetime.now() return True return False
[docs] def add_agent_result( self, session_id: str, agent_name: str, result: Any, phase: str = None ) -> bool: """Record an agent execution result in the session history.""" with self.lock: if session_id in self.sessions: session = self.sessions[session_id] # Append to execution history history_entry = { "agent": agent_name, "result": result, "timestamp": datetime.now().isoformat(), "phase": phase or session["context"]["current_phase"], } session["context"]["history"].append(history_entry) # Update latest result for this agent session["context"]["results"][agent_name] = result session["last_accessed"] = datetime.now() return True return False
[docs] def get_context_for_agent(self, session_id: str, agent_name: str) -> str: """Build a JSON context string suitable for passing to an agent.""" session = self.get_session(session_id) if not session: return "{}" context = session["context"] # Build agent-facing context with recent history agent_context = { "session_id": session_id, "user_id": session["user_id"], "session_name": session["session_name"], "user_query": context["user_query"], "current_phase": context["current_phase"], "previous_results": context["results"], "execution_history": context["history"][-5:], "timestamp": datetime.now().isoformat(), } return json.dumps(agent_context, indent=2)
[docs] def set_current_phase(self, session_id: str, phase: str) -> bool: """Set the current execution phase for a session.""" return self.update_session_context(session_id, {"current_phase": phase})
[docs] def get_session_summary(self, session_id: str) -> Dict[str, Any]: """Return a summary dictionary for the given session.""" session = self.get_session(session_id) if not session: return {} return { "session_id": session_id, "user_id": session["user_id"], "session_name": session["session_name"], "created_at": session["created_at"].isoformat(), "last_accessed": session["last_accessed"].isoformat(), "current_phase": session["context"]["current_phase"], "agents_executed": list(session["context"]["results"].keys()), "history_count": len(session["context"]["history"]), "status": session["status"], }
[docs] def list_user_sessions(self, user_id: str) -> List[Dict[str, Any]]: """List all sessions for a given user, sorted by last access time.""" with self.lock: user_sessions = [] for session_id, session in self.sessions.items(): if session["user_id"] == user_id: user_sessions.append(self.get_session_summary(session_id)) return sorted(user_sessions, key=lambda x: x["last_accessed"], reverse=True)
def _cleanup_expired_sessions(self): """Remove sessions that have exceeded the timeout.""" current_time = datetime.now() expired_sessions = [] for session_id, session in self.sessions.items(): if (current_time - session["last_accessed"]).seconds > self.session_timeout: expired_sessions.append(session_id) for session_id in expired_sessions: del self.sessions[session_id]
[docs] def close_session(self, session_id: str) -> bool: """Mark a session as closed.""" with self.lock: if session_id in self.sessions: self.sessions[session_id]["status"] = "closed" return True return False
# Global memory manager instance memory_manager = MemoryManager()