# Source code for tooluniverse.execute_function

"""
ToolUniverse Function Execution Module

This module provides the core ToolUniverse class for managing and executing various scientific and data tools.
It supports loading tools from JSON configurations, organizing them by categories, validating function calls,
and executing tools with proper error handling and caching.

The module includes support for:
- GraphQL tools (OpenTarget, OpenTarget Genetics)
- RESTful API tools (Monarch, ChEMBL, PubChem, etc.)
- FDA drug labeling and adverse event tools
- Clinical trials tools
- Literature search tools (EuropePMC, Semantic Scholar, PubTator)
- Biological databases (HPA, Reactome, UniProt)
- MCP (Model Context Protocol) clients and auto-loaders
- Enrichment analysis tools
- Package management tools

Classes:
    ToolUniverse: Main class for tool management and execution

Constants:
    default_tool_files: Default mapping of tool categories to JSON file paths
    tool_type_mappings: Mapping of tool type strings to their implementation classes
"""

import copy
import json
import random
import string
import os
import time
from .utils import read_json_list, evaluate_function_call, extract_function_call_json
from .tool_registry import (
    auto_discover_tools,
    get_tool_registry,
    register_external_tool,
    get_tool_class_lazy,
)
from .logging_config import (
    get_logger,
    debug,
    info,
    warning,
    error,
    set_log_level,
)
from .output_hook import HookManager
from .default_config import default_tool_files, get_default_hook_config

# Directory that contains this source file (used to resolve bundled data).
current_dir = os.path.dirname(os.path.abspath(__file__))

# Lazy loading toggle, controlled by the TOOLUNIVERSE_LAZY_LOADING env var.
# Defaults to enabled for faster startup; "true"/"1"/"yes" all count as on.
LAZY_LOADING_ENABLED = os.getenv("TOOLUNIVERSE_LAZY_LOADING", "true").lower() in (
    "true",
    "1",
    "yes",
)

# Run tool auto-discovery once at import time. Lazy mode defers module
# imports until a tool type is first used; full mode imports everything now.
if LAZY_LOADING_ENABLED:
    debug("Starting lazy tool auto-discovery...")
else:
    debug("Starting full tool auto-discovery...")
tool_type_mappings = auto_discover_tools(lazy=LAZY_LOADING_ENABLED)

# Re-read the registry so manually registered tools are also included.
tool_type_mappings = get_tool_registry()

if LAZY_LOADING_ENABLED:
    debug(
        f"Lazy tool registry initialized with {len(tool_type_mappings)} immediately available tools"
    )
else:
    debug(f"Full tool registry initialized with {len(tool_type_mappings)} tools")
for _tool_name, _tool_class in sorted(tool_type_mappings.items()):
    debug(f"  - {_tool_name}: {_tool_class.__name__}")


class ToolUniverse:
    """
    A comprehensive tool management system for loading, organizing, and
    executing various scientific and data tools.

    The ToolUniverse class provides a centralized interface for managing
    different types of tools including GraphQL tools, RESTful APIs, MCP
    clients, and specialized scientific tools. It handles tool loading,
    filtering, caching, and execution.

    Attributes:
        all_tools (list): List of all loaded tool configurations.
        all_tool_dict (dict): Dictionary mapping tool names to their configurations.
        tool_category_dicts (dict): Dictionary organizing tools by category.
        tool_files (dict): Dictionary mapping category names to their JSON file paths.
        callable_functions (dict): Cache of instantiated tool objects.
    """
