# Source code for tooluniverse.smcp

"""
Scientific Model Context Protocol (SMCP) - Enhanced MCP Server with ToolUniverse Integration

SMCP is a sophisticated MCP (Model Context Protocol) server that bridges the gap between
AI agents and scientific tools. It seamlessly integrates ToolUniverse's extensive
collection of 350+ scientific tools with the MCP protocol, enabling AI systems to
access scientific databases, perform complex analyses, and execute scientific workflows.

The SMCP module provides a complete solution for exposing scientific computational
resources through the standardized MCP protocol, making it easy for AI agents to
discover, understand, and execute scientific tools in a unified manner.

Usage Patterns
--------------

Quick Start:

```python
# High-performance server with custom configuration
server = SMCP(
    name="Production Scientific API",
    tool_categories=["uniprot", "ChEMBL", "opentarget", "hpa"],
    max_workers=20,
    search_enabled=True
)
server.run_simple(
    transport="http",
    host="0.0.0.0",
    port=7000
)
```

Client Integration:
```python
# Using MCP client to discover and use tools
import json

# Discover protein analysis tools
response = await client.call_tool("find_tools", {
    "query": "protein structure analysis",
    "limit": 5
})

# Use discovered tool
result = await client.call_tool("UniProt_get_entry_by_accession", {
    "arguments": json.dumps({"accession": "P05067"})
})
```

Architecture
------------

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   MCP Client    │◄──►│      SMCP        │◄──►│  ToolUniverse   │
│  (AI Agent)     │    │     Server       │    │   (350+ Tools)  │
└─────────────────┘    └──────────────────┘    └─────────────────┘


                       ┌──────────────────┐
                       │  Scientific      │
                       │  Databases &     │
                       │  Services        │
                       └──────────────────┘

The SMCP server acts as an intelligent middleware layer that:
1. Receives MCP requests from AI agents/clients
2. Translates requests to ToolUniverse tool calls
3. Executes tools against scientific databases/services
4. Returns formatted results via MCP protocol
5. Provides intelligent tool discovery and recommendation

Integration Points
------------------

MCP Protocol Layer:
    - Standard MCP methods (tools/list, tools/call, etc.)
    - Custom scientific methods (tools/find, tools/search)
    - Transport-agnostic communication (stdio, HTTP, SSE)
    - Proper error codes and JSON-RPC 2.0 compliance

ToolUniverse Integration:
    - Dynamic tool loading and configuration
    - Schema transformation and validation
    - Execution wrapper with error handling
    - Category-based tool organization

AI Agent Interface:
    - Natural language tool discovery
    - Contextual tool recommendations
    - Structured parameter schemas
    - Comprehensive tool documentation
"""

import asyncio
import functools
import json
import sys
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional, Union, Callable, Literal

from fastmcp import FastMCP

FASTMCP_AVAILABLE = True

from .execute_function import ToolUniverse
from .logging_config import (
    get_logger,
)


