Source code for tooluniverse.tool_discovery_tools

"""
Tool Discovery Tools - Standard ToolUniverse tools for discovering and
exploring available tools.

These tools provide efficient ways to discover, list, and explore tools
while minimizing context window usage through progressive disclosure.

Key Features:
- Progressive disclosure: Start with minimal info (names), get details
  when needed
- Agent-friendly: Simple text search (no regex required), natural
  language task discovery
- Workflow-oriented: Tools guide the agent through the discovery process
- Reduced tool calls: Combined search+detail tools minimize overhead

Tool Categories:
1. Listing Tools: list_tools (unified tool with multiple modes)
   - Use for initial exploration and overview
   - Modes: names, basic, categories, by_category, summary, custom
2. Search Tools: grep_tools, find_tools
   - Use for finding relevant tools
3. Definition Tools: get_tool_info
   - Use for getting detailed information about specific tools
   - Supports description or full definition, single or batch

Recommended Workflow:
1. Start with list_tools(mode="names") or list_tools(mode="categories")
2. Use find_tools (for natural language) or grep_tools
   (for keywords) to find relevant tools
3. Use get_tool_info to get full details
4. Execute tools using execute_tool
"""

import json
import re
from .base_tool import BaseTool
from .tool_registry import register_tool


def _get_tool_category(tool, tool_name, tooluniverse):
    """
    Resolve the category a tool belongs to.

    The category stored on the tool config wins when it is present and
    meaningful. Otherwise the tool name is searched for in
    ``tooluniverse.tool_category_dicts``.

    Args:
        tool: Tool configuration (a dict is expected; other types are
            tolerated and simply skip the config lookup)
        tool_name: Name of the tool to look up
        tooluniverse: Object that may expose ``tool_category_dicts``, a
            mapping of category name -> list of tool dicts or tool names

    Returns:
        The resolved category, or "unknown" when nothing matches
    """
    # A category declared on the config takes precedence, unless it is
    # empty or the "unknown" placeholder.
    if isinstance(tool, dict):
        declared = tool.get("category")
        if declared and declared != "unknown":
            return declared

    # Without a usable category mapping there is nothing left to consult.
    if not tooluniverse or not hasattr(tooluniverse, "tool_category_dicts"):
        return "unknown"

    for cat_name, members in tooluniverse.tool_category_dicts.items():
        if not isinstance(members, list):
            continue
        for member in members:
            # Members are either full tool dicts or bare tool names;
            # anything else is ignored.
            if isinstance(member, dict):
                candidate = member.get("name")
            elif isinstance(member, str):
                candidate = member
            else:
                continue
            if candidate == tool_name:
                return cat_name

    return "unknown"