[docs] def __init__( self, tool_files=default_tool_files, keep_default_tools=True, log_level: str = None, hooks_enabled: bool = False, hook_config: dict = None, hook_type: str = None, ): """ Initialize the ToolUniverse with tool file configurations. Args: tool_files (dict, optional): Dictionary mapping category names to JSON file paths. Defaults to default_tool_files. keep_default_tools (bool, optional): Whether to keep default tools when custom tool_files are provided. Defaults to True. log_level (str, optional): Log level for this instance. Can be 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'. If None, uses global setting. hooks_enabled (bool, optional): Whether to enable output hooks. Defaults to False. hook_config (dict, optional): Configuration for hooks. If None, uses default config. hook_type (str or list, optional): Simple hook type selection. Can be 'SummarizationHook', 'FileSaveHook', or a list of both. Defaults to 'SummarizationHook'. If both hook_config and hook_type are provided, hook_config takes precedence. 
""" # Set log level if specified if log_level is not None: set_log_level(log_level) # Get logger for this class self.logger = get_logger("ToolUniverse") # Initialize any necessary attributes here FIRST self.all_tools = [] self.all_tool_dict = {} self.tool_category_dicts = {} self.tool_finder = None if tool_files is None: tool_files = default_tool_files elif keep_default_tools: default_tool_files.update(tool_files) tool_files = default_tool_files self.tool_files = tool_files self.logger.debug("Tool files:") self.logger.debug(json.dumps(tool_files, indent=2)) self.callable_functions = {} # Refresh the global tool_type_mappings to include any tools registered during imports global tool_type_mappings tool_type_mappings = get_tool_registry() # Initialize hook system AFTER attributes are initialized self.hooks_enabled = hooks_enabled if self.hooks_enabled: # Determine hook configuration if hook_config is not None: # Use provided hook_config (takes precedence) final_hook_config = hook_config self.logger.info("Using provided hook_config") elif hook_type is not None: # Use hook_type to generate simple configuration final_hook_config = self._create_hook_config_from_type(hook_type) self.logger.info(f"Using hook_type: {hook_type}") else: # Use default configuration with SummarizationHook final_hook_config = get_default_hook_config() self.logger.info("Using default hook configuration (SummarizationHook)") self.hook_manager = HookManager(final_hook_config, self) self.logger.info("Output hooks enabled") else: self.hook_manager = None self.logger.debug("Output hooks disabled")
[docs] def register_custom_tool(self, tool_class, tool_name=None, tool_config=None): """ Register a custom tool class at runtime. Args: tool_class: The tool class to register tool_name (str, optional): Name to register under. Uses class name if None. tool_config (dict, optional): Tool configuration dictionary to add to all_tools Returns: str: The name the tool was registered under """ name = tool_name or tool_class.__name__ # Register the tool class register_external_tool(name, tool_class) # Update the global tool_type_mappings global tool_type_mappings tool_type_mappings = get_tool_registry() # If tool_config is provided, add it to all_tools if tool_config: # Ensure the config has the correct type if "type" not in tool_config: tool_config["type"] = name self.all_tools.append(tool_config) if "name" in tool_config: self.all_tool_dict[tool_config["name"]] = tool_config self.logger.info(f"Custom tool '{name}' registered successfully!") return name
[docs] def force_full_discovery(self): """ Force full tool discovery, importing all tool modules immediately. This can be useful when you need to ensure all tools are available immediately, bypassing lazy loading. Returns: dict: Updated tool registry with all discovered tools """ global tool_type_mappings self.logger.info("Forcing full tool discovery...") tool_type_mappings = auto_discover_tools(lazy=False) self.logger.info( f"Full discovery complete. {len(tool_type_mappings)} tools available." ) return tool_type_mappings
[docs] def get_lazy_loading_status(self): """ Get information about lazy loading status and available tools. Returns: dict: Dictionary with lazy loading status and tool counts """ from .tool_registry import _discovery_completed, _lazy_registry return { "lazy_loading_enabled": LAZY_LOADING_ENABLED, "full_discovery_completed": _discovery_completed, "immediately_available_tools": len(tool_type_mappings), "lazy_mappings_available": len(_lazy_registry), "loaded_tools_count": ( len(self.all_tools) if hasattr(self, "all_tools") else 0 ), }
[docs] def get_tool_types(self): """ Get the types of tools available in the tool files. Returns: list: A list of tool type names (category keys). """ return list(self.tool_files.keys())
def _get_api_key(self, key_name: str): """Get API key from environment variables or loaded sources""" # First check environment variables (highest priority) env_value = os.getenv(key_name) if env_value: return env_value else: return None def _check_api_key_requirements(self, tool_config): """ Check if a tool's required API keys are available. Also supports optional_api_keys where at least one key from the list must be available. Args: tool_config (dict): Tool configuration containing optional 'required_api_keys' and 'optional_api_keys' fields Returns: tuple: (bool, list) - (all_keys_available, missing_keys) """ required_keys = tool_config.get("required_api_keys", []) optional_keys = tool_config.get("optional_api_keys", []) missing_keys = [] # Check required keys (all must be available) for key in required_keys: if not self._get_api_key(key): missing_keys.append(key) # Check optional keys (at least one must be available) optional_satisfied = True if optional_keys: optional_available = any(self._get_api_key(key) for key in optional_keys) if not optional_available: optional_satisfied = False # For error reporting, add a descriptive message about optional keys missing_keys.append(f"At least one of: {', '.join(optional_keys)}") # Tool is valid if all required keys are available AND optional requirement is satisfied all_valid = ( len([k for k in missing_keys if not k.startswith("At least one of:")]) == 0 and optional_satisfied ) return all_valid, missing_keys
[docs] def generate_env_template( self, all_missing_keys, output_file: str = ".env.template" ): """Generate a template .env file with all required API keys""" with open(output_file, "w") as f: f.write("# API Keys for ToolUniverse\n") f.write("# Copy this file to .env and fill in your actual API keys\n\n") for key in sorted(all_missing_keys): f.write(f"{key}=your_api_key_here\n\n") self.logger.info(f"Generated API key template: {output_file}") self.logger.info("Copy this file to .env and fill in your API keys")
def _create_hook_config_from_type(self, hook_type): """ Create hook configuration from simple hook_type parameter. Args: hook_type (str or list): Hook type(s) to enable. Can be 'SummarizationHook', 'FileSaveHook', or a list of both. Returns: dict: Generated hook configuration """ # Handle single hook type if isinstance(hook_type, str): hook_types = [hook_type] else: hook_types = hook_type # Validate hook types valid_types = ["SummarizationHook", "FileSaveHook"] for htype in hook_types: if htype not in valid_types: raise ValueError( f"Invalid hook_type: {htype}. Valid types are: {valid_types}" ) # Create hooks list hooks = [] for htype in hook_types: if htype == "SummarizationHook": hooks.append( { "name": "summarization_hook", "type": "SummarizationHook", "enabled": True, "conditions": { "output_length": {"operator": ">", "threshold": 5000} }, "hook_config": { "chunk_size": 32000, "focus_areas": "key_findings_and_results", "max_summary_length": 3000, }, } ) elif htype == "FileSaveHook": hooks.append( { "name": "file_save_hook", "type": "FileSaveHook", "enabled": True, "conditions": { "output_length": {"operator": ">", "threshold": 1000} }, "hook_config": { "temp_dir": None, "file_prefix": "tool_output", "include_metadata": True, "auto_cleanup": False, "cleanup_age_hours": 24, }, } ) return {"hooks": hooks}
    def load_tools(
        self,
        tool_type=None,
        exclude_tools=None,
        exclude_categories=None,
        include_tools=None,
        tool_config_files=None,
        tools_file=None,
        include_tool_types=None,
        exclude_tool_types=None,
    ):
        """
        Load tool definitions from JSON files into the instance's tool registry.

        If ``tool_type`` is None, loads all available tool categories from
        ``self.tool_files``; otherwise loads only the listed categories. After
        loading, tools are deduplicated by their 'name' field and the tool
        name/description mapping is refreshed.

        Args:
            tool_type (list, optional): Tool category names to load. None loads all.
            exclude_tools (list, optional): Specific tool names to exclude.
            exclude_categories (list, optional): Tool categories to exclude.
            include_tools (list or str, optional): Specific tool names to include,
                or a path to a text file with one name per line. When provided,
                only these tools are loaded regardless of categories.
            tool_config_files (dict, optional): Extra config files to load, as
                {"category_name": "/path/to/config.json"}.
            tools_file (str, optional): Path to a text file of tool names (one per
                line); alternative to passing a path via ``include_tools``.
            include_tool_types (list, optional): Only load tools of these types.
            exclude_tool_types (list, optional): Exclude tools of these types.

        Side Effects:
            - Updates ``self.all_tools`` and ``self.tool_category_dicts``.
            - Calls ``self.refresh_tool_name_desc()``.
            - Logs tool counts before and after loading.
        """
        self.logger.debug(f"Number of tools before load tools: {len(self.all_tools)}")

        # tools_file is an alternative spelling of include_tools-as-a-path.
        if tools_file:
            include_tools = self._load_tool_names_from_file(tools_file)

        # A string include_tools is treated as a file path, not a tool name.
        if isinstance(include_tools, str):
            include_tools = self._load_tool_names_from_file(include_tools)

        # Convert filters to sets for O(1) membership tests; None means "no filter".
        exclude_tools_set = set(exclude_tools or [])
        exclude_categories_set = set(exclude_categories or [])
        include_tools_set = set(include_tools or []) if include_tools else None
        include_tool_types_set = (
            set(include_tool_types or []) if include_tool_types else None
        )
        exclude_tool_types_set = (
            set(exclude_tool_types or []) if exclude_tool_types else None
        )

        # Log which filters are active (truncating long name lists).
        if exclude_tools_set:
            self.logger.info(
                f"Excluding tools by name: {', '.join(list(exclude_tools_set)[:5])}{'...' if len(exclude_tools_set) > 5 else ''}"
            )
        if exclude_categories_set:
            self.logger.info(
                f"Excluding categories: {', '.join(exclude_categories_set)}"
            )
        if include_tools_set:
            self.logger.info(
                f"Including only specific tools: {len(include_tools_set)} tools specified"
            )
        if include_tool_types_set:
            self.logger.info(
                f"Including only tool types: {', '.join(include_tool_types_set)}"
            )
        if exclude_tool_types_set:
            self.logger.info(
                f"Excluding tool types: {', '.join(exclude_tool_types_set)}"
            )
        if tool_config_files:
            self.logger.info(
                f"Loading additional config files: {', '.join(tool_config_files.keys())}"
            )

        # Merge additional config files with the configured tool files; a copy
        # is used so self.tool_files itself is never modified here.
        all_tool_files = self.tool_files.copy()
        if tool_config_files:
            # Only accept extra config files that actually exist on disk.
            for category, file_path in tool_config_files.items():
                if os.path.exists(file_path):
                    all_tool_files[category] = file_path
                    self.logger.debug(
                        f"Added config file for category '{category}': {file_path}"
                    )
                else:
                    self.logger.warning(
                        f"Config file for category '{category}' not found: {file_path}"
                    )

        # Determine which categories to process, honoring category exclusions.
        if tool_type is None:
            categories_to_load = [
                cat
                for cat in all_tool_files.keys()
                if cat not in exclude_categories_set
            ]
        else:
            assert isinstance(
                tool_type, list
            ), "tool_type must be a list of tool category names"
            categories_to_load = [
                cat for cat in tool_type if cat not in exclude_categories_set
            ]

        # Load tools from each selected category file.
        for each in categories_to_load:
            if each in all_tool_files:
                try:
                    loaded_data = read_json_list(all_tool_files[each])
                    # Accept either a dict of tools (values are configs) or a list.
                    if isinstance(loaded_data, dict):
                        loaded_tool_list = list(loaded_data.values())
                        self.logger.debug(
                            f"Converted dict to list: {len(loaded_tool_list)} tools"
                        )
                    elif isinstance(loaded_data, list):
                        loaded_tool_list = loaded_data
                    else:
                        self.logger.warning(
                            f"Unexpected data format from {all_tool_files[each]}: {type(loaded_data)}"
                        )
                        continue
                    self.all_tools += loaded_tool_list
                    self.tool_category_dicts[each] = loaded_tool_list
                    self.logger.debug(
                        f"Loaded {len(loaded_tool_list)} tools from category '{each}'"
                    )
                except Exception as e:
                    # A single bad category file must not abort the whole load.
                    self.logger.error(
                        f"Error loading tools from category '{each}': {e}"
                    )
            else:
                self.logger.warning(
                    f"Tool category '{each}' not found in available tool files"
                )

        # Merge in configs registered via the @register_tool decorator.
        self._load_auto_discovered_configs()

        # Apply name/type filters and drop duplicate names.
        self._filter_and_deduplicate_tools(
            exclude_tools_set,
            include_tools_set,
            include_tool_types_set,
            exclude_tool_types_set,
        )

        # Process MCP Auto Loader tools (may register additional MCP tools).
        self.logger.debug("Checking for MCP Auto Loader tools...")
        self._process_mcp_auto_loaders()
    def _load_tool_names_from_file(self, file_path):
        """
        Load tool names from a text file (one tool name per line).

        Blank lines and lines starting with '#' are skipped. Any error
        (missing file, read failure) is logged and an empty list returned.

        Args:
            file_path (str): Path to the text file containing tool names.

        Returns:
            list: Tool names loaded from the file (possibly empty).
        """
        try:
            if not os.path.exists(file_path):
                self.logger.error(f"Tools file not found: {file_path}")
                return []
            with open(file_path, "r", encoding="utf-8") as f:
                tool_names = []
                for _line_num, line in enumerate(f, 1):
                    line = line.strip()
                    # Skip empty lines and comments.
                    if line and not line.startswith("#"):
                        tool_names.append(line)
            self.logger.info(
                f"Loaded {len(tool_names)} tool names from file: {file_path}"
            )
            return tool_names
        except Exception as e:
            self.logger.error(f"Error loading tool names from file {file_path}: {e}")
            return []

    def _filter_and_deduplicate_tools(
        self,
        exclude_tools_set,
        include_tools_set,
        include_tool_types_set=None,
        exclude_tool_types_set=None,
    ):
        """
        Filter ``self.all_tools`` by the given criteria and drop duplicates.

        Args:
            exclude_tools_set (set): Tool names to exclude.
            include_tools_set (set or None): Tool names to include
                (None means include all).
            include_tool_types_set (set or None): Tool types to include
                (None means include all).
            exclude_tool_types_set (set or None): Tool types to exclude
                (None means exclude none).

        Side Effects:
            Replaces ``self.all_tools`` with the filtered list, refreshes the
            name/description mapping, and may generate a .env template when
            API keys are missing.
        """
        tool_name_list = []
        dedup_all_tools = []
        all_missing_keys = set()
        duplicate_names = set()
        excluded_tools_count = 0
        included_tools_count = 0
        missing_included_tools = set()

        # Track which explicitly requested tools are never found.
        if include_tools_set:
            missing_included_tools = include_tools_set.copy()

        for each in self.all_tools:
            # Entries should be dicts; warn about and skip anything else.
            if isinstance(each, dict):
                tool_name = each.get("name", "")
                tool_type = each.get("type", "Unknown")
            elif isinstance(each, str):
                self.logger.warning(f"Found string in all_tools: {each}")
                continue
            else:
                self.logger.warning(
                    f"Unknown type in all_tools: {type(each)} - {each}"
                )
                continue

            # Tool-type inclusion/exclusion filters.
            if include_tool_types_set and tool_type not in include_tool_types_set:
                self.logger.debug(
                    f"Excluding tool '{tool_name}' - type '{tool_type}' not in include list"
                )
                continue
            if exclude_tool_types_set and tool_type in exclude_tool_types_set:
                self.logger.debug(
                    f"Excluding tool '{tool_name}' - type '{tool_type}' is excluded"
                )
                continue

            # Name whitelist: when set, only those tools pass.
            if include_tools_set:
                if tool_name not in include_tools_set:
                    continue
                else:
                    missing_included_tools.discard(tool_name)
                    included_tools_count += 1

            # Name blacklist.
            if tool_name in exclude_tools_set:
                excluded_tools_count += 1
                self.logger.debug(f"Excluding tool by name: {tool_name}")
                continue

            # Drop tools whose declared API keys are unavailable.
            if "required_api_keys" in each:
                all_keys_available, missing_keys = self._check_api_key_requirements(
                    each
                )
                if not all_keys_available:
                    all_missing_keys.update(missing_keys)
                    self.logger.debug(
                        f"Skipping tool '{tool_name}' due to missing API keys: {', '.join(missing_keys)}"
                    )
                    continue

            # First occurrence of a name wins; later duplicates are dropped.
            if tool_name not in tool_name_list:
                tool_name_list.append(tool_name)
                dedup_all_tools.append(each)
            else:
                duplicate_names.add(tool_name)

        # Report statistics about what was filtered.
        if duplicate_names:
            self.logger.debug(
                f"Duplicate tool names found and dropped: {', '.join(list(duplicate_names)[:5])}{'...' if len(duplicate_names) > 5 else ''}"
            )
        if excluded_tools_count > 0:
            self.logger.info(f"Excluded {excluded_tools_count} tools by name")
        if include_tools_set:
            self.logger.info(f"Included {included_tools_count} tools by name filter")
            if missing_included_tools:
                self.logger.warning(
                    f"Could not find {len(missing_included_tools)} requested tools: {', '.join(list(missing_included_tools)[:5])}{'...' if len(missing_included_tools) > 5 else ''}"
                )

        self.all_tools = dedup_all_tools
        self.refresh_tool_name_desc()
        info(f"Number of tools after load tools: {len(self.all_tools)}")

        # Offer a .env template covering every missing API key seen above.
        if len(all_missing_keys) > 0:
            warning(f"\nMissing API keys: {', '.join(all_missing_keys)}")
            info("Generating .env.template file with missing API keys...")
            self.generate_env_template(all_missing_keys)

    def _load_auto_discovered_configs(self):
        """
        Load tool configs registered via the ``@register_tool`` decorator.

        Adds each discovered config to ``self.all_tools`` unless a tool with
        the same name is already present.
        """
        from .tool_registry import get_config_registry

        discovered_configs = get_config_registry()
        if discovered_configs:
            self.logger.debug(
                f"Loading {len(discovered_configs)} auto-discovered tool configs"
            )
            for _tool_type, config in discovered_configs.items():
                # Skip configs whose name already exists in all_tools.
                if "name" in config and config["name"] not in [
                    tool.get("name") for tool in self.all_tools
                ]:
                    self.all_tools.append(config)
                    self.logger.debug(
                        f"Added auto-discovered config: {config['name']}"
                    )

    def _process_mcp_auto_loaders(self):
        """
        Run every MCPAutoLoaderTool to discover and register MCP tools.

        Scans loaded tools for MCPAutoLoaderTool configs and runs their
        auto-discovery, handling async execution and cleanup. When called
        from inside a running event loop, auto-loading is skipped with a
        warning (run_until_complete cannot be used there).

        Side Effects:
            - May add new tools to the tool registry.
            - Logs/prints discovery progress and updated tool counts.
        """
        self.logger.debug("Starting _process_mcp_auto_loaders")
        import asyncio
        import warnings

        auto_loaders = []
        self.logger.debug(
            f"Checking {len(self.all_tools)} tools for MCPAutoLoaderTool"
        )
        for tool_config in self.all_tools:
            if tool_config.get("type") == "MCPAutoLoaderTool":
                auto_loaders.append(tool_config)
                self.logger.debug(f"Found MCPAutoLoaderTool: {tool_config['name']}")
        if not auto_loaders:
            self.logger.debug("No MCP Auto Loader tools found")
            return
        info(f"Found {len(auto_loaders)} MCP Auto Loader tool(s), processing...")

        # Detect whether we are already inside a running event loop; if so we
        # cannot call run_until_complete below.
        try:
            asyncio.get_running_loop()
            in_event_loop = True
            self.logger.debug("Already in an event loop, using async approach")
        except RuntimeError:
            in_event_loop = False
            self.logger.debug("No event loop running, will create new one")

        # Process each auto loader independently; one failure must not stop the rest.
        for loader_config in auto_loaders:
            self.logger.debug(f"Processing loader: {loader_config['name']}")
            try:
                self.logger.debug("Creating auto loader instance...")
                auto_loader = tool_type_mappings["MCPAutoLoaderTool"](loader_config)
                self.logger.debug("Auto loader instance created")

                self.logger.debug("Starting auto-load process...")

                async def _run_auto_load(loader):
                    """Run auto-load with proper cleanup"""
                    try:
                        result = await loader.auto_load_and_register(self)
                        return result
                    finally:
                        # Ensure the loader's HTTP session is always closed.
                        await loader._close_session()

                if in_event_loop:
                    # Known limitation: cannot nest run_until_complete inside a
                    # running loop, so MCP auto-loading is skipped here.
                    warning(
                        f"Warning: Cannot process MCP Auto Loader '{loader_config['name']}' because we're already in an event loop."
                    )
                    self.logger.debug(
                        "This is a known limitation when SMCP is used within an async context."
                    )
                    self.logger.debug(
                        "MCP tools will need to be loaded manually or the server should be run outside of an async context."
                    )
                    continue
                else:
                    # No running loop: create one, suppressing ResourceWarnings
                    # that can fire during session/loop teardown.
                    with warnings.catch_warnings():
                        warnings.simplefilter("ignore", ResourceWarning)
                        loop = asyncio.new_event_loop()
                        asyncio.set_event_loop(loop)
                        try:
                            self.logger.debug("Running auto_load_and_register...")
                            result = loop.run_until_complete(
                                _run_auto_load(auto_loader)
                            )
                            self.logger.debug(
                                f"Auto-load completed with result: {result}"
                            )
                            info(
                                f"MCP Auto Loader '{loader_config['name']}' processed:"
                            )
                            info(
                                f" - Discovered: {result.get('discovered_count', 0)} tools"
                            )
                            info(
                                f" - Registered: {result.get('registered_count', 0)} tools"
                            )
                            # Print detailed tool information.
                            if result.get("tools"):
                                print(
                                    f" 📋 Discovered MCP tools: {', '.join(result['tools'])}"
                                )
                            if result.get("registered_tools"):
                                print(
                                    f" 🔧 Registered tools in ToolUniverse: {', '.join(result['registered_tools'])}"
                                )
                                self.logger.debug(
                                    f" - Tools: {', '.join(result['registered_tools'])}"
                                )
                            # Surface expert_* tools now present in the cache.
                            expert_tools = [
                                name
                                for name in self.callable_functions.keys()
                                if name.startswith("expert_")
                            ]
                            if expert_tools:
                                print(
                                    f" ✅ Expert tools now available: {', '.join(expert_tools)}"
                                )
                            else:
                                print(
                                    " ⚠️ No expert tools found in callable_functions after registration"
                                )
                        finally:
                            self.logger.debug("Closing async loop...")
                            # Cancel and drain any tasks still pending on the loop.
                            try:
                                pending = asyncio.all_tasks(loop)
                                for task in pending:
                                    task.cancel()
                                if pending:
                                    loop.run_until_complete(
                                        asyncio.gather(
                                            *pending, return_exceptions=True
                                        )
                                    )
                            except Exception:
                                pass  # Ignore cleanup errors
                            finally:
                                loop.close()
                                self.logger.debug("Async loop closed")
            except Exception as e:
                self.logger.debug(f"Exception in auto loader processing: {e}")
                import traceback

                traceback.print_exc()
                self.logger.debug(
                    f"Failed to process MCP Auto Loader '{loader_config['name']}': {str(e)}"
                )

        # Update tool count after MCP registration.
        self.logger.debug(
            f"Number of tools after MCP auto-loading: {len(self.all_tool_dict)}"
        )
        self.logger.debug("_process_mcp_auto_loaders completed")
