Source code for tooluniverse.mcp_integration

"""
ToolUniverse MCP Integration Extensions

This module extends ToolUniverse with methods to automatically discover and load
MCP tools from remote servers, providing seamless integration between local tools
and remote MCP services.
"""

from typing import List, Dict, Any


def load_mcp_tools(self, server_urls: List[str] = None, **kwargs):
    """
    Load MCP tools from remote servers into this ToolUniverse instance.

    For every server URL a loader is created through
    ``self._load_tools_from_mcp_server`` and the per-server outcomes are
    aggregated into a single summary dictionary.

    Parameters:
    ===========
    server_urls : list of str, optional
        MCP server URLs to load tools from, for example
        ``["http://localhost:8001", "ws://localhost:9000"]``.
        A single URL given as a plain string is treated as a one-element list.
        If None, the URLs are taken from ``mcp_tool_registry.get_mcp_tool_urls()``
        when that module can be imported; otherwise no servers are used.

    **kwargs
        Configuration overrides merged on top of these defaults and passed to
        ``_load_tools_from_mcp_server`` as one ``config`` dict:
        - tool_prefix (str): Prefix for loaded tool names (default: "mcp_")
        - timeout (int): Connection timeout in seconds (default: 30)
        - auto_register (bool): Whether to auto-register discovered tools (default: True)
        - selected_tools (list): Specific tools to load from each server (default: None)
        - categories (list): Tool categories to filter by (default: None)
          NOTE(review): this function only forwards the value; confirm that the
          loader actually honours it.

    Returns:
    ========
    dict
        Always contains the keys:
        - "total_tools": sum of ``tools_loaded`` over all servers
        - "servers_connected": number of servers whose result has ``success`` true
        - "servers": mapping of URL -> per-server result dict
        - "errors": list of human-readable messages, one per failed server

    Examples:
    =========

    ```python
    tu = ToolUniverse()
    result = tu.load_mcp_tools(
        ["http://localhost:8001", "http://ml-server:8002"],
        tool_prefix="analysis_",
        timeout=60,
    )
    print(f"Loaded {result['total_tools']} tools from {result['servers_connected']} servers")

    # Use the servers known to the local MCP tool registry
    tu.load_mcp_tools()
    ```
    """
    # Defaults come first so that caller-supplied kwargs override them.
    config = {
        "tool_prefix": "mcp_",
        "timeout": 30,
        "auto_register": True,
        "selected_tools": None,
        "categories": None,
        **kwargs,
    }

    if server_urls is None:
        try:
            from .mcp_tool_registry import get_mcp_tool_urls

            server_urls = get_mcp_tool_urls()
        except ImportError:
            # The registry is optional; without it there is nothing to discover.
            server_urls = []

    # A bare string would otherwise be iterated character by character.
    if isinstance(server_urls, str):
        server_urls = [server_urls]

    if not server_urls:
        print("📭 No MCP servers specified or discovered")
        # Same keys as the normal return path so callers can index uniformly.
        return {"total_tools": 0, "servers_connected": 0, "servers": {}, "errors": []}

    print(f"🔄 Loading MCP tools from {len(server_urls)} servers...")

    results = {"total_tools": 0, "servers_connected": 0, "servers": {}, "errors": []}

    for url in server_urls:
        try:
            result = self._load_tools_from_mcp_server(url, config)
            results["servers"][url] = result
            results["total_tools"] += result.get("tools_loaded", 0)

            if result.get("success", False):
                results["servers_connected"] += 1
            else:
                # The helper reports failures in its return value instead of
                # raising, so surface them in the error summary here as well.
                error_msg = (
                    f"Failed to load from {url}: "
                    f"{result.get('error', 'Unknown error')}"
                )
                print(f"❌ {error_msg}")
                results["errors"].append(error_msg)

        except Exception as e:
            error_msg = f"Failed to load from {url}: {str(e)}"
            print(f"❌ {error_msg}")
            results["errors"].append(error_msg)

    print(
        f"✅ Loaded {results['total_tools']} tools from {results['servers_connected']} servers"
    )
    return results