@register_tool("GrepTools")
class GrepToolsTool(BaseTool):
    """Native grep-like pattern search for tools (simple regex, independent from Tool_Finder_Keyword)."""

    def __init__(self, tool_config, tooluniverse=None):
        """
        Args:
            tool_config: Tool configuration forwarded to ``BaseTool``
            tooluniverse: Object exposing ``all_tool_dict`` that is searched
        """
        super().__init__(tool_config)
        self.tooluniverse = tooluniverse

    def run(self, arguments):
        """
        Search tools using simple text matching or regex pattern matching.

        Args:
            arguments (dict): Dictionary containing:
                - pattern (str): Search pattern (text or regex)
                - field (str, optional): Field to search in: "name",
                  "description", "type", "category" (default: "name")
                - search_mode (str, optional): "text" for simple
                  case-insensitive substring matching or "regex" for
                  case-insensitive regex search (default: "text")
                - categories (list, optional): Only search tools whose
                  category is in this collection
                - limit (int, optional): Maximum number of matches to
                  return (default: 100); a falsy value disables the limit
                - offset (int, optional): Number of matches to skip
                  (default: 0)

        Returns:
            dict: On success, a dictionary with ``total_matches``,
            ``limit``, ``offset``, ``has_more``, ``pattern``, ``field``,
            ``search_mode`` and ``tools`` (list of name + description).
            On invalid input, a dictionary with a single ``error`` key.
        """
        if not self.tooluniverse or not hasattr(self.tooluniverse, "all_tool_dict"):
            return {"error": "ToolUniverse not available"}

        pattern = arguments.get("pattern", "")
        field = arguments.get("field", "name")
        search_mode = arguments.get("search_mode", "text")  # 'text' or 'regex'
        limit = arguments.get("limit", 100)
        offset = arguments.get("offset", 0)
        categories = arguments.get("categories")

        if not pattern:
            return {"error": "pattern parameter is required"}

        # Validate the mode and build the matcher once, before looking at any
        # tool. Doing this inside the loop would let an invalid mode or regex
        # go unreported whenever no tool produced searchable text.
        if search_mode == "text":
            needle = pattern.lower()

            def is_match(text):
                return needle in text.lower()

        elif search_mode == "regex":
            try:
                regex = re.compile(pattern, re.IGNORECASE)
            except re.error as e:
                return {"error": f"Invalid regex pattern: {str(e)}"}

            def is_match(text):
                return regex.search(text) is not None

        else:
            return {
                "error": (
                    f"Invalid search_mode: {search_mode}. "
                    "Must be 'text' or 'regex'"
                )
            }

        matching_tools = []
        for tool_name, tool in self.tooluniverse.all_tool_dict.items():
            # Filter by categories if provided
            tool_category = None
            if categories:
                tool_category = _get_tool_category(tool, tool_name, self.tooluniverse)
                if tool_category not in categories:
                    continue

            # Pick the text to search in; unsupported fields yield no text
            # and therefore never match.
            if field in ("name", "description", "type"):
                search_text = tool.get(field, "")
            elif field == "category":
                # Reuse the category resolved for the filter when available.
                search_text = (
                    tool_category
                    if tool_category is not None
                    else _get_tool_category(tool, tool_name, self.tooluniverse)
                )
            else:
                search_text = ""

            # Skip empty or non-string values instead of failing on them.
            if not search_text or not isinstance(search_text, str):
                continue

            if is_match(search_text):
                matching_tools.append(
                    {
                        "name": tool.get("name", ""),
                        "description": tool.get("description", ""),
                    }
                )

        # Apply pagination
        total_matches = len(matching_tools)
        if offset > 0 or limit:
            matching_tools = (
                matching_tools[offset : offset + limit]
                if limit
                else matching_tools[offset:]
            )

        return {
            "total_matches": total_matches,
            "limit": limit,
            "offset": offset,
            "has_more": (offset + len(matching_tools)) < total_matches
            if limit
            else False,
            "pattern": pattern,
            "field": field,
            "search_mode": search_mode,
            "tools": matching_tools,
        }