[docs] def select_tools( self, include_names=None, exclude_names=None, include_categories=None, exclude_categories=None, ): """ Select tools based on tool names and/or categories (tool_files keys). Args: include_names (list, optional): List of tool names to include. If None, include all. exclude_names (list, optional): List of tool names to exclude. include_categories (list, optional): List of categories (tool_files keys) to include. If None, include all. exclude_categories (list, optional): List of categories (tool_files keys) to exclude. Returns: list: List of selected tool configurations. """ selected_tools = [] # If categories are specified, use self.tool_category_dicts to filter categories = set(self.tool_category_dicts.keys()) if include_categories is not None: categories &= set(include_categories) if exclude_categories is not None: categories -= set(exclude_categories) # Gather tools from selected categories for cat in categories: selected_tools.extend(self.tool_category_dicts[cat]) # Further filter by names if needed if include_names is not None: selected_tools = [ tool for tool in selected_tools if tool["name"] in include_names ] if exclude_names is not None: selected_tools = [ tool for tool in selected_tools if tool["name"] not in exclude_names ] return selected_tools
[docs] def filter_tool_lists( self, tool_name_list, tool_desc_list, include_names=None, exclude_names=None, include_categories=None, exclude_categories=None, ): """ Directly filter tool name and description lists based on names and/or categories. This method takes existing tool name and description lists and filters them according to the specified criteria using the select_tools method for category-based filtering. Args: tool_name_list (list): List of tool names to filter. tool_desc_list (list): List of tool descriptions to filter (must correspond to tool_name_list). include_names (list, optional): List of tool names to include. exclude_names (list, optional): List of tool names to exclude. include_categories (list, optional): List of categories to include. exclude_categories (list, optional): List of categories to exclude. Returns: tuple: A tuple containing (filtered_tool_name_list, filtered_tool_desc_list). """ # Build a set of allowed tool names using select_tools for category filtering allowed_names = set() if any([include_names, exclude_names, include_categories, exclude_categories]): filtered_tools = self.select_tools( include_names=include_names, exclude_names=exclude_names, include_categories=include_categories, exclude_categories=exclude_categories, ) allowed_names = set(tool["name"] for tool in filtered_tools) else: allowed_names = set(tool_name_list) # Filter lists by allowed_names filtered_tool_name_list = [] filtered_tool_desc_list = [] for name, desc in zip(tool_name_list, tool_desc_list): if name in allowed_names: filtered_tool_name_list.append(name) filtered_tool_desc_list.append(desc) return filtered_tool_name_list, filtered_tool_desc_list
[docs] def return_all_loaded_tools(self): """ Return a deep copy of all loaded tools. Returns: list: A deep copy of the all_tools list to prevent external modification. """ return copy.deepcopy(self.all_tools)
    def list_built_in_tools(self, mode="config", scan_all=False):
        """
        List all built-in tool categories and their statistics with different modes.

        This method provides a comprehensive overview of all available tools in the
        ToolUniverse, organized by categories. It reads directly from the default
        tool files to gather statistics, so it works even before calling load_tools().

        Args:
            mode (str, optional): Organization mode for tools. Defaults to 'config'.
                - 'config': Organize by config file categories (original behavior)
                - 'type': Organize by tool types (implementation classes)
                - 'list_name': Return a list of all tool names
                - 'list_spec': Return a list of all tool specifications
            scan_all (bool, optional): Whether to scan all JSON files in data directory
                recursively. If True, scans all JSON files in data/ and its
                subdirectories. If False (default), uses predefined tool file mappings.

        Returns:
            dict or list:
                - For 'config' and 'type' modes: A dictionary containing tool statistics
                - For 'list_name' mode: A list of all tool names
                - For 'list_spec' mode: A list of all tool specifications

        Raises:
            ValueError: If ``mode`` is not one of the four supported values.

        Example:
            >>> tool_universe = ToolUniverse()
            >>> # Group by config file categories (predefined files only)
            >>> stats = tool_universe.list_built_in_tools(mode='config')
            >>> # Scan all JSON files in data directory recursively
            >>> stats = tool_universe.list_built_in_tools(mode='config', scan_all=True)
            >>> # Get all tool names from all JSON files
            >>> tool_names = tool_universe.list_built_in_tools(mode='list_name', scan_all=True)

        Note:
            - This method reads directly from tool files and works without calling load_tools()
            - Tools are deduplicated across categories, so the same tool won't be
              counted multiple times
            - The summary is automatically printed to console when this method is
              called (except for list_name and list_spec modes)
            - When scan_all=True, all JSON files in data/ and subdirectories are scanned
        """
        if mode not in ["config", "type", "list_name", "list_spec"]:
            raise ValueError(
                "Mode must be one of: 'config', 'type', 'list_name', 'list_spec'"
            )

        # For list_name and list_spec modes, we can return early with just the data
        if mode in ["list_name", "list_spec"]:
            all_tools = []
            all_tool_names = set()  # For deduplication across categories

            if scan_all:
                # Scan all JSON files in data directory recursively
                all_tools, all_tool_names = self._scan_all_json_files()
            else:
                # Use predefined tool files (original behavior)
                all_tools, all_tool_names = self._scan_predefined_files()

            # Deduplicate tools by name (first occurrence wins)
            unique_tools = {}
            for tool in all_tools:
                if tool["name"] not in unique_tools:
                    unique_tools[tool["name"]] = tool

            if mode == "list_name":
                return sorted(list(unique_tools.keys()))
            elif mode == "list_spec":
                return list(unique_tools.values())

        # Original logic for config and type modes
        result = {
            "categories": {},
            "total_categories": 0,
            "total_tools": 0,
            "mode": mode,
            "summary": "",
        }

        if scan_all:
            # Scan all JSON files in data directory recursively
            all_tools, all_tool_names = self._scan_all_json_files()

            # For config mode with scan_all, organize by file names
            if mode == "config":
                file_tools_map = {}
                for tool in all_tools:
                    # Get the source file for this tool (we need to track this)
                    # For now, we'll organize by tool type as a fallback
                    tool_type = tool.get("type", "Unknown")
                    if tool_type not in file_tools_map:
                        file_tools_map[tool_type] = []
                    file_tools_map[tool_type].append(tool)

                for category, tools in file_tools_map.items():
                    result["categories"][category] = {"count": len(tools)}
        else:
            # Use predefined tool files (original behavior)
            # NOTE(review): this call already reads every category file once;
            # the per-category loop below reads each file a second time just
            # to obtain per-category counts for config mode.
            all_tools, all_tool_names = self._scan_predefined_files()

            # Read tools from each category file
            for category, file_path in self.tool_files.items():
                try:
                    # Read the JSON file for this category
                    tools_in_category = read_json_list(file_path)
                    if mode == "config":
                        tool_names = [tool["name"] for tool in tools_in_category]
                        result["categories"][category] = {"count": len(tool_names)}
                except Exception as e:
                    warning(
                        f"Warning: Could not read tools from {category} ({file_path}): {e}"
                    )
                    if mode == "config":
                        result["categories"][category] = {"count": 0}

        # If mode is 'type', organize by tool types instead
        if mode == "type":
            # Deduplicate tools by name first
            unique_tools = {}
            for tool in all_tools:
                if tool["name"] not in unique_tools:
                    unique_tools[tool["name"]] = tool

            # Group by tool type
            type_groups = {}
            for tool in unique_tools.values():
                tool_type = tool.get("type", "Unknown")
                if tool_type not in type_groups:
                    type_groups[tool_type] = []
                type_groups[tool_type].append(tool["name"])

            # Build result for type mode
            for tool_type, tool_names in type_groups.items():
                result["categories"][tool_type] = {
                    "count": len(tool_names),
                    "tools": sorted(tool_names),
                }

        # Calculate totals.  total_tools counts unique names across ALL scanned
        # sources, so it may exceed the sum of per-category counts.
        result["total_categories"] = len(result["categories"])
        result["total_tools"] = len(all_tool_names)

        # Generate summary information
        mode_title = "Config File Categories" if mode == "config" else "Tool Types"
        summary_lines = [
            "=" * 60,
            f"🔧 ToolUniverse Built-in Tools Overview ({mode_title})",
            "=" * 60,
            f"📊 Total Categories: {result['total_categories']}",
            f"🛠️ Total Unique Tools: {result['total_tools']}",
            f"📋 Organization Mode: {mode}",
            "",
            f"📂 {mode_title} Breakdown:",
            "-" * 40,
        ]

        # Sort categories by tool count (descending) for better visualization
        sorted_categories = sorted(
            result["categories"].items(), key=lambda x: x[1]["count"], reverse=True
        )

        for category, category_info in sorted_categories:
            count = category_info["count"]
            # Add visual indicators for different tool counts
            if count >= 10:
                icon = "🟢"
            elif count >= 5:
                icon = "🟡"
            elif count >= 1:
                icon = "🟠"
            else:
                icon = "🔴"

            # Format category name to be more readable
            if mode == "config":
                display_name = category.replace("_", " ").title()
            else:
                display_name = category

            summary_lines.append(f" {icon} {display_name:<35} {count:>3} tools")

            # For type mode, optionally show some tool examples
            if (
                mode == "type"
                and "tools" in category_info
                and len(category_info["tools"]) <= 5
            ):
                for tool_name in category_info["tools"]:
                    summary_lines.append(f" └─ {tool_name}")
            elif (
                mode == "type"
                and "tools" in category_info
                and len(category_info["tools"]) > 5
            ):
                # Show only the first three names, then an ellipsis line
                for tool_name in category_info["tools"][:3]:
                    summary_lines.append(f" └─ {tool_name}")
                summary_lines.append(
                    f" └─ ... and {len(category_info['tools']) - 3} more"
                )

        summary_lines.extend(
            ["-" * 40, "✅ Ready to use! Call load_tools() to initialize.", "=" * 60]
        )

        result["summary"] = "\n".join(summary_lines)

        # Print summary to console directly
        print(result["summary"])

        return result
    def _scan_predefined_files(self):
        """
        Scan predefined tool files (original behavior).

        Reads every file listed in ``self.tool_files`` plus any JSON files found
        in the ``data/remote_tools`` directory next to this module.

        Returns:
            tuple: (all_tools, all_tool_names) where all_tools is a list of tool configs
                   and all_tool_names is a set of tool names for deduplication
        """
        all_tools = []
        all_tool_names = set()

        # Read tools from each category file
        for category, file_path in self.tool_files.items():
            try:
                # Read the JSON file for this category
                tools_in_category = read_json_list(file_path)
                all_tools.extend(tools_in_category)
                all_tool_names.update([tool["name"] for tool in tools_in_category])
            except Exception as e:
                # Best-effort: a single unreadable file must not abort the scan
                warning(
                    f"Warning: Could not read tools from {category} ({file_path}): {e}"
                )

        # Also include remote tools
        try:
            remote_dir = os.path.join(current_dir, "data", "remote_tools")
            if os.path.isdir(remote_dir):
                remote_tools = []
                for fname in os.listdir(remote_dir):
                    if not fname.lower().endswith(".json"):
                        continue
                    fpath = os.path.join(remote_dir, fname)
                    try:
                        tools_in_file = read_json_list(fpath)
                        # A dict payload is treated as {key: tool_config, ...}
                        if isinstance(tools_in_file, dict):
                            tools_in_file = list(tools_in_file.values())
                        if isinstance(tools_in_file, list):
                            remote_tools.extend(tools_in_file)
                    except Exception as e:
                        warning(
                            f"Warning: Could not read remote tools from {fpath}: {e}"
                        )
                if remote_tools:
                    all_tools.extend(remote_tools)
                    all_tool_names.update([tool["name"] for tool in remote_tools])
        except Exception as e:
            warning(f"Warning: Failed to scan remote tools directory: {e}")

        return all_tools, all_tool_names

    def _scan_all_json_files(self):
        """
        Recursively scan all JSON files in the data directory and its subdirectories.

        Returns:
            tuple: (all_tools, all_tool_names) where all_tools is a list of tool configs
                   and all_tool_names is a set of tool names for deduplication
        """
        all_tools = []
        all_tool_names = set()

        # Get the data directory path
        data_dir = os.path.join(current_dir, "data")

        if not os.path.exists(data_dir):
            warning(f"Warning: Data directory not found: {data_dir}")
            return all_tools, all_tool_names

        # Recursively find all JSON files
        json_files = []
        for root, _dirs, files in os.walk(data_dir):
            for file in files:
                if file.lower().endswith(".json"):
                    json_files.append(os.path.join(root, file))

        self.logger.debug(f"Found {len(json_files)} JSON files to scan")

        # Read tools from each JSON file
        for json_file in json_files:
            try:
                tools_in_file = read_json_list(json_file)

                # Handle different data formats
                if isinstance(tools_in_file, dict):
                    # Convert dict of tools to list of tools
                    tools_in_file = list(tools_in_file.values())
                elif not isinstance(tools_in_file, list):
                    # Skip files that don't contain tool configurations
                    continue

                # Add tools to our collection; anything without a "name" key is
                # not a tool config and is silently ignored
                for tool in tools_in_file:
                    if isinstance(tool, dict) and "name" in tool:
                        all_tools.append(tool)
                        all_tool_names.add(tool["name"])

                self.logger.debug(f"Loaded {len(tools_in_file)} tools from {json_file}")
            except Exception as e:
                warning(f"Warning: Could not read tools from {json_file}: {e}")
                continue

        self.logger.info(
            f"Scanned {len(json_files)} JSON files, found {len(all_tools)} tools"
        )
        return all_tools, all_tool_names