def _load_tools_from_mcp_server(
    self, server_url: str, config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Register an ``MCPAutoLoaderTool`` configuration for one server and try to
    run its discovery immediately.

    Parameters:
    ===========
    server_url : str
        URL of the MCP server; also used to derive the loader name.
    config : dict
        Must contain "auto_register", "tool_prefix", "timeout" and
        "selected_tools" (the latter is only copied when truthy).

    Returns:
    ========
    dict
        - Registration failed: ``{"success": False, "error", "server_url"}``.
        - Otherwise: ``{"success": True, "tools_loaded", "loader_name",
          "server_url", "discovery_error"}``. ``discovery_error`` is None when
          discovery ran, and a message when it raised or was skipped because an
          event loop is already running in this thread.
    """
    # Phase 1: build the loader configuration and add it to the tool tables.
    try:
        sanitized = server_url.replace("://", "_")
        sanitized = sanitized.replace(":", "_").replace("/", "_")
        loader_name = f"mcp_loader_{sanitized}"

        loader_config = {
            "name": loader_name,
            "type": "MCPAutoLoaderTool",
            "server_url": server_url,
            "auto_register": config["auto_register"],
            "tool_prefix": config["tool_prefix"],
            "timeout": config["timeout"],
        }

        wanted_tools = config["selected_tools"]
        if wanted_tools:
            loader_config["selected_tools"] = wanted_tools

        self.all_tools.append(loader_config)
        self.all_tool_dict[loader_name] = loader_config

        print(f"✅ Created MCP auto-loader for {server_url}")
    except Exception as exc:
        return {"success": False, "error": str(exc), "server_url": server_url}

    # Phase 2: best-effort discovery so the caller gets a tool count. Any
    # failure here still counts as success because the loader is registered.
    try:
        from .execute_function import tool_type_mappings

        auto_loader = tool_type_mappings["MCPAutoLoaderTool"](loader_config)

        import asyncio
        import warnings

        async def _discover_then_close():
            """Run the auto-load and always close the loader's session."""
            try:
                return await auto_loader.auto_load_and_register(self)
            finally:
                await auto_loader._close_session()

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            loop_is_running = False
        else:
            loop_is_running = True

        if not loop_is_running:
            # No loop in this thread: drive the coroutine on a private loop.
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", ResourceWarning)
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                try:
                    outcome = loop.run_until_complete(_discover_then_close())
                    tools_count = outcome.get("registered_count", 0)
                    discovery_error = None
                finally:
                    loop.close()
                    asyncio.set_event_loop(None)
        else:
            # A loop is already running, so discovery is skipped here.
            tools_count = 0
            discovery_error = "Cannot process in event loop - will be processed later"

        return {
            "success": True,
            "tools_loaded": tools_count,
            "loader_name": loader_name,
            "server_url": server_url,
            "discovery_error": discovery_error,
        }
    except Exception as exc:
        return {
            "success": True,
            "tools_loaded": 0,
            "loader_name": loader_name,
            "server_url": server_url,
            "discovery_error": str(exc),
        }
def discover_mcp_tools(self, server_urls: List[str] = None, **kwargs) -> Dict[str, Any]:
    """
    Discover available tools from MCP servers without loading them.

    For each server a temporary ``MCPClientTool`` is registered via
    ``self.register_custom_tool``, queried with the ``list_tools`` operation
    through ``self.run_tool``, and then removed again with
    ``self._remove_tool`` when that method exists.

    Parameters:
    ===========
    server_urls : list of str, optional
        MCP server URLs to discover from. A single URL given as a plain string
        is treated as a one-element list. If None, the URLs are taken from
        ``mcp_tool_registry.get_mcp_tool_urls()`` when that module can be
        imported; otherwise no servers are used.

    **kwargs
        Additional options:
        - timeout (int): Connection timeout (default: 30)
        - include_schemas (bool): Accepted (default: True) but not read by
          this function.

    Returns:
    ========
    dict
        - "servers": URL -> {"tools", "count", "status"} plus "error" on failure
        - "total_tools": number of tools found on successfully queried servers
        - "errors": one message per server that failed or raised

    Examples:
    =========

    ```python
    tu = ToolUniverse()

    discovery = tu.discover_mcp_tools([
        "http://localhost:8001",
        "http://ml-server:8002"
    ])

    for server, info in discovery["servers"].items():
        print(f"\\n{server}:")
        for tool in info.get("tools", []):
            print(f"  - {tool['name']}: {tool['description']}")
    ```
    """
    if server_urls is None:
        try:
            from .mcp_tool_registry import get_mcp_tool_urls

            server_urls = get_mcp_tool_urls()
        except ImportError:
            # The registry is optional; without it there is nothing to query.
            server_urls = []

    # A bare string would otherwise be iterated character by character.
    if isinstance(server_urls, str):
        server_urls = [server_urls]

    if not server_urls:
        return {"servers": {}, "total_tools": 0, "errors": []}

    config = {"timeout": 30, "include_schemas": True, **kwargs}

    print(f"🔍 Discovering tools from {len(server_urls)} MCP servers...")

    results = {"servers": {}, "total_tools": 0, "errors": []}

    for url in server_urls:
        try:
            discovery_config = {
                "name": f"temp_discovery_{hash(url)}",
                "type": "MCPClientTool",
                "server_url": url,
                "timeout": config["timeout"],
            }

            self.register_custom_tool(
                tool_class=None, tool_type="MCPClientTool", config=discovery_config
            )

            try:
                discovery_result = self.run_tool(
                    discovery_config["name"], {"operation": "list_tools"}
                )
            finally:
                # Always drop the temporary tool, even when the query raised,
                # so failed discoveries do not leave stray registrations behind.
                if hasattr(self, "_remove_tool"):
                    self._remove_tool(discovery_config["name"])

            if discovery_result.get("success", False):
                tools = discovery_result.get("tools", [])
                results["servers"][url] = {
                    "tools": tools,
                    "count": len(tools),
                    "status": "success",
                }
                results["total_tools"] += len(tools)
                print(f"✅ {url}: {len(tools)} tools discovered")
            else:
                error_msg = discovery_result.get("error", "Unknown error")
                results["servers"][url] = {
                    "tools": [],
                    "count": 0,
                    "status": "error",
                    "error": error_msg,
                }
                # Keep the error summary in line with the exception path below.
                results["errors"].append(f"Discovery failed for {url}: {error_msg}")
                print(f"❌ {url}: {error_msg}")

        except Exception as e:
            error_msg = f"Discovery failed for {url}: {str(e)}"
            results["errors"].append(error_msg)
            results["servers"][url] = {
                "tools": [],
                "count": 0,
                "status": "error",
                "error": str(e),
            }
            print(f"❌ {error_msg}")

    print(f"🔍 Discovery complete: {results['total_tools']} total tools found")
    return results
def list_mcp_connections(self) -> Dict[str, Any]:
    """
    List the MCP-related tool configurations known to this instance.

    Configurations are collected from ``self.all_tool_dict`` (where
    ``_load_tools_from_mcp_server`` registers its auto-loaders) and from
    ``self.tool_configs``; either attribute may be absent. When the same tool
    name appears in both, the ``tool_configs`` entry is used. Entries that are
    not dictionaries are ignored.

    Returns:
    ========
    dict
        - "connections": {"auto_loaders": {...}, "direct_tools": {...}} where
          auto-loaders are configs of type "MCPAutoLoaderTool" and direct tools
          are configs of type "MCPClientTool" or "MCPProxyTool"
        - "total_mcp_tools": number of entries in both groups combined
        - "servers": sorted list of the distinct ``server_url`` values
          (an empty string stands for configs without a ``server_url``)

    Examples:
    =========

    ```python
    tu = ToolUniverse()
    tu.load_mcp_tools(["http://localhost:8001"])

    connections = tu.list_mcp_connections()
    print(f"MCP servers in use: {connections['servers']}")
    print(f"Auto-loaders: {list(connections['connections']['auto_loaders'])}")
    ```
    """
    mcp_tools = {}
    mcp_loaders = {}

    # Loaders created by this module live in all_tool_dict, so both sources
    # must be scanned; tool_configs is applied last and therefore wins on ties.
    known_configs = {}
    for attr_name in ("all_tool_dict", "tool_configs"):
        source = getattr(self, attr_name, None)
        if isinstance(source, dict):
            known_configs.update(source)

    for tool_name, tool_config in known_configs.items():
        if not isinstance(tool_config, dict):
            continue

        tool_type = tool_config.get("type", "")
        server_url = tool_config.get("server_url", "")

        if tool_type == "MCPAutoLoaderTool":
            mcp_loaders[tool_name] = {
                "server_url": server_url,
                "tool_prefix": tool_config.get("tool_prefix", "mcp_"),
                "auto_register": tool_config.get("auto_register", True),
                "config": tool_config,
            }
        elif tool_type in ("MCPClientTool", "MCPProxyTool"):
            mcp_tools[tool_name] = {
                "type": tool_type,
                "server_url": server_url,
                "config": tool_config,
            }

    # Sorted so the result is deterministic across runs.
    servers = sorted(
        {entry["server_url"] for entry in mcp_loaders.values()}
        | {entry["server_url"] for entry in mcp_tools.values()}
    )

    return {
        "connections": {"auto_loaders": mcp_loaders, "direct_tools": mcp_tools},
        "total_mcp_tools": len(mcp_tools) + len(mcp_loaders),
        "servers": servers,
    }
# Attach this module's MCP helpers to the ToolUniverse class.
def _patch_tooluniverse():
    """
    Install the MCP integration functions as ToolUniverse methods.

    Each function is only attached when ToolUniverse does not already have an
    attribute of that name. If ToolUniverse cannot be imported, a warning is
    printed and nothing is patched.
    """
    try:
        from .execute_function import ToolUniverse
    except ImportError as e:
        print(f"⚠️ Could not patch ToolUniverse: {e}")
    else:
        extensions = (
            ("load_mcp_tools", load_mcp_tools),
            ("discover_mcp_tools", discover_mcp_tools),
            ("list_mcp_connections", list_mcp_connections),
            ("_load_tools_from_mcp_server", _load_tools_from_mcp_server),
        )
        for attr_name, func in extensions:
            # Never overwrite a method the class already provides.
            if not hasattr(ToolUniverse, attr_name):
                setattr(ToolUniverse, attr_name, func)


# Run the patch as a side effect of importing this module.
_patch_tooluniverse()