@register_tool("ListTools")
class ListToolsTool(BaseTool):
    """Unified tool listing with multiple modes."""

    def __init__(self, tool_config, tooluniverse=None):
        """
        Args:
            tool_config: Tool configuration forwarded to ``BaseTool``
            tooluniverse: Object exposing ``all_tool_dict`` that is listed
        """
        super().__init__(tool_config)
        self.tooluniverse = tooluniverse

    @staticmethod
    def _paginate(items, limit, offset):
        """Return the ``offset``/``limit`` window of ``items`` (a falsy limit means no limit)."""
        if offset > 0 or limit:
            return items[offset : offset + limit] if limit else items[offset:]
        return items

    @staticmethod
    def _shorten(description):
        """Cut a long description to its first sentence, or to 100 chars plus "..."."""
        if not isinstance(description, str) or len(description) <= 100:
            return description
        sentence_end = description.find(". ")
        if 0 < sentence_end <= 100:
            return description[: sentence_end + 1]
        return description[:100] + "..."

    def _flat_response(self, entries, limit, offset):
        """Build the paginated response for a flat list of entries."""
        total_count = len(entries)
        page = self._paginate(entries, limit, offset)
        return {
            "total_tools": total_count,
            "limit": limit,
            "offset": offset,
            "has_more": (offset + len(page)) < total_count if limit else False,
            "tools": page,
        }

    def _grouped_response(self, rows, limit, offset):
        """Build the response for entries grouped by category, paginating each category separately.

        ``rows`` is a list of ``(tool_name, tool, entry)`` tuples, where
        ``entry`` is the value placed in the output for that tool.
        """
        grouped = {}
        for tool_name, tool, entry in rows:
            category = _get_tool_category(tool, tool_name, self.tooluniverse)
            grouped.setdefault(category, []).append(entry)

        # Count before slicing so total_tools reflects every matching tool,
        # matching the meaning it has in the flat (ungrouped) responses.
        total_count = sum(len(entries) for entries in grouped.values())
        paged = {
            category: self._paginate(entries, limit, offset)
            for category, entries in grouped.items()
        }
        # More results exist when any single category was cut off by the limit.
        has_more = bool(limit) and any(
            offset + len(paged[category]) < len(entries)
            for category, entries in grouped.items()
        )
        return {
            "tools_by_category": paged,
            "total_tools": total_count,
            "limit": limit,
            "offset": offset,
            "has_more": has_more,
        }

    def run(self, arguments):
        """
        List tools with configurable output format via mode parameter.

        Args:
            arguments (dict): Dictionary containing:
                - mode (str, required): Output mode
                    - "names": Return only tool names
                    - "basic": Return name + description
                    - "categories": Return category statistics
                    - "by_category": Return tool names grouped by category
                    - "summary": Return name + description + type +
                      has_parameters
                    - "custom": Return user-specified fields
                - categories (list, optional): Filter by categories
                - fields (list, required for mode="custom"): Fields to
                  include; "category" is resolved via the category lookup,
                  other fields are copied only when present on the tool
                - group_by_category (bool, optional): Group by category
                  (mode="names"|"basic"|"summary")
                - brief (bool, optional): Truncate description
                  (mode="basic"|"summary")
                - limit (int, optional): Maximum number of entries to
                  return; applied per category for grouped output
                - offset (int, optional): Number of entries to skip
                  (default: 0); applied per category for grouped output

        Returns:
            dict: Dictionary with tools in requested format, or a
            dictionary with an ``error`` key on invalid input or failure
        """
        if not self.tooluniverse or not hasattr(self.tooluniverse, "all_tool_dict"):
            return {"error": "ToolUniverse not available"}

        mode = arguments.get("mode")
        if not mode:
            return {"error": "mode parameter is required"}

        valid_modes = [
            "names",
            "basic",
            "categories",
            "by_category",
            "summary",
            "custom",
        ]
        if mode not in valid_modes:
            return {
                "error": (
                    f"Invalid mode: {mode}. "
                    f"Must be one of: {', '.join(valid_modes)}"
                )
            }

        categories = arguments.get("categories")
        group_by_category = arguments.get("group_by_category", False)
        brief = arguments.get("brief", False)
        limit = arguments.get("limit")
        offset = arguments.get("offset", 0)

        # Get all (name, tool) pairs and filter by categories if provided
        tools = list(self.tooluniverse.all_tool_dict.items())
        if categories:
            tools = [
                (tool_name, tool)
                for tool_name, tool in tools
                if _get_tool_category(tool, tool_name, self.tooluniverse)
                in categories
            ]

        try:
            if mode == "categories":
                # Category statistics: number of tools per category
                category_counts = {}
                for tool_name, tool in tools:
                    category = _get_tool_category(tool, tool_name, self.tooluniverse)
                    category_counts[category] = category_counts.get(category, 0) + 1
                return {"categories": category_counts}

            # Every remaining mode skips tools registered under an empty name.
            named_tools = [(tool_name, tool) for tool_name, tool in tools if tool_name]

            if mode == "custom":
                fields = arguments.get("fields", [])
                if not fields:
                    return {"error": ("fields parameter is required for mode='custom'")}

                entries = []
                for tool_name, tool in named_tools:
                    tool_info = {}
                    for field in fields:
                        if field == "category":
                            # Special handling for category field
                            tool_info[field] = _get_tool_category(
                                tool, tool_name, self.tooluniverse
                            )
                        elif field in tool:
                            tool_info[field] = tool[field]
                    entries.append(tool_info)
                return self._flat_response(entries, limit, offset)

            if mode in ("names", "by_category"):
                # The output entry is just the tool name
                rows = [(tool_name, tool, tool_name) for tool_name, tool in named_tools]
            else:
                # "basic" and "summary" share name + description;
                # "summary" adds type and has_parameters.
                rows = []
                for tool_name, tool in named_tools:
                    description = tool.get("description", "")
                    if brief:
                        description = self._shorten(description)
                    tool_info = {"name": tool_name, "description": description}
                    if mode == "summary":
                        tool_info["type"] = tool.get("type", "Unknown")
                        tool_info["has_parameters"] = bool(tool.get("parameter"))
                    rows.append((tool_name, tool, tool_info))

            if mode == "by_category" or group_by_category:
                return self._grouped_response(rows, limit, offset)
            return self._flat_response([entry for _, _, entry in rows], limit, offset)

        except Exception as e:
            error_msg = f"Error listing tools: {str(e)}"
            self.logger.error(error_msg, exc_info=True)
            return {"error": error_msg, "error_type": type(e).__name__}