class SMCP(FastMCP):
    """Scientific Model Context Protocol (SMCP) Server.

    SMCP is an enhanced MCP (Model Context Protocol) server that seamlessly
    integrates ToolUniverse's extensive collection of scientific tools with the
    FastMCP framework. It provides a unified, AI-accessible interface for
    scientific computing, data analysis, and research workflows.

    The SMCP server extends standard MCP capabilities with scientific domain
    expertise, intelligent tool discovery, and optimized configurations for
    research applications. It automatically handles the complex task of exposing
    hundreds of specialized tools through a consistent, well-documented
    interface.

    Key Features
    ============
    - **Scientific Tool Integration**: Native access to 350+ specialized tools
      covering scientific databases, literature search, clinical data, genomics,
      proteomics, chemical informatics, and AI-powered analysis capabilities.
    - **AI-Powered Tool Discovery**: Multi-tiered intelligent search using
      ToolFinderLLM (cost-optimized LLM-based semantic understanding with
      pre-filtering), Tool_RAG (embedding-based similarity search), and simple
      keyword search as a reliable fallback.
    - **Full MCP Protocol Support**: Standard methods (tools/list, tools/call,
      resources/*, prompts/*), custom scientific methods (tools/find,
      tools/search), multi-transport support (stdio, HTTP, SSE), and JSON-RPC
      2.0 compliance with proper error handling.
    - **High-Performance Architecture**: Configurable thread pools for
      concurrent tool execution, intelligent tool loading and caching, resource
      management, graceful degradation, and comprehensive error handling.
    - **Developer-Friendly**: Sensible defaults for scientific computing,
      flexible customization options, and built-in diagnostic tools.

    Custom MCP Methods
    ==================
    tools/find:
        AI-powered tool discovery using natural language queries. Supports
        semantic search, category filtering, and flexible response formats.
    tools/search:
        Alternative endpoint for tool discovery with identical functionality to
        tools/find, provided for compatibility and convenience.

    Parameters
    ==========
    name : str, optional
        Human-readable server name used in logs and identification.
        Default: "SMCP Server".
    tooluniverse_config : ToolUniverse or dict, optional
        Either a pre-configured ToolUniverse instance or a configuration dict.
        If None, creates a new ToolUniverse with default settings.
    tool_categories : list of str, optional
        Specific ToolUniverse categories to load. If None and
        auto_expose_tools=True, loads all available tools. Common combinations:
        scientific ["ChEMBL", "uniprot", "opentarget", "pubchem", "hpa"],
        literature ["EuropePMC", "semantic_scholar", "pubtator", "agents"],
        clinical ["fda_drug_label", "clinical_trials", "adverse_events"].
    exclude_tools : list of str, optional
        Specific tool names to exclude from loading; these tools will not be
        exposed via MCP even if they are in the loaded categories.
    exclude_categories : list of str, optional
        Tool categories to skip entirely during loading. Can be combined with
        tool_categories to first select and then prune categories.
    include_tools : list of str, optional
        Specific tool names to include. If provided, only these tools are
        loaded regardless of categories (overrides category-based selection).
    tools_file : str, optional
        Path to a text file with tool names to include (one per line).
        Comments (lines starting with #) and empty lines are ignored.
    tool_config_files : dict of str, optional
        Additional tool configuration files to load, as
        {"category_name": "/path/to/config.json"}; loaded in addition to the
        default tool files.
    include_tool_types : list of str, optional
        Only tools of these types are loaded (e.g. 'OpenTarget',
        'ToolFinderEmbedding', 'ToolFinderKeyword', 'ToolFinderLLM').
    exclude_tool_types : list of str, optional
        Tool types to skip during loading; useful to exclude whole families
        of tools (e.g. all ToolFinder types or all OpenTarget tools).
    space : str or list of str, optional
        Space configuration URI(s) to load, e.g. a local file
        ("./config.yaml"), HuggingFace ("hf:username/repo" or
        "hf:username/repo/file.yaml"), or an HTTP URL. Space configurations
        are loaded after tool initialization, applying LLM settings, hooks,
        and tool selections; multiple spaces load sequentially and later
        configurations may override earlier ones.
    auto_expose_tools : bool, default True
        Automatically expose loaded ToolUniverse tools as MCP tools with
        schema conversion and execution wrapping.
    search_enabled : bool, default True
        Enable AI-powered tool search via the tools/find method, with
        intelligent fallback between LLM, embedding, and keyword search.
    max_workers : int, default 5
        Maximum concurrent worker threads for tool execution. Recommended
        5-20 depending on server capacity and expected load.
    hooks_enabled : bool, default False
        Enable output-processing hooks (e.g. summarizing long outputs or
        saving results to files).
    hook_config : dict, optional
        Custom hook configuration; overrides default hook settings. Should
        contain a 'hooks' list of hook definitions.
    hook_type : str, optional
        Simple hook selection ('SummarizationHook', 'FileSaveHook', or a list
        of both). Takes precedence over hooks_enabled when specified.
    compact_mode : bool, default False
        Only expose core tools (find_tools, execute_tool, list_tools,
        grep_tools, get_tool_info) to prevent context-window overflow. All
        tools remain loaded in the background so execute_tool still works;
        discovery tools enable progressive disclosure.
    **kwargs
        Additional arguments passed to the underlying FastMCP server.

    Raises
    ======
    ImportError
        If FastMCP is not installed. Install with: pip install fastmcp

    Notes
    =====
    - SMCP automatically handles ToolUniverse tool loading and MCP conversion.
    - Tool search prefers ToolFinderLLM when available and gracefully falls
      back to simpler methods.
    - All tools support JSON argument passing for maximum flexibility.
    - Thread-pool execution keeps concurrent requests non-blocking; the server
      supports graceful shutdown and resource cleanup.
    """
    def __init__(
        self,
        name: Optional[str] = None,
        tooluniverse_config: Optional[Union[ToolUniverse, Dict[str, Any]]] = None,
        tool_categories: Optional[List[str]] = None,
        exclude_tools: Optional[List[str]] = None,
        exclude_categories: Optional[List[str]] = None,
        include_tools: Optional[List[str]] = None,
        tools_file: Optional[str] = None,
        tool_config_files: Optional[Dict[str, str]] = None,
        include_tool_types: Optional[List[str]] = None,
        exclude_tool_types: Optional[List[str]] = None,
        space: Optional[Union[str, List[str]]] = None,
        auto_expose_tools: bool = True,
        search_enabled: bool = True,
        max_workers: int = 5,
        hooks_enabled: bool = False,
        hook_config: Optional[Dict[str, Any]] = None,
        hook_type: Optional[str] = None,
        compact_mode: bool = False,
        **kwargs,
    ):
        """Initialize the SMCP server.

        Sets up the FastMCP base, the ToolUniverse backend, filtering/hook
        configuration, the thread pool, the MCP TaskManager, optional Space
        configuration loading, tool exposure, and custom MCP method
        registration — in that order (Space must load before tool setup).
        See the class docstring for full parameter documentation.

        Raises:
            ImportError: If FastMCP is not available.
        """
        if not FASTMCP_AVAILABLE:
            raise ImportError(
                "FastMCP is required for SMCP. Install it with: pip install fastmcp"
            )

        # Filter out SMCP-specific kwargs before passing to FastMCP
        fastmcp_kwargs = kwargs.copy()
        fastmcp_kwargs.pop("tooluniverse", None)  # Remove if accidentally passed

        # Initialize FastMCP with default settings optimized for scientific use
        super().__init__(name=name or "SMCP Server", **fastmcp_kwargs)

        # Get logger for this class
        self.logger = get_logger("SMCP")

        # Initialize ToolUniverse with hook support; a pre-built instance is
        # used as-is, anything else is treated as a tool_files config.
        if isinstance(tooluniverse_config, ToolUniverse):
            self.tooluniverse = tooluniverse_config
        else:
            self.tooluniverse = ToolUniverse(
                tool_files=tooluniverse_config,
                keep_default_tools=True,
                hooks_enabled=hooks_enabled,
                hook_config=hook_config,
                hook_type=hook_type,
                enable_name_shortening=True,
            )

        # Configuration (filter lists default to empty containers)
        self.tool_categories = tool_categories
        self.exclude_tools = exclude_tools or []
        self.exclude_categories = exclude_categories or []
        self.include_tools = include_tools or []
        self.tools_file = tools_file
        self.tool_config_files = tool_config_files or {}
        self.include_tool_types = include_tool_types or []
        self.exclude_tool_types = exclude_tool_types or []
        self.space = space
        self.compact_mode = compact_mode
        # In compact mode, don't auto-expose all tools
        self.auto_expose_tools = False if compact_mode else auto_expose_tools
        self.search_enabled = search_enabled
        self.max_workers = max_workers
        self.hooks_enabled = hooks_enabled
        self.hook_config = hook_config
        self.hook_type = hook_type

        # Space configuration storage (filled in by _load_space_configs)
        self.space_llm_config = None
        self.space_metadata = None

        # Thread pool for concurrent tool execution
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

        # Track exposed tools to avoid duplicates
        self._exposed_tools = set()

        # Initialize TaskManager for MCP Tasks support (lazy import avoids a
        # module-level dependency cycle)
        from .task_manager import TaskManager

        self.task_manager = TaskManager(tool_universe=self.tooluniverse)
        self._task_manager_started = False

        # Load Space configurations first if provided
        if space:
            self._load_space_configs(space)

        # Initialize SMCP-specific features (after Space is loaded)
        self._setup_smcp_tools()

        # Register custom MCP methods
        self._register_custom_mcp_methods()