[docs] def refresh_tool_name_desc( self, enable_full_desc=False, include_names=None, exclude_names=None, include_categories=None, exclude_categories=None, ): """ Refresh the tool name and description mappings with optional filtering. This method rebuilds the internal tool dictionary and generates filtered lists of tool names and descriptions based on the provided filter criteria. Args: enable_full_desc (bool, optional): If True, includes full tool JSON as description. If False, uses "name: description" format. Defaults to False. include_names (list, optional): List of tool names to include. exclude_names (list, optional): List of tool names to exclude. include_categories (list, optional): List of categories to include. exclude_categories (list, optional): List of categories to exclude. Returns: tuple: A tuple containing (tool_name_list, tool_desc_list) after filtering. """ tool_name_list = [] tool_desc_list = [] for tool in self.all_tools: tool_name_list.append(tool["name"]) if enable_full_desc: tool_desc_list.append(json.dumps(tool)) else: tool_desc_list.append(tool["name"] + ": " + tool["description"]) self.all_tool_dict[tool["name"]] = tool # Apply filtering if any filter argument is provided if any([include_names, exclude_names, include_categories, exclude_categories]): tool_name_list, tool_desc_list = self.filter_tool_lists( tool_name_list, tool_desc_list, include_names=include_names, exclude_names=exclude_names, include_categories=include_categories, exclude_categories=exclude_categories, ) self.logger.debug( f"Number of tools after refresh and filter: {len(tool_name_list)}" ) return tool_name_list, tool_desc_list
[docs] def prepare_one_tool_prompt(self, tool): """ Prepare a single tool configuration for prompt usage by filtering to essential keys. Args: tool (dict): Tool configuration dictionary. Returns: dict: Tool configuration with only essential keys for prompting. """ valid_keys = ["name", "description", "parameter", "required"] tool = copy.deepcopy(tool) for key in list(tool.keys()): if key not in valid_keys: del tool[key] return tool
[docs] def prepare_tool_prompts(self, tool_list): """ Prepare a list of tool configurations for prompt usage. Args: tool_list (list): List of tool configuration dictionaries. Returns: list: List of tool configurations with only essential keys for prompting. """ copied_list = [] for tool in tool_list: copied_list.append(self.prepare_one_tool_prompt(tool)) return copied_list
[docs] def remove_keys(self, tool_list, invalid_keys): """ Remove specified keys from a list of tool configurations. Args: tool_list (list): List of tool configuration dictionaries. invalid_keys (list): List of keys to remove from each tool configuration. Returns: list: Deep copy of tool list with specified keys removed. """ copied_list = copy.deepcopy(tool_list) for tool in copied_list: # Create a list of keys to avoid modifying the dictionary during iteration for key in list(tool.keys()): if key in invalid_keys: del tool[key] return copied_list
[docs] def prepare_tool_examples(self, tool_list): """ Prepare tool configurations for example usage by keeping extended set of keys. This method is similar to prepare_tool_prompts but includes additional keys useful for examples and documentation. Args: tool_list (list): List of tool configuration dictionaries. Returns: list: Deep copy of tool list with only example-relevant keys. """ valid_keys = [ "name", "description", "parameter", "required", "query_schema", "fields", "label", "type", ] copied_list = copy.deepcopy(tool_list) for tool in copied_list: # Create a list of keys to avoid modifying the dictionary during iteration for key in list(tool.keys()): if key not in valid_keys: del tool[key] return copied_list
[docs] def get_tool_specification_by_names(self, tool_names, format="default"): """ Retrieve tool specifications by their names using tool_specification method. Args: tool_names (list): List of tool names to retrieve. format (str, optional): Output format. Options: 'default', 'openai'. If 'openai', returns OpenAI function calling format. Defaults to 'default'. Returns: list: List of tool specifications for the specified names. Tools not found will be reported but not included in the result. """ picked_tool_list = [] for each_name in tool_names: tool_spec = self.tool_specification(each_name, format=format) if tool_spec: picked_tool_list.append(tool_spec) return picked_tool_list
[docs] def get_tool_by_name(self, tool_names, format="default"): """ Retrieve tool configurations by their names. Args: tool_names (list): List of tool names to retrieve. format (str, optional): Output format. Options: 'default', 'openai'. If 'openai', returns OpenAI function calling format. Defaults to 'default'. Returns: list: List of tool configurations for the specified names. Tools not found will be reported but not included in the result. """ return self.get_tool_specification_by_names(tool_names, format=format)
[docs] def get_one_tool_by_one_name(self, tool_name, return_prompt=True): """ Retrieve a single tool specification by name, optionally prepared for prompting. This is a convenience method that calls get_one_tool_by_one_name. Args: tool_name (str): Name of the tool to retrieve. return_prompt (bool, optional): If True, returns tool prepared for prompting. If False, returns full tool configuration. Defaults to True. Returns: dict or None: Tool configuration if found, None otherwise. """ warning( "The 'get_one_tool_by_one_name' method is deprecated and will be removed in a future version. " "Please use 'tool_specification' instead." ) return self.tool_specification(tool_name, return_prompt=return_prompt)
[docs] def tool_specification(self, tool_name, return_prompt=False, format="default"): """ Retrieve a single tool configuration by name. Args: tool_name (str): Name of the tool to retrieve. return_prompt (bool, optional): If True, returns tool prepared for prompting. If False, returns full tool configuration. Defaults to False. format (str, optional): Output format. Options: 'default', 'openai'. If 'openai', returns OpenAI function calling format. Defaults to 'default'. Returns: dict or None: Tool configuration if found, None otherwise. """ if tool_name in self.all_tool_dict: tool_config = self.all_tool_dict[tool_name] if format == "openai": parameters = tool_config.get("parameter", {}) if isinstance(parameters, dict): # 修复 required 字段格式 if "properties" in parameters: for _prop_name, prop_config in parameters["properties"].items(): if ( isinstance(prop_config, dict) and "required" in prop_config ): del prop_config["required"] if "required" in parameters and not isinstance( parameters["required"], list ): if parameters["required"] is True: required_list = [] if "properties" in parameters: for prop_name, prop_config in parameters[ "properties" ].items(): if ( isinstance(prop_config, dict) and prop_config.get("required") is True ): required_list.append(prop_name) parameters["required"] = required_list else: parameters["required"] = [] return { "name": tool_config["name"], "description": tool_config["description"], "parameters": parameters, } elif return_prompt: return self.prepare_one_tool_prompt(tool_config) else: return tool_config else: warning(f"Tool name {tool_name} not found in the loaded tools.") return None
[docs] def get_tool_description(self, tool_name): """ Get the description of a tool by its name. This is a convenience method that calls get_one_tool_by_one_name. Args: tool_name (str): Name of the tool. Returns: dict or None: Tool configuration if found, None otherwise. """ return self.get_one_tool_by_one_name(tool_name)
[docs] def get_tool_type_by_name(self, tool_name): """ Get the type of a tool by its name. Args: tool_name (str): Name of the tool. Returns: str: The type of the tool. Raises: KeyError: If the tool name is not found in loaded tools. """ return self.all_tool_dict[tool_name]["type"]
[docs] def tool_to_str(self, tool_list): """ Convert a list of tool configurations to a formatted string. Args: tool_list (list): List of tool configuration dictionaries. Returns: str: JSON-formatted string representation of the tools, with each tool separated by double newlines. """ return "\n\n".join(json.dumps(obj, indent=4) for obj in tool_list)
[docs] def extract_function_call_json( self, lst, return_message=False, verbose=True, format="llama" ): """ Extract function call JSON from input data. This method delegates to the utility function extract_function_call_json. Args: lst: Input data containing function call information. return_message (bool, optional): Whether to return message along with JSON. Defaults to False. verbose (bool, optional): Whether to enable verbose output. Defaults to True. format (str, optional): Format type for extraction. Defaults to 'llama'. Returns: dict or tuple: Function call JSON, optionally with message if return_message is True. """ return extract_function_call_json( lst, return_message=return_message, verbose=verbose, format=format )
[docs] def call_id_gen(self): """ Generate a random call ID for function calls. Returns: str: A random 9-character string composed of letters and digits. """ return "".join(random.choices(string.ascii_letters + string.digits, k=9))
[docs] def run(self, fcall_str, return_message=False, verbose=True, format="llama"): """ Execute function calls from input string or data. This method parses function call data, validates it, and executes the corresponding tools. It supports both single function calls and multiple function calls in a list. Args: fcall_str: Input string or data containing function call information. return_message (bool, optional): Whether to return formatted messages. Defaults to False. verbose (bool, optional): Whether to enable verbose output. Defaults to True. format (str, optional): Format type for parsing. Defaults to 'llama'. Returns: list or str or None: - For multiple function calls: List of formatted messages with tool responses - For single function call: Direct result from the tool - None: If the input is not a valid function call """ if return_message: function_call_json, message = self.extract_function_call_json( fcall_str, return_message=return_message, verbose=verbose, format=format ) else: function_call_json = self.extract_function_call_json( fcall_str, return_message=return_message, verbose=verbose, format=format ) message = "" # Initialize message for cases where return_message=False if function_call_json is not None: if isinstance(function_call_json, list): # return the function call+result message with call id. call_results = [] for i in range(len(function_call_json)): call_result = self.run_one_function(function_call_json[i]) call_id = self.call_id_gen() function_call_json[i]["call_id"] = call_id call_results.append( { "role": "tool", "content": json.dumps( {"content": call_result, "call_id": call_id} ), } ) revised_messages = [ { "role": "assistant", "content": message, "tool_calls": json.dumps(function_call_json), } ] + call_results return revised_messages else: return self.run_one_function(function_call_json) else: error("Not a function call") return None
[docs] def run_one_function(self, function_call_json): """ Execute a single function call. This method validates the function call, initializes the tool if necessary, and executes it with the provided arguments. If hooks are enabled, it also applies output hooks to process the result. Args: function_call_json (dict): Dictionary containing function name and arguments. Returns: str or dict: Result from the tool execution, or error message if validation fails. """ check_status, check_message = self.check_function_call(function_call_json) if check_status is False: return ( "Invalid function call: " + check_message ) # + " You must correct your invalid function call!" function_name = function_call_json["name"] arguments = function_call_json["arguments"] # Execute the tool if function_name in self.callable_functions: result = self.callable_functions[function_name].run(arguments) else: if function_name in self.all_tool_dict: self.logger.debug( "Initiating callable_function from loaded tool dicts." ) tool = self.init_tool( self.all_tool_dict[function_name], add_to_cache=True ) result = tool.run(arguments) else: return f"Tool '{function_name}' not found" # Apply output hooks if enabled if self.hook_manager: context = { "tool_name": function_name, "tool_type": ( tool.__class__.__name__ if "tool" in locals() else "unknown" ), "execution_time": time.time(), "arguments": arguments, } result = self.hook_manager.apply_hooks( result, function_name, arguments, context ) return result
[docs] def toggle_hooks(self, enabled: bool): """ Enable or disable output hooks globally. This method allows runtime control of the hook system. When enabled, it initializes the HookManager if not already present. When disabled, it deactivates the HookManager. Args: enabled (bool): True to enable hooks, False to disable """ self.hooks_enabled = enabled if enabled and not self.hook_manager: self.hook_manager = HookManager({}, self) self.logger.info("Output hooks enabled") elif self.hook_manager: self.hook_manager.toggle_hooks(enabled) self.logger.info(f"Output hooks {'enabled' if enabled else 'disabled'}") else: self.logger.debug("Output hooks disabled")
    def init_tool(self, tool=None, tool_name=None, add_to_cache=True):
        """
        Initialize a tool instance from configuration or name.

        Resolves the implementation class lazily via the tool registry (with a
        fallback to the eagerly-built ``tool_type_mappings``), handles the tool
        types that need extra constructor wiring, and optionally caches the
        instance in ``self.callable_functions``.

        Args:
            tool (dict, optional): Tool configuration dictionary. Either this or
                                  tool_name must be provided.
            tool_name (str, optional): Name of the tool type to initialize. Either
                                      this or tool must be provided.
            add_to_cache (bool, optional): Whether to cache the initialized tool.
                                          Defaults to True.

        Returns:
            object: Initialized tool instance.

        Raises:
            KeyError: If the tool type is not found in tool_type_mappings.
        """
        # The registry fallback below may rebind the module-level mapping.
        global tool_type_mappings
        if tool_name is not None:
            # Use lazy loading to get the tool class
            tool_class = get_tool_class_lazy(tool_name)
            if tool_class is None:
                raise KeyError(f"Tool type '{tool_name}' not found in registry")
            new_tool = tool_class()
        else:
            tool_type = tool["type"]
            tool_name = tool["name"]

            # Use lazy loading to get the tool class
            tool_class = get_tool_class_lazy(tool_type)
            if tool_class is None:
                # Fallback to old method if lazy loading fails
                if tool_type not in tool_type_mappings:
                    # Refresh registry and try again
                    tool_type_mappings = get_tool_registry()
                    if tool_type not in tool_type_mappings:
                        raise KeyError(f"Tool type '{tool_type}' not found in registry")
                tool_class = tool_type_mappings[tool_type]

            if "OpentargetToolDrugNameMatch" == tool_type:
                # Special case: needs the FDA generic-name tool as a collaborator,
                # which is created (and cached) first if missing.
                if "FDADrugLabelGetDrugGenericNameTool" not in self.callable_functions:
                    drug_tool_class = get_tool_class_lazy(
                        "FDADrugLabelGetDrugGenericNameTool"
                    )
                    if drug_tool_class is None:
                        drug_tool_class = tool_type_mappings[
                            "FDADrugLabelGetDrugGenericNameTool"
                        ]
                    self.callable_functions["FDADrugLabelGetDrugGenericNameTool"] = (
                        drug_tool_class()
                    )
                new_tool = tool_class(
                    tool_config=tool,
                    drug_generic_tool=self.callable_functions[
                        "FDADrugLabelGetDrugGenericNameTool"
                    ],
                )
            # These tool types need a back-reference to this ToolUniverse.
            elif "ToolFinderEmbedding" == tool_type:
                new_tool = tool_class(tool_config=tool, tooluniverse=self)
            elif "ComposeTool" == tool_type:
                new_tool = tool_class(tool_config=tool, tooluniverse=self)
            elif "ToolFinderLLM" == tool_type:
                new_tool = tool_class(tool_config=tool, tooluniverse=self)
            elif "ToolFinderKeyword" == tool_type:
                new_tool = tool_class(tool_config=tool, tooluniverse=self)
            else:
                new_tool = tool_class(tool_config=tool)

        if add_to_cache:
            self.callable_functions[tool_name] = new_tool
        return new_tool