@register_tool("GetToolInfo")
class GetToolInfoTool(BaseTool):
    """Get tool information with configurable detail level. Supports single or batch tool queries."""

    def __init__(self, tool_config, tooluniverse=None):
        """
        Args:
            tool_config: Tool configuration forwarded to ``BaseTool``
            tooluniverse: Object used to look up tool definitions
        """
        super().__init__(tool_config)
        self.tooluniverse = tooluniverse

    def run(self, arguments):
        """
        Get tool information with configurable detail level.

        Args:
            arguments (dict): Dictionary containing:
                - tool_names (str | list): Single tool name (string) or
                  list of tool names (at most 20)
                - detail_level (str, optional): "description" or "full".
                  Default: "full"
                    - "description": Returns only the description field
                      (complete, not truncated)
                    - "full": Returns the definition obtained from
                      ``tool_specification`` (single) or
                      ``get_tool_specification_by_names`` (batch)

        Returns:
            dict: Dictionary with tool information
                - Single tool: Direct tool info object, or an error
                  dictionary when the tool is not found
                - Batch tools: {"tools": [...], "total_requested": N,
                  "total_found": M}; tools that were not found appear in
                  "tools" as {"name": ..., "error": "not found"}
                - Invalid input or failure: dictionary with an "error" key
        """
        import time

        # Monotonic clock: elapsed time must not be affected by wall-clock
        # adjustments.
        start_time = time.monotonic()

        if not self.tooluniverse:
            return {"error": "ToolUniverse not available"}

        tool_names = arguments.get("tool_names")
        if not tool_names:
            return {"error": "tool_names parameter is required"}

        detail_level = arguments.get("detail_level", "full")
        if detail_level not in ["description", "full"]:
            return {
                "error": (
                    f"Invalid detail_level: {detail_level}. "
                    "Must be 'description' or 'full'"
                )
            }

        # Normalize tool_names to list
        if isinstance(tool_names, str):
            tool_names = [tool_names]
            is_single = True
        elif isinstance(tool_names, list):
            is_single = False
        else:
            return {"error": "tool_names must be a string or list"}

        # Limit to 20 tools to prevent context overflow
        MAX_TOOLS = 20
        if len(tool_names) > MAX_TOOLS:
            return {
                "error": (f"Maximum {MAX_TOOLS} tools allowed, got {len(tool_names)}")
            }

        try:
            if detail_level == "description":
                # Return only description for each tool
                results = []
                for tool_name in tool_names:
                    tool_config = self.tooluniverse.all_tool_dict.get(tool_name)
                    if not tool_config:
                        results.append({"name": tool_name, "error": "not found"})
                    else:
                        results.append(
                            {
                                "name": tool_name,
                                "description": tool_config.get("description", ""),
                            }
                        )

                # Return single tool directly, or batch format
                if is_single:
                    return results[0]
                found_count = sum(1 for r in results if "error" not in r)
                return {
                    "total_requested": len(tool_names),
                    "total_found": found_count,
                    "tools": results,
                }

            # detail_level == "full": use existing methods for full definitions
            if is_single:
                tool_config = self.tooluniverse.tool_specification(
                    tool_names[0], return_prompt=False
                )
                if not tool_config:
                    return {"error": f"Tool '{tool_names[0]}' not found"}
                return tool_config

            tools_definitions = self.tooluniverse.get_tool_specification_by_names(
                tool_names
            )
            # Drop empty placeholders so they are neither counted as found
            # nor returned next to the "not found" entry for the same name.
            found_tools = [tool for tool in (tools_definitions or []) if tool]
            found_names = {tool.get("name") for tool in found_tools}
            missing_tools = [
                {"name": name, "error": "not found"}
                for name in tool_names
                if name not in found_names
            ]
            return {
                "total_requested": len(tool_names),
                "total_found": len(found_tools),
                "tools": found_tools + missing_tools,
            }

        except Exception as e:
            elapsed = time.monotonic() - start_time
            error_msg = f"Error getting tool info: {str(e)}"
            self.logger.error(f"{error_msg} (elapsed: {elapsed:.2f}s)", exc_info=True)
            return {
                "error": error_msg,
                "error_type": type(e).__name__,
                "elapsed_seconds": round(elapsed, 2),
            }
        except KeyboardInterrupt:
            # KeyboardInterrupt is not an Exception subclass, so it needs its
            # own handler; report the interruption instead of propagating it.
            elapsed = time.monotonic() - start_time
            error_msg = "Tool info retrieval was interrupted"
            self.logger.warning(f"{error_msg} (elapsed: {elapsed:.2f}s)")
            return {
                "error": error_msg,
                "error_type": "InterruptedError",
                "elapsed_seconds": round(elapsed, 2),
            }