[docs] def _load_space_configs(self, space: Union[str, List[str]]): """ Load Space configurations. This method loads Space configuration(s) and retrieves the LLM config and metadata from ToolUniverse. It completely reuses ToolUniverse's load_space functionality without reimplementing any logic. Args: space: Space URI or list of URIs (e.g., "./config.yaml", "hf:user/repo", or ["config1.yaml", "config2.yaml"]) """ space_list = [space] if isinstance(space, str) else space for uri in space_list: self.logger.info("📦 Loading Space: %s", uri) # Pass filtering parameters from SMCP to load_space config = self.tooluniverse.load_space( uri, exclude_tools=self.exclude_tools, exclude_categories=self.exclude_categories, include_tools=self.include_tools, tools_file=self.tools_file, # KEY FIX: Pass tools_file filter include_tool_types=self.include_tool_types, exclude_tool_types=self.exclude_tool_types, ) # Get configurations from ToolUniverse (complete reuse) self.space_metadata = self.tooluniverse.get_space_metadata() self.space_llm_config = self.tooluniverse.get_space_llm_config() self.logger.info("✅ Space loaded: %s", config.get("name", "Unknown"))
[docs] def get_llm_config(self) -> Optional[Dict[str, Any]]: """ Get the current Space LLM configuration. Returns: LLM configuration dictionary or None if not set """ return self.space_llm_config
[docs] def _register_custom_mcp_methods(self): """ Register custom MCP protocol methods for enhanced functionality. This method extends the standard MCP protocol by registering custom handlers for scientific tool discovery and search operations, as well as MCP Tasks support for long-running operations. Custom Methods Registered: ========================= - tools/find: AI-powered tool discovery using natural language queries - tools/search: Alternative endpoint for tool search (alias for tools/find) - tasks/get: Get current task status - tasks/list: List all tasks - tasks/cancel: Cancel a running task - tasks/result: Wait for task completion and get result Implementation Details: ====================== - Uses FastMCP's middleware system instead of request handler patching - Implements custom middleware methods for tools/find and tools/search - Adds MCP Tasks protocol support for long-running tool operations - Standard MCP methods (tools/list, tools/call) are handled by FastMCP - Implements proper error handling and JSON-RPC 2.0 compliance Notes: ====== This method is called automatically during SMCP initialization and should not be called manually. """ try: # Temporarily disabled for Codex compatibility # Add custom middleware for tools/find and tools/search # self.add_middleware(self._tools_find_middleware) # Register MCP Tasks handlers # Note: FastMCP should handle these via its built-in MCP Tasks support # but we provide our own handlers for compatibility self.logger.info( "✅ Custom MCP methods registration skipped for Codex compatibility" ) self.logger.info("✅ MCP Tasks support initialized") except Exception as e: self.logger.error(f"Error registering custom MCP methods: {e}")
[docs] def _get_valid_categories(self): """Get valid tool categories from ToolUniverse.""" try: if hasattr(self.tooluniverse, "get_tool_types"): return set(self.tooluniverse.get_tool_types()) temp_tu = ToolUniverse() return set(temp_tu.get_tool_types()) except Exception as e: self.logger.error(f"Error getting valid categories: {e}") return set()
[docs] def _load_tools_with_filters(self, tool_type=None): """Load tools with the common set of filter parameters. Centralizes the repeated pattern of calling load_tools with the same exclude/include/tools_file/config kwargs used across _setup_smcp_tools. """ self.tooluniverse.load_tools( tool_type=tool_type, exclude_tools=self.exclude_tools, exclude_categories=self.exclude_categories, include_tools=self.include_tools, tools_file=self.tools_file, tool_config_files=self.tool_config_files, include_tool_types=self.include_tool_types, exclude_tool_types=self.exclude_tool_types, )
[docs] def _ensure_compact_mode_categories(self): """Load tool discovery categories required for compact mode.""" if not self.compact_mode: return for category in ("tool_finder", "compact_mode"): try: self.tooluniverse.load_tools(tool_type=[category]) except Exception as e: self.logger.debug(f"Could not load category {category}: {e}")
[docs] def _load_by_categories(self): """Load tools for the requested categories with validation and fallback.""" try: valid_categories = self._get_valid_categories() invalid = [c for c in self.tool_categories if c not in valid_categories] if invalid: self.logger.warning( f"Invalid categories {invalid}. Available: {list(valid_categories)}" ) valid_only = [c for c in self.tool_categories if c in valid_categories] if valid_only: self.logger.info(f"Loading valid categories: {valid_only}") self._load_tools_with_filters(tool_type=valid_only) else: self.logger.warning( "No valid categories found, loading all tools instead" ) self._load_tools_with_filters() else: self._load_tools_with_filters(tool_type=self.tool_categories) self._ensure_compact_mode_categories() except Exception as e: self.logger.error(f"Error loading specified categories: {e}") self.logger.info("Falling back to loading all tools") self._load_tools_with_filters() self._ensure_compact_mode_categories()
[docs] async def _tools_find_middleware(self, context, call_next): """ Middleware for handling tools/find and tools/search requests. This middleware intercepts tools/find and tools/search requests and provides AI-powered tool discovery functionality. """ # Check if this is a tools/find or tools/search request if hasattr(context, "method") and context.method in [ "tools/find", "tools/search", ]: try: # Handle the custom method result = await self._handle_tools_find(context.id, context.params) return result except Exception as e: self.logger.error(f"Error in tools/find middleware: {e}") return { "jsonrpc": "2.0", "id": context.id, "error": {"code": -32603, "message": f"Internal error: {str(e)}"}, } # For all other methods, call the next middleware/handler return await call_next(context)
    async def _handle_tools_find(
        self, request_id: str, params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Handle the custom tools/find MCP method.

        Searches for tools by natural language query and returns a JSON-RPC 2.0
        response in either ``detailed`` (default) or ``mcp_standard`` format.

        Args:
            request_id: JSON-RPC request id, echoed back in the response.
            params: Request parameters; ``query`` is required, with optional
                ``categories``, ``limit``, ``use_advanced_search``,
                ``search_method``, and ``format`` keys.

        Returns:
            A JSON-RPC 2.0 response dict with either a ``result`` or an
            ``error`` member (never raises to the caller).
        """
        try:
            # Extract parameters
            query = params.get("query", "")
            categories = params.get("categories")
            limit = params.get("limit", 10)
            use_advanced_search = params.get("use_advanced_search", True)
            search_method = params.get(
                "search_method", "auto"
            )  # 'auto', 'llm', 'embedding', 'keyword'
            format_type = params.get(
                "format", "detailed"
            )  # 'detailed' or 'mcp_standard'

            # A query is mandatory — reject with JSON-RPC "invalid params".
            if not query:
                return {
                    "jsonrpc": "2.0",
                    "id": request_id,
                    "error": {
                        "code": -32602,
                        "message": "Invalid params: 'query' is required",
                    },
                }

            # Perform the search using existing search functionality
            search_result = await self._perform_tool_search(
                query=query,
                categories=categories,
                limit=limit,
                use_advanced_search=use_advanced_search,
                search_method=search_method,
            )

            # Parse the search result (a JSON string)
            search_data = json.loads(search_result)

            # Handle different response formats produced by the search backends
            if isinstance(search_data, list):
                # If search_data is a list, treat it as tools directly
                tools_list = search_data
                search_metadata = {
                    "search_query": query,
                    "search_method": "unknown",
                    "total_matches": len(tools_list),
                    "categories_filtered": categories,
                }
            elif isinstance(search_data, dict):
                # If search_data is a dict, extract tools and metadata
                tools_list = search_data.get("tools", [])
                search_metadata = {
                    "search_query": query,
                    "search_method": search_data.get("search_method", "unknown"),
                    "total_matches": search_data.get("total_matches", len(tools_list)),
                    "categories_filtered": categories,
                }
            else:
                # Fallback for unexpected format
                tools_list = []
                search_metadata = {
                    "search_query": query,
                    "search_method": "unknown",
                    "total_matches": 0,
                    "categories_filtered": categories,
                }

            # Format response based on requested format
            if format_type == "mcp_standard":
                # Format as standard MCP tools/list style response
                mcp_tools_list = []
                for tool in tools_list:
                    mcp_tool = {
                        "name": tool.get("name"),
                        "description": tool.get("description", ""),
                        "inputSchema": {
                            "type": "object",
                            "properties": tool.get("parameters", {}),
                            "required": tool.get("required", []),
                        },
                    }
                    mcp_tools_list.append(mcp_tool)
                result = {
                    "tools": mcp_tools_list,
                    "_meta": search_metadata,
                }
            else:
                # Return detailed format (default)
                result = search_data

            return {"jsonrpc": "2.0", "id": request_id, "result": result}

        except json.JSONDecodeError as e:
            # Backend returned something that was not valid JSON
            return {
                "jsonrpc": "2.0",
                "id": request_id,
                "error": {
                    "code": -32603,
                    "message": f"Search result parsing error: {str(e)}",
                },
            }
        except Exception as e:
            return {
                "jsonrpc": "2.0",
                "id": request_id,
                "error": {
                    "code": -32603,
                    "message": f"Internal error in tools/find: {str(e)}",
                },
            }
# -- MCP Tasks Protocol Handlers --
[docs] async def _ensure_task_manager(self) -> None: """Start the task manager if it has not been started yet.""" if not self._task_manager_started: await self.task_manager.start() self._task_manager_started = True
[docs] async def handle_tasks_get( self, task_id: str, auth_context: Optional[str] = None ) -> Dict[str, Any]: """Get current task status.""" await self._ensure_task_manager() return await self.task_manager.get_status(task_id, auth_context)
[docs] async def handle_tasks_list( self, auth_context: Optional[str] = None, cursor: Optional[str] = None ) -> Dict[str, Any]: """List all tasks.""" await self._ensure_task_manager() return await self.task_manager.list_tasks(auth_context, cursor)
[docs] async def handle_tasks_cancel( self, task_id: str, auth_context: Optional[str] = None ) -> Dict[str, Any]: """Cancel a running task.""" await self._ensure_task_manager() return await self.task_manager.cancel_task(task_id, auth_context)
[docs] async def handle_tasks_result( self, task_id: str, auth_context: Optional[str] = None, timeout: Optional[float] = None, ) -> Dict[str, Any]: """Wait for task completion and return its result.""" await self._ensure_task_manager() result = await self.task_manager.get_result(task_id, auth_context, timeout) return { "content": [ { "type": "text", "text": json.dumps(result, ensure_ascii=False), } ], "_meta": { "io.modelcontextprotocol/related-task": { "taskId": task_id, }, }, }
[docs] def _select_search_tool(self, search_method: str, use_advanced_search: bool) -> str: """ Select the appropriate search tool based on method and availability. Returns: str: Tool name to use for search """ # Get available tools all_tools = self.tooluniverse.return_all_loaded_tools() available_tool_names = [tool.get("name", "") for tool in all_tools] # Handle specific method requests if search_method == "keyword": return "Tool_Finder_Keyword" elif search_method == "llm" and "Tool_Finder_LLM" in available_tool_names: return "Tool_Finder_LLM" elif search_method == "embedding" and "Tool_Finder" in available_tool_names: return "Tool_Finder" elif search_method == "auto": # Auto-selection priority: Keyword > RAG > LLM if use_advanced_search: if "Tool_Finder_Keyword" in available_tool_names: return "Tool_Finder_Keyword" if "Tool_Finder" in available_tool_names: return "Tool_Finder" elif "Tool_Finder_LLM" in available_tool_names: return "Tool_Finder_LLM" else: # Invalid method or method not available, fallback to keyword return "Tool_Finder_Keyword"
[docs] def _setup_smcp_tools(self): """Initialize ToolUniverse tools, expose them as MCP tools, and set up search. Handles: pre-loaded tool detection, category validation and loading with fallback, compact mode discovery categories, tool exposure to MCP, search initialization, and utility tool registration. """ # Determine if ToolUniverse already has tools loaded preloaded_tools = getattr(self.tooluniverse, "all_tools", []) preloaded_count = ( len(preloaded_tools) if isinstance(preloaded_tools, list) else 0 ) space_loaded = ( self.space and hasattr(self.tooluniverse, "_current_space_config") and preloaded_count > 0 ) if preloaded_count > 0: self.logger.info( f"ToolUniverse already pre-configured with {preloaded_count} tool(s); skipping automatic loading." ) self._ensure_compact_mode_categories() if space_loaded: self.logger.info( f"Space configuration loaded {preloaded_count} tool(s), skipping additional loading" ) self._ensure_compact_mode_categories() elif preloaded_count == 0 and self.tool_categories: self._load_by_categories() elif (self.auto_expose_tools or self.compact_mode) and not space_loaded: # Load all tools by default (unless Space already handled it) self._load_tools_with_filters() self._ensure_compact_mode_categories() if self.compact_mode: self.logger.info( f"Compact mode: Loaded {len(self.tooluniverse.all_tools)} tools in background" ) # Auto-expose ToolUniverse tools as MCP tools # In compact mode, _expose_tooluniverse_tools will call _expose_core_discovery_tools if self.auto_expose_tools or self.compact_mode: self._expose_tooluniverse_tools() # Add search functionality if enabled if self.search_enabled: self._add_search_tools() # Add utility tools self._add_utility_tools()
    def _expose_tooluniverse_tools(self):
        """Convert and register loaded ToolUniverse tools as MCP-compatible tools.

        Skips meta-tools (MCPAutoLoaderTool, MCPClientTool) and tracks
        already-exposed tools to prevent duplicates. Individual tool failures
        are logged but do not halt the process. In compact mode, delegates to
        ``_expose_core_discovery_tools`` instead of exposing everything.
        """
        if not hasattr(self.tooluniverse, "all_tools"):
            self.logger.warning("No all_tools attribute in tooluniverse")
            return

        # Skip automatic tool exposure in compact mode
        if self.compact_mode:
            self.logger.info(
                "Compact mode: Skipping automatic tool exposure. "
                "Only core tools will be exposed."
            )
            # In compact mode, explicitly expose only core discovery tools
            self._expose_core_discovery_tools()
            return

        self.logger.info(
            f"Exposing {len(self.tooluniverse.all_tools)} tools from ToolUniverse"
        )

        # Define tool types that should not be exposed as MCP tools.
        # These are internal/meta tools that are used for loading other tools.
        skip_tool_types = {"MCPAutoLoaderTool", "MCPClientTool"}

        for i, tool_config in enumerate(self.tooluniverse.all_tools):
            try:
                # Defensive check: entries are expected to be dicts
                if not isinstance(tool_config, dict):
                    self.logger.warning(
                        f"tool_config at index {i} is not a dict, it's {type(tool_config)}: {tool_config}"
                    )
                    continue

                tool_name = tool_config.get("name")
                tool_type = tool_config.get("type")

                # Skip internal/meta tools that are used for loading other tools
                if tool_type in skip_tool_types:
                    self.logger.debug(
                        f"Skipping exposure of meta tool: {tool_name} (type: {tool_type})"
                    )
                    continue

                if tool_name and tool_name not in self._exposed_tools:
                    # tool_name is already shortened (primary identifier)
                    self._create_mcp_tool_from_tooluniverse(tool_config, tool_name)
                    self._exposed_tools.add(tool_name)
            except Exception as e:
                # One bad tool must not prevent exposing the rest
                self.logger.error(f"Error processing tool at index {i}: {e}")
                self.logger.debug(f"Tool config: {tool_config}")
                continue

        exposed_count = len(self._exposed_tools)
        self.logger.info(f"Successfully exposed {exposed_count} tools to MCP interface")
[docs] def _expose_core_discovery_tools(self): """ Expose only core tool discovery tools in compact mode. """ core_tool_names = [ "list_tools", "grep_tools", "get_tool_info", "execute_tool", ] exposed_count = 0 for tool_config in self.tooluniverse.all_tools: if not isinstance(tool_config, dict): continue tool_name = tool_config.get("name") if tool_name in core_tool_names: try: if tool_name not in self._exposed_tools: self._create_mcp_tool_from_tooluniverse(tool_config) self._exposed_tools.add(tool_name) exposed_count += 1 self.logger.debug(f"Exposed core tool: {tool_name}") except Exception as e: self.logger.warning(f"Failed to expose core tool {tool_name}: {e}") self.logger.info(f"Compact mode: Exposed {exposed_count} core discovery tools")
    def _add_search_tools(self):
        """Register the ``find_tools`` MCP tool for AI-powered tool discovery.

        Initializes the tool finder (ToolFinderLLM > Tool_RAG > keyword
        fallback) and registers a ``find_tools`` MCP tool that delegates to
        ``_perform_tool_search``. Registration is skipped entirely when the
        ``tool_finder`` category has been excluded.
        """
        # Initialize tool finder (prefer LLM-based if available, fallback to embedding-based)
        self._init_tool_finder()

        if "tool_finder" in (self.exclude_categories or []):
            self.logger.info(
                "🚫 Skipping 'find_tools' registration (tool_finder excluded)"
            )
            return

        # Imported here so the mcp dependency is only needed when search is on
        from mcp.types import ToolAnnotations

        @self.tool(
            annotations=ToolAnnotations(
                readOnlyHint=True,  # Search tool is read-only
                destructiveHint=False,
            )
        )
        async def find_tools(
            query: str,
            categories: Optional[List[str]] = None,
            limit: int = 10,
            use_advanced_search: bool = True,
            search_method: str = "auto",
        ) -> str:
            """
            Find and search available ToolUniverse tools using AI-powered search.

            This tool provides the same functionality as the tools/find MCP method.

            Args:
                query: Search query describing the desired functionality
                categories: Optional list of categories to filter by
                limit: Maximum number of results to return (default: 10)
                use_advanced_search: Use AI-powered search if available (default: True)
                search_method: Specific search method - 'auto', 'llm', 'embedding', 'keyword' (default: 'auto')

            Returns:
                JSON string containing matching tools with detailed information
            """
            return await self._perform_tool_search(
                query, categories, limit, use_advanced_search, search_method
            )
# # Keep the original search_tools as an alias for backward compatibility # @self.tool() # async def search_tools( # query: str, # categories: Optional[List[str]] = None, # limit: int = 10, # use_advanced_search: bool = True, # search_method: str = 'auto' # ) -> str: # """ # Search available ToolUniverse tools (alias for find_tools). # Args: # query: Search query string describing the desired functionality # categories: Optional list of categories to filter by # limit: Maximum number of results to return # use_advanced_search: Whether to use AI-powered tool finder # search_method: Specific search method - 'auto', 'llm', 'embedding', 'keyword' (default: 'auto') # Returns: # JSON string containing matching tools information # """ # return await self._perform_tool_search(query, categories, limit, use_advanced_search, search_method)
[docs] def _init_tool_finder(self): """Initialize the best available tool finder (LLM > RAG > keyword). Sets ``self.tool_finder_available`` and ``self.tool_finder_type``. Attempts to load the ``tool_finder`` category if no search tools are found among the already-loaded tools. """ self.tool_finder_available = False self.tool_finder_type = None # Check if ToolFinderLLM is available in loaded tools try: all_tools = self.tooluniverse.return_all_loaded_tools() available_tool_names = [tool.get("name", "") for tool in all_tools] # Try ToolFinderLLM first (more advanced) if "Tool_Finder_LLM" in available_tool_names: self.tool_finder_available = True self.tool_finder_type = "Tool_Finder_LLM" self.logger.info("✅ Tool_Finder_LLM available for advanced search") return # Fallback to Tool_RAG (embedding-based) if "Tool_RAG" in available_tool_names: self.tool_finder_available = True self.tool_finder_type = "Tool_RAG" self.logger.info( "✅ Tool_RAG (embedding-based) available for advanced search" ) return # Check if ToolFinderKeyword is available for simple search if "Tool_Finder_Keyword" in available_tool_names: self.logger.info("✅ ToolFinderKeyword available for simple search") self.logger.warning("⚠️ No advanced tool finders available in loaded tools") self.logger.debug( f"Available tools: {available_tool_names[:5]}..." 
) # Show first 5 tools except Exception as e: self.logger.warning(f"⚠️ Failed to check for tool finders: {e}") # Try to load tool finder tools if not already loaded try: # Respect exclude_categories - don't load if explicitly excluded if "tool_finder" in (self.exclude_categories or []): self.logger.debug("tool_finder category is excluded, skipping load") elif not self.tool_finder_available: # Only load if search tools aren't already available self.logger.info( "Attempting to load tool_finder category for search functionality" ) try: self.tooluniverse.load_tools(tool_type=["tool_finder"]) except Exception as e: self.logger.debug(f"Could not load tool_finder category: {e}") # Re-check availability after potential loading all_tools = self.tooluniverse.return_all_loaded_tools() available_tool_names = [tool.get("name", "") for tool in all_tools] if "Tool_Finder_LLM" in available_tool_names: self.tool_finder_available = True self.tool_finder_type = "Tool_Finder_LLM" self.logger.info( "✅ Successfully loaded Tool_Finder_LLM for advanced search" ) elif "Tool_RAG" in available_tool_names: self.tool_finder_available = True self.tool_finder_type = "Tool_RAG" self.logger.info("✅ Successfully loaded Tool_RAG for advanced search") else: self.logger.warning("⚠️ Failed to load any advanced tool finder tools") # Check if ToolFinderKeyword is available for simple search if "Tool_Finder_Keyword" in available_tool_names: self.logger.info("✅ Tool_Finder_Keyword available for simple search") else: self.logger.warning( "⚠️ ToolFinderKeyword not available, using fallback search" ) except Exception as e: self.logger.warning(f"⚠️ Failed to load tool finder tools: {e}") self.logger.info( "📝 Advanced search will not be available, using simple keyword search only" )
[docs] def _add_utility_tools(self): """Register utility tools (currently a no-op; execute_tool is a native ToolUniverse tool)."""
# Note: execute_tool is now a ToolUniverse native tool # It is exposed via _expose_core_discovery_tools() in compact mode # or via _expose_tooluniverse_tools() in normal mode
[docs] def add_custom_tool( self, name: str, function: Callable, description: Optional[str] = None, **kwargs ): """Add a custom Python function as an MCP tool. Args: name: Unique tool name for the MCP interface. function: Sync or async callable to register. description: If provided, overrides the function's docstring. **kwargs: Additional FastMCP tool configuration options. Returns: The decorated function registered with FastMCP. """ if description: function.__doc__ = description # Use FastMCP's tool decorator decorated_function = self.tool(name=name, **kwargs)(function) return decorated_function
[docs] def _get_tool_annotations(self, tool_config: Dict[str, Any]) -> Dict[str, bool]: """ Get MCP tool annotations from tool config. Annotations should already be computed and stored in tool_config['mcp_annotations'] during tool registration. This method simply retrieves them, with a fallback to compute them if they're missing (for backward compatibility). Parameters ---------- tool_config : dict Tool configuration dictionary Returns ------- dict Dictionary with readOnlyHint and destructiveHint boolean values """ # Check if annotations are already in tool_config (preferred) if "mcp_annotations" in tool_config: return tool_config["mcp_annotations"] # Fallback: compute annotations if not present (backward compatibility) from .tool_defaults import get_annotations_for_tool return get_annotations_for_tool(tool_config=tool_config)
[docs] async def close(self): """Gracefully shut down the SMCP server, stopping the task manager and thread pool.""" try: # Stop TaskManager if self._task_manager_started: await self.task_manager.stop() self.logger.info("TaskManager stopped") except Exception as e: self.logger.error(f"Error stopping TaskManager: {e}") try: # Shutdown thread pool self.executor.shutdown(wait=True) except Exception: pass
[docs] def _print_tooluniverse_banner(self): """Print ToolUniverse branding banner after FastMCP banner with dynamic information.""" # Get transport info if available transport_display = getattr(self, "_transport_type", "Unknown") server_url = getattr(self, "_server_url", "N/A") tools_count = len(self._exposed_tools) # Map transport types to display names transport_map = { "stdio": "STDIO", "streamable-http": "Streamable-HTTP", "http": "HTTP", "sse": "SSE", } transport_name = transport_map.get(transport_display, transport_display) # Format lines with proper alignment (matching FastMCP style) # Each line should be exactly 75 characters (emoji takes 2 display widths but counts as 1 in len()) transport_line = f" 📦 Transport: {transport_name}" server_line = f" 🔗 Server URL: {server_url}" tools_line = f" 🧰 Loaded Tools: {tools_count}" # Pad to exactly 75 characters (emoji counts as 1 in len() but displays as 2) transport_line = transport_line + " " * (75 - len(transport_line)) server_line = server_line + " " * (75 - len(server_line)) tools_line = tools_line + " " * (75 - len(tools_line)) banner = f""" ╭────────────────────────────────────────────────────────────────────────────╮ │ │ │ 🧬 ToolUniverse SMCP Server 🧬 │ │ │ │ Bridging AI Agents with Scientific Computing Tools │ │ │ {transport_line} {server_line} {tools_line} │ │ │ 🌐 Website: https://aiscientist.tools/ │ │ 💻 GitHub: https://github.com/mims-harvard/ToolUniverse │ │ │ ╰────────────────────────────────────────────────────────────────────────────╯ """ # In stdio mode, ensure the banner goes to stderr to avoid polluting stdout # which must exclusively carry JSON-RPC messages. import sys as _sys if getattr(self, "_transport_type", None) == "stdio": print(banner, file=_sys.stderr) else: print(banner)
[docs] def run(self, *args, **kwargs): """ Override run method to display ToolUniverse banner after FastMCP banner. This method intercepts the parent's run() call to inject our custom banner immediately after FastMCP displays its startup banner. """ # Save transport information for banner display transport = kwargs.get("transport", args[0] if args else "unknown") host = kwargs.get("host", "0.0.0.0") port = kwargs.get("port", 7000) self._transport_type = transport # Build server URL based on transport if transport == "streamable-http" or transport == "http": self._server_url = f"http://{host}:{port}/mcp" elif transport == "sse": self._server_url = f"http://{host}:{port}" else: self._server_url = "N/A (stdio mode)" # Use threading to print our banner shortly after FastMCP's banner import threading import time def delayed_banner(): """Print ToolUniverse banner with a small delay to appear after FastMCP banner.""" time.sleep(1.0) # Delay to ensure FastMCP banner displays first self._print_tooluniverse_banner() # Start banner thread only on first run if not hasattr(self, "_tooluniverse_banner_shown"): self._tooluniverse_banner_shown = True banner_thread = threading.Thread(target=delayed_banner, daemon=True) banner_thread.start() # Call parent's run method (blocking call) return super().run(*args, **kwargs)
[docs] def run_simple( self, transport: Literal["stdio", "http", "sse"] = "http", host: str = "0.0.0.0", port: int = 7000, **kwargs, ): """Start the SMCP server with the given transport. Args: transport: Communication protocol - "stdio", "http", or "sse". host: Bind address for HTTP/SSE transports. port: Port for HTTP/SSE transports. **kwargs: Additional arguments passed to FastMCP's run(). """ self.logger.info(f"🚀 Starting SMCP server '{self.name}'...") self.logger.info( f"📊 Loaded {len(self._exposed_tools)} tools from ToolUniverse" ) self.logger.info(f"🔍 Search enabled: {self.search_enabled}") # Log hook configuration if self.hooks_enabled or self.hook_type: if self.hook_type: self.logger.info(f"🔗 Hooks enabled: {self.hook_type}") elif self.hook_config: hook_count = len(self.hook_config.get("hooks", [])) self.logger.info(f"🔗 Hooks enabled: {hook_count} custom hooks") else: self.logger.info("🔗 Hooks enabled: default configuration") else: self.logger.info("🔗 Hooks disabled") # Configure logger for stdio mode to avoid stdout pollution if transport == "stdio": import logging import sys # Redirect all logger output to stderr for stdio mode for handler in self.logger.handlers[:]: self.logger.removeHandler(handler) stderr_handler = logging.StreamHandler(sys.stderr) stderr_handler.setLevel(logging.INFO) formatter = logging.Formatter("%(message)s") stderr_handler.setFormatter(formatter) self.logger.addHandler(stderr_handler) self.logger.setLevel(logging.INFO) # Also redirect root logger to stderr root_logger = logging.getLogger() for handler in root_logger.handlers[:]: root_logger.removeHandler(handler) root_logger.addHandler(stderr_handler) root_logger.setLevel(logging.INFO) try: if transport == "stdio": self.run(transport="stdio", **kwargs) elif transport == "http": self.run(transport="streamable-http", host=host, port=port, **kwargs) elif transport == "sse": self.run(transport="sse", host=host, port=port, **kwargs) else: raise ValueError(f"Unsupported transport: 
{transport}") except KeyboardInterrupt: self.logger.info("\n🛑 Server stopped by user") except Exception as e: self.logger.error(f"❌ Server error: {e}") finally: # Cleanup asyncio.run(self.close())
# ------------------------------------------------------------------ # JSON Schema -> Python type helpers for MCP tool construction # ------------------------------------------------------------------ # Simple JSON Schema type -> Python type mapping (with lenient str coercion) _SIMPLE_TYPE_MAP = { "string": str, "integer": Union[int, str], "number": Union[float, str], "boolean": Union[bool, str], "array": list, "object": dict, }
[docs] @staticmethod def _resolve_oneof_type(param_info: Dict[str, Any]) -> tuple: """Convert a JSON Schema oneOf spec to (python_type, field_kwargs_update). Returns: (python_type, extra_field_kwargs) where extra_field_kwargs may contain json_schema_extra with the oneOf schema. """ one_of_types: List = [] one_of_schemas: List = [] for item in param_info["oneOf"]: item_type = item.get("type") if item_type == "string": one_of_types.append(str) one_of_schemas.append({"type": "string"}) elif item_type == "array": items = item.get("items", {}) if items.get("type") == "string": one_of_types.append(list[str]) one_of_schemas.append( {"type": "array", "items": {"type": "string"}} ) else: one_of_types.append(list) one_of_schemas.append({"type": "array", "items": items}) elif item_type == "integer": one_of_types.append(int) one_of_schemas.append({"type": "integer"}) elif item_type == "number": one_of_types.append(float) one_of_schemas.append({"type": "number"}) elif item_type == "boolean": one_of_types.append(bool) one_of_schemas.append({"type": "boolean"}) elif item_type == "object": one_of_types.append(dict) one_of_schemas.append(item) if len(one_of_types) == 0: python_type = str elif len(one_of_types) == 1: python_type = one_of_types[0] else: python_type = Union[tuple(one_of_types)] return python_type, {"json_schema_extra": {"oneOf": one_of_schemas}}
[docs] @classmethod def _resolve_param_type(cls, param_info: Dict[str, Any]) -> tuple: """Map a single JSON Schema parameter to (python_type, extra_field_kwargs). Handles oneOf, simple types, array items cleanup, and nested object cleanup. """ extra: Dict[str, Any] = {} # oneOf takes priority if "oneOf" in param_info: return cls._resolve_oneof_type(param_info) param_type = param_info.get("type", "string") # Handle nullable types like ["string", "null"] if isinstance(param_type, list): non_null = [t for t in param_type if t != "null"] is_nullable = "null" in param_type base_type = non_null[0] if non_null else "string" base_python_type = cls._SIMPLE_TYPE_MAP.get(base_type, str) python_type = ( Optional[base_python_type] if is_nullable else base_python_type ) param_type = base_type else: python_type = cls._SIMPLE_TYPE_MAP.get(param_type, str) if param_type == "array": items_info = param_info.get("items", {}) cleaned_items = ( {k: v for k, v in items_info.items() if k != "required"} if items_info else {"type": "string"} ) extra["json_schema_extra"] = {"type": "array", "items": cleaned_items} elif param_type == "object": object_props = param_info.get("properties", {}) if object_props: cleaned_props = {} nested_required = [] for prop_name, prop_val in object_props.items(): cleaned = prop_val.copy() if "required" in cleaned: req_value = cleaned.pop("required") if req_value in ["True", "true", True]: nested_required.append(prop_name) cleaned_props[prop_name] = cleaned schema = {"type": "object", "properties": cleaned_props} if nested_required: schema["required"] = nested_required extra["json_schema_extra"] = schema elif param_type not in cls._SIMPLE_TYPE_MAP: extra["json_schema_extra"] = {"type": param_type} return python_type, extra
    def _create_mcp_tool_from_tooluniverse(
        self, tool_config: Dict[str, Any], mcp_name: Optional[str] = None
    ):
        """Create an MCP tool from a ToolUniverse tool configuration.

        This method creates a function with proper parameter signatures that
        match the ToolUniverse tool schema, enabling FastMCP's automatic
        parameter validation. Errors are logged and swallowed so one bad
        config cannot abort registration of the remaining tools.

        Args:
            tool_config: ToolUniverse tool configuration dictionary
            mcp_name: Optional shortened name for MCP exposure (if None, uses original name)
        """
        try:
            # Debug: Ensure tool_config is a dictionary
            if not isinstance(tool_config, dict):
                raise ValueError(
                    f"tool_config must be a dictionary, got {type(tool_config)}: {tool_config}"
                )

            tool_name = tool_config["name"]
            # Use shortened MCP name if provided, otherwise use original.
            # The MCP-visible name may differ from the name used for execution.
            exposed_name = mcp_name if mcp_name is not None else tool_name
            description = tool_config.get(
                "description", f"ToolUniverse tool: {tool_name}"
            )
            parameters = tool_config.get("parameter", {})

            # Extract parameter information from the schema.
            # Handle case where properties might be None (like in Finish tool).
            properties = parameters.get("properties")
            if properties is None:
                properties = {}
            required_params = parameters.get("required", [])

            # Handle non-standard schema format where 'required' is set on
            # individual properties instead of at the object level (common in
            # ToolUniverse schemas).
            if not required_params and properties:
                required_params = [
                    param_name
                    for param_name, param_info in properties.items()
                    if param_info.get("required", False)
                ]

            # Build function signature dynamically with Pydantic Field support
            import inspect
            from typing import Annotated

            from pydantic import Field

            # Create parameter signature for the function
            func_params = []
            param_annotations = {}

            # Process parameters in two phases: required first, then optional.
            # This ensures Python function signature validity (no default args
            # before non-default ones).
            for is_required_phase in [True, False]:
                for param_name, param_info in properties.items():
                    param_description = param_info.get(
                        "description", f"{param_name} parameter"
                    )
                    is_required = param_name in required_params

                    # Skip if not in current phase
                    if is_required != is_required_phase:
                        continue

                    # Resolve Python type and optional json_schema_extra
                    python_type, extra_kwargs = self._resolve_param_type(param_info)
                    field_kwargs = {"description": param_description, **extra_kwargs}
                    pydantic_field = Field(**field_kwargs)

                    if is_required:
                        # Required parameter with description and schema info
                        annotated_type = Annotated[python_type, pydantic_field]
                        param_annotations[param_name] = annotated_type
                        func_params.append(
                            inspect.Parameter(
                                param_name,
                                inspect.Parameter.POSITIONAL_OR_KEYWORD,
                                annotation=annotated_type,
                            )
                        )
                    else:
                        # Optional parameter with description, schema info and
                        # a default of None.
                        annotated_type = Annotated[
                            Union[python_type, type(None)], pydantic_field
                        ]
                        param_annotations[param_name] = annotated_type
                        func_params.append(
                            inspect.Parameter(
                                param_name,
                                inspect.Parameter.POSITIONAL_OR_KEYWORD,
                                default=None,
                                annotation=annotated_type,
                            )
                        )

            # Add _tooluniverse_stream as an optional parameter for streaming
            # support. This parameter is NOT exposed in the MCP schema (it is
            # accepted via kwargs but kept out of param_annotations). Users can
            # pass it to enable streaming without it appearing in the schema.
            async def dynamic_tool_function(**kwargs) -> str:
                """Execute ToolUniverse tool with provided arguments."""
                import json

                stream_callback = None
                try:
                    # Remove ctx if present (legacy support)
                    ctx = kwargs.pop("ctx", None) if "ctx" in kwargs else None
                    # Extract streaming flag (users can optionally pass this)
                    stream_flag = bool(kwargs.pop("_tooluniverse_stream", False))
                    # Extract task metadata if present (for MCP Tasks)
                    task_request = (
                        kwargs.pop("_task", None) if "_task" in kwargs else None
                    )

                    # Filter out None values for optional parameters.
                    # Note: _tooluniverse_stream was extracted and popped above
                    # so it won't be in args_dict, which is what we want.
                    args_dict = {k: v for k, v in kwargs.items() if v is not None}

                    # Check if tool supports tasks
                    execution_config = tool_config.get("execution", {})
                    task_support = execution_config.get("taskSupport", "forbidden")

                    # Handle task request
                    if task_request and task_support != "forbidden":
                        # Create task instead of executing directly.
                        # The task manager is started lazily on first use.
                        if not self._task_manager_started:
                            await self.task_manager.start()
                            self._task_manager_started = True

                        ttl = task_request.get("ttl", 3600000)  # Default 1 hour
                        task_id = await self.task_manager.create_task(
                            tool_name=tool_name,
                            arguments=args_dict,
                            ttl=ttl,
                        )

                        # Return task creation response
                        return json.dumps(
                            {
                                "_meta": {
                                    "task": {
                                        "taskId": task_id,
                                        "status": "working",
                                        "statusMessage": f"Task {task_id} submitted",
                                        "pollInterval": 5000,
                                    }
                                }
                            },
                            ensure_ascii=False,
                        )
                    elif task_request and task_support == "forbidden":
                        return json.dumps(
                            {
                                "error": f"Tool {tool_name} does not support task execution"
                            },
                            ensure_ascii=False,
                        )

                    # Validate required parameters (check against args_dict,
                    # i.e. after None filtering).
                    missing_required = [
                        param for param in required_params if param not in args_dict
                    ]
                    if missing_required:
                        return json.dumps(
                            {
                                "error": f"Missing required parameters: {missing_required}",
                                "required": required_params,
                                "provided": list(args_dict.keys()),
                            },
                            ensure_ascii=False,
                        )

                    function_call = {"name": tool_name, "arguments": args_dict}

                    # Grab the running loop; fall back for non-running contexts.
                    try:
                        loop = asyncio.get_running_loop()
                    except RuntimeError:
                        loop = asyncio.get_event_loop()

                    if stream_flag and ctx is not None:

                        def _stream_callback(chunk: str) -> None:
                            # Forward each non-empty chunk to the MCP client via
                            # ctx.info(), scheduled thread-safely on the loop
                            # (the tool runs in a worker thread).
                            if not chunk:
                                return
                            try:
                                future = asyncio.run_coroutine_threadsafe(
                                    ctx.info(chunk), loop
                                )

                                def _log_future_result(fut) -> None:
                                    exc = fut.exception()
                                    if exc:
                                        self.logger.debug(
                                            f"Streaming callback error for {tool_name}: {exc}"
                                        )

                                future.add_done_callback(_log_future_result)
                            except Exception as cb_error:  # noqa: BLE001
                                self.logger.debug(
                                    f"Failed to dispatch stream chunk for {tool_name}: {cb_error}"
                                )

                        # Assign the function to stream_callback
                        stream_callback = _stream_callback

                    # Note: _tooluniverse_stream was extracted from kwargs above
                    # and is not passed to the tool. The stream_callback is
                    # sufficient to enable streaming for downstream tools.

                    # In stdio mode, capture stdout to prevent pollution of the
                    # JSON-RPC stream.
                    is_stdio_mode = getattr(self, "_transport_type", None) == "stdio"

                    if is_stdio_mode:
                        # Wrap tool execution to capture stdout and redirect it
                        # to stderr. NOTE(review): this swaps sys.stdout
                        # process-wide for the duration of the call — with
                        # concurrent tools this presumably interleaves; confirm
                        # the executor serializes stdio-mode execution.
                        def _run_with_stdout_capture():
                            import io

                            old_stdout = sys.stdout
                            try:
                                # Capture stdout during tool execution
                                stdout_capture = io.StringIO()
                                sys.stdout = stdout_capture

                                # Execute the tool
                                result = self.tooluniverse.run_one_function(
                                    function_call,
                                    stream_callback=stream_callback,
                                )

                                # Get captured output and redirect to stderr
                                captured_output = stdout_capture.getvalue()
                                if captured_output:
                                    self.logger.debug(
                                        f"[{tool_name}] Captured stdout: {captured_output}"
                                    )
                                    # Write to stderr to avoid polluting stdout
                                    print(captured_output, file=sys.stderr, end="")

                                return result
                            finally:
                                sys.stdout = old_stdout

                        run_callable = _run_with_stdout_capture
                    else:
                        # In HTTP/SSE mode, no need to capture stdout
                        run_callable = functools.partial(
                            self.tooluniverse.run_one_function,
                            function_call,
                            stream_callback=stream_callback,
                        )

                    # Run the (blocking) tool in the thread pool so the event
                    # loop stays responsive.
                    result = await loop.run_in_executor(self.executor, run_callable)

                    # Ensure result is properly serialized to JSON
                    if isinstance(result, str):
                        # Try to parse as JSON to validate, if fails wrap it
                        try:
                            json.loads(result)
                            return result
                        except (json.JSONDecodeError, ValueError):
                            # Not valid JSON, wrap it
                            return json.dumps({"result": result}, ensure_ascii=False)
                    elif isinstance(result, (dict, list)):
                        return json.dumps(result, ensure_ascii=False, default=str)
                    else:
                        # For other types, convert to JSON
                        return json.dumps({"result": str(result)}, ensure_ascii=False)

                except Exception as e:
                    error_msg = f"Error executing {tool_name}: {str(e)}"
                    self.logger.error(
                        f"{tool_name} execution failed: {error_msg}", exc_info=True
                    )
                    return json.dumps(
                        {"error": error_msg, "error_type": type(e).__name__},
                        ensure_ascii=False,
                    )

            # Set function metadata (use exposed_name for MCP registration)
            dynamic_tool_function.__name__ = exposed_name
            dynamic_tool_function.__signature__ = inspect.Signature(func_params)
            annotations = param_annotations.copy()
            annotations["return"] = str
            dynamic_tool_function.__annotations__ = annotations

            # Create detailed docstring for internal use, but use clean
            # description for FastMCP.
            # NOTE(review): param_docs is built but never used below —
            # presumably a leftover from an earlier docstring format.
            param_docs = []
            for param_name, param_info in properties.items():
                param_desc = param_info.get("description", f"{param_name} parameter")
                param_type = param_info.get("type", "string")
                is_required = param_name in required_params
                required_text = "required" if is_required else "optional"
                param_docs.append(
                    f" {param_name} ({param_type}, {required_text}): {param_desc}"
                )

            # Set a simple docstring for the function (internal use)
            dynamic_tool_function.__doc__ = f"""{description}

Returns:
    str: Tool execution result
"""

            # Get tool annotations (with defaults and overrides)
            annotations_dict = self._get_tool_annotations(tool_config)

            # Convert to MCP ToolAnnotations object
            from mcp.types import ToolAnnotations

            tool_annotations = ToolAnnotations(
                readOnlyHint=annotations_dict.get("readOnlyHint"),
                destructiveHint=annotations_dict.get("destructiveHint"),
            )

            # Register with FastMCP using exposed_name for MCP, but tool
            # execution uses original tool_name.
            self.tool(description=description, annotations=tool_annotations)(
                dynamic_tool_function
            )

        except Exception as e:
            self.logger.error(f"Error creating MCP tool from config: {e}")
            self.logger.error(f"Error type: {type(e)}")
            import traceback

            self.logger.error(f"Traceback: {traceback.format_exc()}")
            self.logger.debug(f"Tool config: {tool_config}")
            # Don't raise - continue with other tools
            return
# Convenience function for quick server creation
[docs] def create_smcp_server( name: str = "SMCP Server", tool_categories: Optional[List[str]] = None, search_enabled: bool = True, **kwargs, ) -> SMCP: """Create a configured SMCP server instance. Convenience wrapper around ``SMCP(...)`` with sensible defaults. Args: name: Human-readable server name. tool_categories: ToolUniverse categories to load (None loads all). search_enabled: Enable AI-powered tool discovery. **kwargs: Additional SMCP / FastMCP configuration options. Returns: Configured SMCP server instance ready to run. """ return SMCP( name=name, tool_categories=tool_categories, search_enabled=search_enabled, **kwargs, )