[docs] def check_function_call(self, fcall_str, function_config=None, format="llama"): """ Validate a function call against tool configuration. This method checks if a function call is valid by verifying the function name exists and the arguments match the expected parameters. Args: fcall_str: Function call string or data to validate. function_config (dict, optional): Specific function configuration to validate against. If None, uses the loaded tool configuration. format (str, optional): Format type for parsing. Defaults to 'llama'. Returns: tuple: A tuple of (is_valid, message) where: - is_valid (bool): True if the function call is valid, False otherwise - message (str): Error message if invalid, empty if valid """ function_call_json = self.extract_function_call_json(fcall_str, format=format) if function_call_json is not None: if function_config is not None: return evaluate_function_call(function_config, function_call_json) function_name = function_call_json["name"] if function_name not in self.all_tool_dict: return ( False, f"Function name {function_name} not found in loaded tools.", ) return evaluate_function_call( self.all_tool_dict[function_name], function_call_json ) else: return False, "Invalid JSON string of function call"
[docs] def export_tool_names(self, output_file, category_filter=None): """ Export tool names to a text file (one per line). Args: output_file (str): Path to the output file category_filter (list, optional): List of categories to filter by """ try: tools_to_export = [] if category_filter: # Filter by categories for category in category_filter: if category in self.tool_category_dicts: tools_to_export.extend( [ tool["name"] for tool in self.tool_category_dicts[category] ] ) else: # Export all tools tools_to_export = [tool["name"] for tool in self.all_tools] # Remove duplicates and sort tools_to_export = sorted(set(tools_to_export)) with open(output_file, "w", encoding="utf-8") as f: f.write("# ToolUniverse Tool Names\n") f.write(f"# Generated automatically - {len(tools_to_export)} tools\n") if category_filter: f.write(f"# Categories: {', '.join(category_filter)}\n") f.write("\n") for tool_name in tools_to_export: f.write(f"{tool_name}\n") self.logger.info( f"Exported {len(tools_to_export)} tool names to {output_file}" ) return tools_to_export except Exception as e: self.logger.error(f"Error exporting tool names to {output_file}: {e}") return []
[docs] def get_available_tools(self, category_filter=None, name_only=True): """ Get available tools, optionally filtered by category. Args: category_filter (list, optional): List of categories to filter by name_only (bool): If True, return only tool names; if False, return full configs Returns: list: List of tool names or tool configurations """ if not hasattr(self, "all_tools") or not self.all_tools: self.logger.warning("No tools loaded. Call load_tools() first.") return [] if category_filter: filtered_tools = [] for tool in self.all_tools: tool_type = tool.get("type", "") if tool_type in category_filter: filtered_tools.append(tool) else: filtered_tools = self.all_tools if name_only: return [tool["name"] for tool in filtered_tools] else: return filtered_tools
[docs] def find_tools_by_pattern(self, pattern, search_in="name", case_sensitive=False): """ Find tools matching a pattern in their name or description. Args: pattern (str): Pattern to search for search_in (str): Where to search - 'name', 'description', or 'both' case_sensitive (bool): Whether search should be case sensitive Returns: list: List of matching tool configurations """ if not hasattr(self, "all_tools") or not self.all_tools: self.logger.warning("No tools loaded. Call load_tools() first.") return [] import re flags = 0 if case_sensitive else re.IGNORECASE matching_tools = [] for tool in self.all_tools: found = False if search_in in ["name", "both"]: tool_name = tool.get("name", "") if re.search(pattern, tool_name, flags): found = True if search_in in ["description", "both"] and not found: tool_desc = tool.get("description", "") if re.search(pattern, tool_desc, flags): found = True if found: matching_tools.append(tool) self.logger.info( f"Found {len(matching_tools)} tools matching pattern '{pattern}'" ) return matching_tools
[docs] def load_tools_from_names_list(self, tool_names, clear_existing=True): """ Load only specific tools by their names. Args: tool_names (list): List of tool names to load clear_existing (bool): Whether to clear existing tools first Returns: int: Number of tools successfully loaded """ if clear_existing: self.all_tools = [] self.all_tool_dict = {} self.tool_category_dicts = {} # Use the enhanced load_tools method original_count = len(self.all_tools) self.load_tools(include_tools=tool_names) return len(self.all_tools) - original_count