@register_tool("ExecuteTool")
class ExecuteToolTool(BaseTool):
    """Execute a ToolUniverse tool directly with custom arguments."""

    def __init__(self, tool_config, tooluniverse=None):
        """
        Args:
            tool_config: Tool configuration forwarded to ``BaseTool``
            tooluniverse: Object whose ``run_one_function`` performs the call
        """
        super().__init__(tool_config)
        self.tooluniverse = tooluniverse

    def run(self, arguments):
        """
        Run another tool by name and return its result as a dictionary.

        Args:
            arguments (dict): Dictionary containing:
                - tool_name (str): Name of the tool to execute
                - arguments (dict): Arguments to pass to the tool. This
                  must be a JSON object (dict), NOT a JSON string.
                  Example: {"param1": "value1", "param2": 5}
                  Do NOT use: "param1=value1" or '{"param1":"value1"}'
                  Omitting it (or passing None) means "no arguments".

        Returns:
            dict: The tool result. A dict result is returned as-is. A
            string result is parsed as JSON when possible. Anything that
            is not a dict after that is wrapped as {"result": value}.
            Validation failures return a dictionary with "error" and
            "error_type" keys.
        """
        if not self.tooluniverse:
            return {"error": "ToolUniverse not available"}

        tool_name = arguments.get("tool_name")
        tool_arguments = arguments.get("arguments")

        # Reject a missing name as well as a whitespace-only string.
        name_is_blank = isinstance(tool_name, str) and not tool_name.strip()
        if not tool_name or name_is_blank:
            error_msg = "tool_name parameter is required and cannot be empty"
            self.logger.error(f"execute_tool: {error_msg}")
            return {"error": error_msg, "error_type": "ValidationError"}

        # Only None (no arguments) or a dict is accepted; everything else
        # gets an explanatory error with an example of the right shape.
        if tool_arguments is not None and not isinstance(tool_arguments, dict):
            received_type = type(tool_arguments).__name__
            error_msg = (
                f"arguments must be a JSON object (dictionary), not a {received_type}. "
                f"Received: {repr(tool_arguments)[:100]}. "
                f'Example of correct format: {{"param1": "value1", "param2": 5}}. '
                f"Do NOT use string format like 'param1=value1' or JSON string format."
            )
            self.logger.error(f"{tool_name}: {error_msg}")
            return {"error": error_msg, "error_type": "ValidationError"}

        call_args = {} if tool_arguments is None else tool_arguments

        # Delegate the actual execution to the ToolUniverse instance.
        outcome = self.tooluniverse.run_one_function(
            {"name": tool_name, "arguments": call_args}
        )

        # A string result may be serialized JSON; keep the raw text when it
        # is not.
        if isinstance(outcome, str):
            try:
                outcome = json.loads(outcome)
            except (json.JSONDecodeError, ValueError):
                return {"result": outcome}

        if isinstance(outcome, dict):
            return outcome
        return {"result": outcome}