Source code for tooluniverse.execute_function
"""
ToolUniverse Function Execution Module
This module provides the core ToolUniverse class for managing and executing various scientific and data tools.
It supports loading tools from JSON configurations, organizing them by categories, validating function calls,
and executing tools with proper error handling and caching.
The module includes support for:
- GraphQL tools (OpenTarget, OpenTarget Genetics)
- RESTful API tools (Monarch, ChEMBL, PubChem, etc.)
- FDA drug labeling and adverse event tools
- Clinical trials tools
- Literature search tools (EuropePMC, Semantic Scholar, PubTator)
- Biological databases (HPA, Reactome, UniProt)
- MCP (Model Context Protocol) clients and auto-loaders
- Enrichment analysis tools
- Package management tools
Classes:
ToolUniverse: Main class for tool management and execution
Constants:
default_tool_files: Default mapping of tool categories to JSON file paths
tool_type_mappings: Mapping of tool type strings to their implementation classes
"""
import copy
import json
import random
import string
import os
import time
from .utils import read_json_list, evaluate_function_call, extract_function_call_json
from .tool_registry import (
auto_discover_tools,
get_tool_registry,
register_external_tool,
get_tool_class_lazy,
)
from .logging_config import (
get_logger,
debug,
info,
warning,
error,
set_log_level,
)
from .output_hook import HookManager
from .default_config import default_tool_files, get_default_hook_config
# Determine the directory where the current file is located
current_dir = os.path.dirname(os.path.abspath(__file__))
# Check if lazy loading is enabled (default: True for better performance)
LAZY_LOADING_ENABLED = os.getenv("TOOLUNIVERSE_LAZY_LOADING", "true").lower() in (
"true",
"1",
"yes",
)
if LAZY_LOADING_ENABLED:
# Use lazy auto-discovery by default (much faster)
debug("Starting lazy tool auto-discovery...")
tool_type_mappings = auto_discover_tools(lazy=True)
else:
# Use full auto-discovery (slower but ensures all tools are immediately available)
debug("Starting full tool auto-discovery...")
tool_type_mappings = auto_discover_tools(lazy=False)
# Update the registry with any manually added tools
tool_type_mappings = get_tool_registry()
if LAZY_LOADING_ENABLED:
debug(
f"Lazy tool registry initialized with {len(tool_type_mappings)} immediately available tools"
)
else:
debug(f"Full tool registry initialized with {len(tool_type_mappings)} tools")
for _tool_name, _tool_class in sorted(tool_type_mappings.items()):
debug(f" - {_tool_name}: {_tool_class.__name__}")
[docs]
class ToolUniverse:
"""
A comprehensive tool management system for loading, organizing, and executing various scientific and data tools.
The ToolUniverse class provides a centralized interface for managing different types of tools including
GraphQL tools, RESTful APIs, MCP clients, and specialized scientific tools. It handles tool loading,
filtering, caching, and execution.
Attributes:
all_tools (list): List of all loaded tool configurations
all_tool_dict (dict): Dictionary mapping tool names to their configurations
tool_category_dicts (dict): Dictionary organizing tools by category
tool_files (dict): Dictionary mapping category names to their JSON file paths
callable_functions (dict): Cache of instantiated tool objects
"""
[docs]
def __init__(
self,
tool_files=default_tool_files,
keep_default_tools=True,
log_level: str = None,
hooks_enabled: bool = False,
hook_config: dict = None,
hook_type: str = None,
):
"""
Initialize the ToolUniverse with tool file configurations.
Args:
tool_files (dict, optional): Dictionary mapping category names to JSON file paths.
Defaults to default_tool_files.
keep_default_tools (bool, optional): Whether to keep default tools when custom
tool_files are provided. Defaults to True.
log_level (str, optional): Log level for this instance. Can be 'DEBUG', 'INFO',
'WARNING', 'ERROR', 'CRITICAL'. If None, uses global setting.
hooks_enabled (bool, optional): Whether to enable output hooks. Defaults to False.
hook_config (dict, optional): Configuration for hooks. If None, uses default config.
hook_type (str or list, optional): Simple hook type selection. Can be 'SummarizationHook',
'FileSaveHook', or a list of both. Defaults to 'SummarizationHook'.
If both hook_config and hook_type are provided, hook_config takes precedence.
"""
# Set log level if specified
if log_level is not None:
set_log_level(log_level)
# Get logger for this class
self.logger = get_logger("ToolUniverse")
# Initialize any necessary attributes here FIRST
self.all_tools = []
self.all_tool_dict = {}
self.tool_category_dicts = {}
self.tool_finder = None
if tool_files is None:
tool_files = default_tool_files
elif keep_default_tools:
default_tool_files.update(tool_files)
tool_files = default_tool_files
self.tool_files = tool_files
self.logger.debug("Tool files:")
self.logger.debug(json.dumps(tool_files, indent=2))
self.callable_functions = {}
# Refresh the global tool_type_mappings to include any tools registered during imports
global tool_type_mappings
tool_type_mappings = get_tool_registry()
# Initialize hook system AFTER attributes are initialized
self.hooks_enabled = hooks_enabled
if self.hooks_enabled:
# Determine hook configuration
if hook_config is not None:
# Use provided hook_config (takes precedence)
final_hook_config = hook_config
self.logger.info("Using provided hook_config")
elif hook_type is not None:
# Use hook_type to generate simple configuration
final_hook_config = self._create_hook_config_from_type(hook_type)
self.logger.info(f"Using hook_type: {hook_type}")
else:
# Use default configuration with SummarizationHook
final_hook_config = get_default_hook_config()
self.logger.info("Using default hook configuration (SummarizationHook)")
self.hook_manager = HookManager(final_hook_config, self)
self.logger.info("Output hooks enabled")
else:
self.hook_manager = None
self.logger.debug("Output hooks disabled")
[docs]
def register_custom_tool(self, tool_class, tool_name=None, tool_config=None):
"""
Register a custom tool class at runtime.
Args:
tool_class: The tool class to register
tool_name (str, optional): Name to register under. Uses class name if None.
tool_config (dict, optional): Tool configuration dictionary to add to all_tools
Returns:
str: The name the tool was registered under
"""
name = tool_name or tool_class.__name__
# Register the tool class
register_external_tool(name, tool_class)
# Update the global tool_type_mappings
global tool_type_mappings
tool_type_mappings = get_tool_registry()
# If tool_config is provided, add it to all_tools
if tool_config:
# Ensure the config has the correct type
if "type" not in tool_config:
tool_config["type"] = name
self.all_tools.append(tool_config)
if "name" in tool_config:
self.all_tool_dict[tool_config["name"]] = tool_config
self.logger.info(f"Custom tool '{name}' registered successfully!")
return name
[docs]
def force_full_discovery(self):
"""
Force full tool discovery, importing all tool modules immediately.
This can be useful when you need to ensure all tools are available
immediately, bypassing lazy loading.
Returns:
dict: Updated tool registry with all discovered tools
"""
global tool_type_mappings
self.logger.info("Forcing full tool discovery...")
tool_type_mappings = auto_discover_tools(lazy=False)
self.logger.info(
f"Full discovery complete. {len(tool_type_mappings)} tools available."
)
return tool_type_mappings
[docs]
def get_lazy_loading_status(self):
"""
Get information about lazy loading status and available tools.
Returns:
dict: Dictionary with lazy loading status and tool counts
"""
from .tool_registry import _discovery_completed, _lazy_registry
return {
"lazy_loading_enabled": LAZY_LOADING_ENABLED,
"full_discovery_completed": _discovery_completed,
"immediately_available_tools": len(tool_type_mappings),
"lazy_mappings_available": len(_lazy_registry),
"loaded_tools_count": (
len(self.all_tools) if hasattr(self, "all_tools") else 0
),
}
[docs]
def get_tool_types(self):
"""
Get the types of tools available in the tool files.
Returns:
list: A list of tool type names (category keys).
"""
return list(self.tool_files.keys())
def _get_api_key(self, key_name: str):
"""Get API key from environment variables or loaded sources"""
# First check environment variables (highest priority)
env_value = os.getenv(key_name)
if env_value:
return env_value
else:
return None
def _check_api_key_requirements(self, tool_config):
"""
Check if a tool's required API keys are available.
Also supports optional_api_keys where at least one key from the list must be available.
Args:
tool_config (dict): Tool configuration containing optional 'required_api_keys' and 'optional_api_keys' fields
Returns:
tuple: (bool, list) - (all_keys_available, missing_keys)
"""
required_keys = tool_config.get("required_api_keys", [])
optional_keys = tool_config.get("optional_api_keys", [])
missing_keys = []
# Check required keys (all must be available)
for key in required_keys:
if not self._get_api_key(key):
missing_keys.append(key)
# Check optional keys (at least one must be available)
optional_satisfied = True
if optional_keys:
optional_available = any(self._get_api_key(key) for key in optional_keys)
if not optional_available:
optional_satisfied = False
# For error reporting, add a descriptive message about optional keys
missing_keys.append(f"At least one of: {', '.join(optional_keys)}")
# Tool is valid if all required keys are available AND optional requirement is satisfied
all_valid = (
len([k for k in missing_keys if not k.startswith("At least one of:")]) == 0
and optional_satisfied
)
return all_valid, missing_keys
[docs]
def generate_env_template(
self, all_missing_keys, output_file: str = ".env.template"
):
"""Generate a template .env file with all required API keys"""
with open(output_file, "w") as f:
f.write("# API Keys for ToolUniverse\n")
f.write("# Copy this file to .env and fill in your actual API keys\n\n")
for key in sorted(all_missing_keys):
f.write(f"{key}=your_api_key_here\n\n")
self.logger.info(f"Generated API key template: {output_file}")
self.logger.info("Copy this file to .env and fill in your API keys")
def _create_hook_config_from_type(self, hook_type):
"""
Create hook configuration from simple hook_type parameter.
Args:
hook_type (str or list): Hook type(s) to enable. Can be 'SummarizationHook',
'FileSaveHook', or a list of both.
Returns:
dict: Generated hook configuration
"""
# Handle single hook type
if isinstance(hook_type, str):
hook_types = [hook_type]
else:
hook_types = hook_type
# Validate hook types
valid_types = ["SummarizationHook", "FileSaveHook"]
for htype in hook_types:
if htype not in valid_types:
raise ValueError(
f"Invalid hook_type: {htype}. Valid types are: {valid_types}"
)
# Create hooks list
hooks = []
for htype in hook_types:
if htype == "SummarizationHook":
hooks.append(
{
"name": "summarization_hook",
"type": "SummarizationHook",
"enabled": True,
"conditions": {
"output_length": {"operator": ">", "threshold": 5000}
},
"hook_config": {
"chunk_size": 32000,
"focus_areas": "key_findings_and_results",
"max_summary_length": 3000,
},
}
)
elif htype == "FileSaveHook":
hooks.append(
{
"name": "file_save_hook",
"type": "FileSaveHook",
"enabled": True,
"conditions": {
"output_length": {"operator": ">", "threshold": 1000}
},
"hook_config": {
"temp_dir": None,
"file_prefix": "tool_output",
"include_metadata": True,
"auto_cleanup": False,
"cleanup_age_hours": 24,
},
}
)
return {"hooks": hooks}
[docs]
def load_tools(
self,
tool_type=None,
exclude_tools=None,
exclude_categories=None,
include_tools=None,
tool_config_files=None,
tools_file=None,
include_tool_types=None,
exclude_tool_types=None,
):
"""
Loads tool definitions from JSON files into the instance's tool registry.
If `tool_type` is None, loads all available tool categories from `self.tool_files`.
Otherwise, loads only the specified tool categories.
After loading, deduplicates tools by their 'name' field and updates the internal tool list.
Also refreshes the tool name and description mapping.
Args:
tool_type (list, optional): List of tool category names to load. If None, loads all categories.
exclude_tools (list, optional): List of specific tool names to exclude from loading.
exclude_categories (list, optional): List of tool categories to exclude from loading.
include_tools (list or str, optional): List of specific tool names to include, or path to a text file
containing tool names (one per line). If provided, only these tools
will be loaded regardless of categories.
tool_config_files (dict, optional): Additional tool configuration files to load.
Format: {"category_name": "/path/to/config.json"}
tools_file (str, optional): Path to a text file containing tool names to include (one per line).
Alternative to include_tools when providing a file path.
include_tool_types (list, optional): List of tool types to include (e.g., ["OpenTarget", "ChEMBLTool"]).
If provided, only tools with these types will be loaded.
exclude_tool_types (list, optional): List of tool types to exclude (e.g., ["ToolFinderEmbedding"]).
Tools with these types will be excluded.
Side Effects:
- Updates `self.all_tools` with loaded and deduplicated tools.
- Updates `self.tool_category_dicts` with loaded tools per category.
- Calls `self.refresh_tool_name_desc()` to update tool name/description mapping.
- Prints the number of tools before and after loading.
Examples:
# Load specific tools by name
tu.load_tools(include_tools=["UniProt_get_entry_by_accession", "ChEMBL_get_molecule_by_chembl_id"])
# Load tools from a file
tu.load_tools(tools_file="/path/to/tool_names.txt")
# Include only specific tool types
tu.load_tools(include_tool_types=["OpenTarget", "ChEMBLTool"])
# Exclude specific tool types
tu.load_tools(exclude_tool_types=["ToolFinderEmbedding", "Unknown"])
# Load additional config files
tu.load_tools(tool_config_files={"custom_tools": "/path/to/custom_tools.json"})
# Combine multiple options
tu.load_tools(
tool_type=["uniprot", "ChEMBL"],
exclude_tools=["problematic_tool"],
exclude_tool_types=["Unknown"],
tool_config_files={"custom": "/path/to/custom.json"}
)
"""
self.logger.debug(f"Number of tools before load tools: {len(self.all_tools)}")
# Handle tools_file parameter (alternative to include_tools)
if tools_file:
include_tools = self._load_tool_names_from_file(tools_file)
# Handle include_tools parameter
if isinstance(include_tools, str):
# If include_tools is a string, treat it as a file path
include_tools = self._load_tool_names_from_file(include_tools)
# Convert parameters to sets for efficient lookup
exclude_tools_set = set(exclude_tools or [])
exclude_categories_set = set(exclude_categories or [])
include_tools_set = set(include_tools or []) if include_tools else None
include_tool_types_set = (
set(include_tool_types or []) if include_tool_types else None
)
exclude_tool_types_set = (
set(exclude_tool_types or []) if exclude_tool_types else None
)
# Log operations
if exclude_tools_set:
self.logger.info(
f"Excluding tools by name: {', '.join(list(exclude_tools_set)[:5])}{'...' if len(exclude_tools_set) > 5 else ''}"
)
if exclude_categories_set:
self.logger.info(
f"Excluding categories: {', '.join(exclude_categories_set)}"
)
if include_tools_set:
self.logger.info(
f"Including only specific tools: {len(include_tools_set)} tools specified"
)
if include_tool_types_set:
self.logger.info(
f"Including only tool types: {', '.join(include_tool_types_set)}"
)
if exclude_tool_types_set:
self.logger.info(
f"Excluding tool types: {', '.join(exclude_tool_types_set)}"
)
if tool_config_files:
self.logger.info(
f"Loading additional config files: {', '.join(tool_config_files.keys())}"
)
# Merge additional config files with existing tool_files
all_tool_files = self.tool_files.copy()
if tool_config_files:
# Validate that additional config files exist
for category, file_path in tool_config_files.items():
if os.path.exists(file_path):
all_tool_files[category] = file_path
self.logger.debug(
f"Added config file for category '{category}': {file_path}"
)
else:
self.logger.warning(
f"Config file for category '{category}' not found: {file_path}"
)
# Determine which categories to process
if tool_type is None:
categories_to_load = [
cat
for cat in all_tool_files.keys()
if cat not in exclude_categories_set
]
else:
assert isinstance(
tool_type, list
), "tool_type must be a list of tool category names"
categories_to_load = [
cat for cat in tool_type if cat not in exclude_categories_set
]
# Load tools from specified categories
for each in categories_to_load:
if each in all_tool_files:
try:
loaded_data = read_json_list(all_tool_files[each])
# Handle different data formats
if isinstance(loaded_data, dict):
# Convert dict of tools to list of tools
loaded_tool_list = list(loaded_data.values())
self.logger.debug(
f"Converted dict to list: {len(loaded_tool_list)} tools"
)
elif isinstance(loaded_data, list):
loaded_tool_list = loaded_data
else:
self.logger.warning(
f"Unexpected data format from {all_tool_files[each]}: {type(loaded_data)}"
)
continue
self.all_tools += loaded_tool_list
self.tool_category_dicts[each] = loaded_tool_list
self.logger.debug(
f"Loaded {len(loaded_tool_list)} tools from category '{each}'"
)
except Exception as e:
self.logger.error(
f"Error loading tools from category '{each}': {e}"
)
else:
self.logger.warning(
f"Tool category '{each}' not found in available tool files"
)
# Load auto-discovered configs from decorators
self._load_auto_discovered_configs()
# Filter and deduplicate tools
self._filter_and_deduplicate_tools(
exclude_tools_set,
include_tools_set,
include_tool_types_set,
exclude_tool_types_set,
)
# Process MCP Auto Loader tools
self.logger.debug("Checking for MCP Auto Loader tools...")
self._process_mcp_auto_loaders()
def _load_tool_names_from_file(self, file_path):
"""
Load tool names from a text file (one tool name per line).
Args:
file_path (str): Path to the text file containing tool names
Returns:
list: List of tool names loaded from the file
"""
try:
if not os.path.exists(file_path):
self.logger.error(f"Tools file not found: {file_path}")
return []
with open(file_path, "r", encoding="utf-8") as f:
tool_names = []
for _line_num, line in enumerate(f, 1):
line = line.strip()
if line and not line.startswith(
"#"
): # Skip empty lines and comments
tool_names.append(line)
self.logger.info(
f"Loaded {len(tool_names)} tool names from file: {file_path}"
)
return tool_names
except Exception as e:
self.logger.error(f"Error loading tool names from file {file_path}: {e}")
return []
def _filter_and_deduplicate_tools(
self,
exclude_tools_set,
include_tools_set,
include_tool_types_set=None,
exclude_tool_types_set=None,
):
"""
Filter tools based on inclusion/exclusion criteria and remove duplicates.
Args:
exclude_tools_set (set): Set of tool names to exclude
include_tools_set (set or None): Set of tool names to include (if None, include all)
include_tool_types_set (set or None): Set of tool types to include (if None, include all)
exclude_tool_types_set (set or None): Set of tool types to exclude (if None, exclude none)
"""
tool_name_list = []
dedup_all_tools = []
all_missing_keys = set()
duplicate_names = set()
excluded_tools_count = 0
included_tools_count = 0
missing_included_tools = set()
# If include_tools_set is specified, track which tools we found
if include_tools_set:
missing_included_tools = include_tools_set.copy()
for each in self.all_tools:
# Handle both dict and string entries
if isinstance(each, dict):
tool_name = each.get("name", "")
tool_type = each.get("type", "Unknown")
elif isinstance(each, str):
self.logger.warning(f"Found string in all_tools: {each}")
continue
else:
self.logger.warning(f"Unknown type in all_tools: {type(each)} - {each}")
continue
# Check tool type inclusion/exclusion
if include_tool_types_set and tool_type not in include_tool_types_set:
self.logger.debug(
f"Excluding tool '{tool_name}' - type '{tool_type}' not in include list"
)
continue
if exclude_tool_types_set and tool_type in exclude_tool_types_set:
self.logger.debug(
f"Excluding tool '{tool_name}' - type '{tool_type}' is excluded"
)
continue
# If include_tools_set is specified, only include tools in that set
if include_tools_set:
if tool_name not in include_tools_set:
continue
else:
missing_included_tools.discard(tool_name)
included_tools_count += 1
# Skip excluded tools
if tool_name in exclude_tools_set:
excluded_tools_count += 1
self.logger.debug(f"Excluding tool by name: {tool_name}")
continue
# Check API key requirements
if "required_api_keys" in each:
all_keys_available, missing_keys = self._check_api_key_requirements(
each
)
if not all_keys_available:
all_missing_keys.update(missing_keys)
self.logger.debug(
f"Skipping tool '{tool_name}' due to missing API keys: {', '.join(missing_keys)}"
)
continue
# Handle duplicates
if tool_name not in tool_name_list:
tool_name_list.append(tool_name)
dedup_all_tools.append(each)
else:
duplicate_names.add(tool_name)
# Report statistics
if duplicate_names:
self.logger.debug(
f"Duplicate tool names found and dropped: {', '.join(list(duplicate_names)[:5])}{'...' if len(duplicate_names) > 5 else ''}"
)
if excluded_tools_count > 0:
self.logger.info(f"Excluded {excluded_tools_count} tools by name")
if include_tools_set:
self.logger.info(f"Included {included_tools_count} tools by name filter")
if missing_included_tools:
self.logger.warning(
f"Could not find {len(missing_included_tools)} requested tools: {', '.join(list(missing_included_tools)[:5])}{'...' if len(missing_included_tools) > 5 else ''}"
)
self.all_tools = dedup_all_tools
self.refresh_tool_name_desc()
info(f"Number of tools after load tools: {len(self.all_tools)}")
# Generate template for missing API keys
if len(all_missing_keys) > 0:
warning(f"\nMissing API keys: {', '.join(all_missing_keys)}")
info("Generating .env.template file with missing API keys...")
self.generate_env_template(all_missing_keys)
def _load_auto_discovered_configs(self):
"""
Load auto-discovered configs from the decorator registry.
This method loads tool configurations that were registered automatically
via the @register_tool decorator with config parameter.
"""
from .tool_registry import get_config_registry
discovered_configs = get_config_registry()
if discovered_configs:
self.logger.debug(
f"Loading {len(discovered_configs)} auto-discovered tool configs"
)
for _tool_type, config in discovered_configs.items():
# Add to all_tools if not already present
if "name" in config and config["name"] not in [
tool.get("name") for tool in self.all_tools
]:
self.all_tools.append(config)
self.logger.debug(f"Added auto-discovered config: {config['name']}")
def _process_mcp_auto_loaders(self):
"""
Process any MCPAutoLoaderTool instances to automatically discover and register MCP tools.
This method scans through all loaded tools for MCPAutoLoaderTool instances and runs their
auto-discovery process to find and register MCP tools from configured servers. It handles
async operations properly with cleanup and error handling.
Side Effects:
- May add new tools to the tool registry
- Prints debug information about the discovery process
- Updates tool counts after MCP registration
"""
self.logger.debug("Starting _process_mcp_auto_loaders")
import asyncio
import warnings
auto_loaders = []
self.logger.debug(f"Checking {len(self.all_tools)} tools for MCPAutoLoaderTool")
for tool_config in self.all_tools:
if tool_config.get("type") == "MCPAutoLoaderTool":
auto_loaders.append(tool_config)
self.logger.debug(f"Found MCPAutoLoaderTool: {tool_config['name']}")
if not auto_loaders:
self.logger.debug("No MCP Auto Loader tools found")
return
info(f"Found {len(auto_loaders)} MCP Auto Loader tool(s), processing...")
# Check if we're already in an event loop
try:
asyncio.get_running_loop()
in_event_loop = True
self.logger.debug("Already in an event loop, using async approach")
except RuntimeError:
in_event_loop = False
self.logger.debug("No event loop running, will create new one")
# Process each auto loader
for loader_config in auto_loaders:
self.logger.debug(f"Processing loader: {loader_config['name']}")
try:
# Create auto loader instance
self.logger.debug("Creating auto loader instance...")
auto_loader = tool_type_mappings["MCPAutoLoaderTool"](loader_config)
self.logger.debug("Auto loader instance created")
# Run auto-load process with proper session cleanup
self.logger.debug("Starting auto-load process...")
async def _run_auto_load(loader):
"""Run auto-load with proper cleanup"""
try:
result = await loader.auto_load_and_register(self)
return result
finally:
# Ensure session cleanup
await loader._close_session()
if in_event_loop:
# We're already in an event loop, so we can't use run_until_complete
# Instead, we'll skip MCP auto-loading for now and warn the user
warning(
f"Warning: Cannot process MCP Auto Loader '{loader_config['name']}' because we're already in an event loop."
)
self.logger.debug(
"This is a known limitation when SMCP is used within an async context."
)
self.logger.debug(
"MCP tools will need to be loaded manually or the server should be run outside of an async context."
)
continue
else:
# No event loop, safe to create one
# Suppress ResourceWarnings during cleanup
with warnings.catch_warnings():
warnings.simplefilter("ignore", ResourceWarning)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
self.logger.debug("Running auto_load_and_register...")
result = loop.run_until_complete(
_run_auto_load(auto_loader)
)
self.logger.debug(
f"Auto-load completed with result: {result}"
)
info(
f"MCP Auto Loader '{loader_config['name']}' processed:"
)
info(
f" - Discovered: {result.get('discovered_count', 0)} tools"
)
info(
f" - Registered: {result.get('registered_count', 0)} tools"
)
# Print detailed tool information
if result.get("tools"):
print(
f" 📋 Discovered MCP tools: {', '.join(result['tools'])}"
)
if result.get("registered_tools"):
print(
f" 🔧 Registered tools in ToolUniverse: {', '.join(result['registered_tools'])}"
)
self.logger.debug(
f" - Tools: {', '.join(result['registered_tools'])}"
)
# Show available tools in callable_functions
expert_tools = [
name
for name in self.callable_functions.keys()
if name.startswith("expert_")
]
if expert_tools:
print(
f" ✅ Expert tools now available: {', '.join(expert_tools)}"
)
else:
print(
" ⚠️ No expert tools found in callable_functions after registration"
)
finally:
self.logger.debug("Closing async loop...")
# Clean up any remaining tasks
try:
pending = asyncio.all_tasks(loop)
for task in pending:
task.cancel()
if pending:
loop.run_until_complete(
asyncio.gather(*pending, return_exceptions=True)
)
except Exception:
pass # Ignore cleanup errors
finally:
loop.close()
self.logger.debug("Async loop closed")
except Exception as e:
self.logger.debug(f"Exception in auto loader processing: {e}")
import traceback
traceback.print_exc()
self.logger.debug(
f"Failed to process MCP Auto Loader '{loader_config['name']}': {str(e)}"
)
# Update tool count after MCP registration
self.logger.debug(
f"Number of tools after MCP auto-loading: {len(self.all_tool_dict)}"
)
self.logger.debug("_process_mcp_auto_loaders completed")
[docs]
def select_tools(
self,
include_names=None,
exclude_names=None,
include_categories=None,
exclude_categories=None,
):
"""
Select tools based on tool names and/or categories (tool_files keys).
Args:
include_names (list, optional): List of tool names to include. If None, include all.
exclude_names (list, optional): List of tool names to exclude.
include_categories (list, optional): List of categories (tool_files keys) to include.
If None, include all.
exclude_categories (list, optional): List of categories (tool_files keys) to exclude.
Returns:
list: List of selected tool configurations.
"""
selected_tools = []
# If categories are specified, use self.tool_category_dicts to filter
categories = set(self.tool_category_dicts.keys())
if include_categories is not None:
categories &= set(include_categories)
if exclude_categories is not None:
categories -= set(exclude_categories)
# Gather tools from selected categories
for cat in categories:
selected_tools.extend(self.tool_category_dicts[cat])
# Further filter by names if needed
if include_names is not None:
selected_tools = [
tool for tool in selected_tools if tool["name"] in include_names
]
if exclude_names is not None:
selected_tools = [
tool for tool in selected_tools if tool["name"] not in exclude_names
]
return selected_tools
[docs]
def filter_tool_lists(
self,
tool_name_list,
tool_desc_list,
include_names=None,
exclude_names=None,
include_categories=None,
exclude_categories=None,
):
"""
Directly filter tool name and description lists based on names and/or categories.
This method takes existing tool name and description lists and filters them according
to the specified criteria using the select_tools method for category-based filtering.
Args:
tool_name_list (list): List of tool names to filter.
tool_desc_list (list): List of tool descriptions to filter (must correspond to tool_name_list).
include_names (list, optional): List of tool names to include.
exclude_names (list, optional): List of tool names to exclude.
include_categories (list, optional): List of categories to include.
exclude_categories (list, optional): List of categories to exclude.
Returns:
tuple: A tuple containing (filtered_tool_name_list, filtered_tool_desc_list).
"""
# Build a set of allowed tool names using select_tools for category filtering
allowed_names = set()
if any([include_names, exclude_names, include_categories, exclude_categories]):
filtered_tools = self.select_tools(
include_names=include_names,
exclude_names=exclude_names,
include_categories=include_categories,
exclude_categories=exclude_categories,
)
allowed_names = set(tool["name"] for tool in filtered_tools)
else:
allowed_names = set(tool_name_list)
# Filter lists by allowed_names
filtered_tool_name_list = []
filtered_tool_desc_list = []
for name, desc in zip(tool_name_list, tool_desc_list):
if name in allowed_names:
filtered_tool_name_list.append(name)
filtered_tool_desc_list.append(desc)
return filtered_tool_name_list, filtered_tool_desc_list
[docs]
def return_all_loaded_tools(self):
"""
Return a deep copy of all loaded tools.
Returns:
list: A deep copy of the all_tools list to prevent external modification.
"""
return copy.deepcopy(self.all_tools)
[docs]
def list_built_in_tools(self, mode="config", scan_all=False):
"""
List all built-in tool categories and their statistics with different modes.
This method provides a comprehensive overview of all available tools in the ToolUniverse,
organized by categories. It reads directly from the default tool files to gather statistics,
so it works even before calling load_tools().
Args:
mode (str, optional): Organization mode for tools. Defaults to 'config'.
- 'config': Organize by config file categories (original behavior)
- 'type': Organize by tool types (implementation classes)
- 'list_name': Return a list of all tool names
- 'list_spec': Return a list of all tool specifications
scan_all (bool, optional): Whether to scan all JSON files in data directory recursively.
If True, scans all JSON files in data/ and its subdirectories.
If False (default), uses predefined tool file mappings.
Returns:
dict or list:
- For 'config' and 'type' modes: A dictionary containing tool statistics
- For 'list_name' mode: A list of all tool names
- For 'list_spec' mode: A list of all tool specifications
Example:
>>> tool_universe = ToolUniverse()
>>> # Group by config file categories (predefined files only)
>>> stats = tool_universe.list_built_in_tools(mode='config')
>>> # Scan all JSON files in data directory recursively
>>> stats = tool_universe.list_built_in_tools(mode='config', scan_all=True)
>>> # Get all tool names from all JSON files
>>> tool_names = tool_universe.list_built_in_tools(mode='list_name', scan_all=True)
Note:
- This method reads directly from tool files and works without calling load_tools()
- Tools are deduplicated across categories, so the same tool won't be counted multiple times
- The summary is automatically printed to console when this method is called (except for list_name and list_spec modes)
- When scan_all=True, all JSON files in data/ and subdirectories are scanned
"""
if mode not in ["config", "type", "list_name", "list_spec"]:
raise ValueError(
"Mode must be one of: 'config', 'type', 'list_name', 'list_spec'"
)
# For list_name and list_spec modes, we can return early with just the data
if mode in ["list_name", "list_spec"]:
all_tools = []
all_tool_names = set() # For deduplication across categories
if scan_all:
# Scan all JSON files in data directory recursively
all_tools, all_tool_names = self._scan_all_json_files()
else:
# Use predefined tool files (original behavior)
all_tools, all_tool_names = self._scan_predefined_files()
# Deduplicate tools by name
unique_tools = {}
for tool in all_tools:
if tool["name"] not in unique_tools:
unique_tools[tool["name"]] = tool
if mode == "list_name":
return sorted(list(unique_tools.keys()))
elif mode == "list_spec":
return list(unique_tools.values())
# Original logic for config and type modes
result = {
"categories": {},
"total_categories": 0,
"total_tools": 0,
"mode": mode,
"summary": "",
}
if scan_all:
# Scan all JSON files in data directory recursively
all_tools, all_tool_names = self._scan_all_json_files()
# For config mode with scan_all, organize by file names
if mode == "config":
file_tools_map = {}
for tool in all_tools:
# Get the source file for this tool (we need to track this)
# For now, we'll organize by tool type as a fallback
tool_type = tool.get("type", "Unknown")
if tool_type not in file_tools_map:
file_tools_map[tool_type] = []
file_tools_map[tool_type].append(tool)
for category, tools in file_tools_map.items():
result["categories"][category] = {"count": len(tools)}
else:
# Use predefined tool files (original behavior)
all_tools, all_tool_names = self._scan_predefined_files()
# Read tools from each category file
for category, file_path in self.tool_files.items():
try:
# Read the JSON file for this category
tools_in_category = read_json_list(file_path)
if mode == "config":
tool_names = [tool["name"] for tool in tools_in_category]
result["categories"][category] = {"count": len(tool_names)}
except Exception as e:
warning(
f"Warning: Could not read tools from {category} ({file_path}): {e}"
)
if mode == "config":
result["categories"][category] = {"count": 0}
# If mode is 'type', organize by tool types instead
if mode == "type":
# Deduplicate tools by name first
unique_tools = {}
for tool in all_tools:
if tool["name"] not in unique_tools:
unique_tools[tool["name"]] = tool
# Group by tool type
type_groups = {}
for tool in unique_tools.values():
tool_type = tool.get("type", "Unknown")
if tool_type not in type_groups:
type_groups[tool_type] = []
type_groups[tool_type].append(tool["name"])
# Build result for type mode
for tool_type, tool_names in type_groups.items():
result["categories"][tool_type] = {
"count": len(tool_names),
"tools": sorted(tool_names),
}
# Calculate totals
result["total_categories"] = len(result["categories"])
result["total_tools"] = len(all_tool_names)
# Generate summary information
mode_title = "Config File Categories" if mode == "config" else "Tool Types"
summary_lines = [
"=" * 60,
f"🔧 ToolUniverse Built-in Tools Overview ({mode_title})",
"=" * 60,
f"📊 Total Categories: {result['total_categories']}",
f"🛠️ Total Unique Tools: {result['total_tools']}",
f"📋 Organization Mode: {mode}",
"",
f"📂 {mode_title} Breakdown:",
"-" * 40,
]
# Sort categories by tool count (descending) for better visualization
sorted_categories = sorted(
result["categories"].items(), key=lambda x: x[1]["count"], reverse=True
)
for category, category_info in sorted_categories:
count = category_info["count"]
# Add visual indicators for different tool counts
if count >= 10:
icon = "🟢"
elif count >= 5:
icon = "🟡"
elif count >= 1:
icon = "🟠"
else:
icon = "🔴"
# Format category name to be more readable
if mode == "config":
display_name = category.replace("_", " ").title()
else:
display_name = category
summary_lines.append(f" {icon} {display_name:<35} {count:>3} tools")
# For type mode, optionally show some tool examples
if (
mode == "type"
and "tools" in category_info
and len(category_info["tools"]) <= 5
):
for tool_name in category_info["tools"]:
summary_lines.append(f" └─ {tool_name}")
elif (
mode == "type"
and "tools" in category_info
and len(category_info["tools"]) > 5
):
for tool_name in category_info["tools"][:3]:
summary_lines.append(f" └─ {tool_name}")
summary_lines.append(
f" └─ ... and {len(category_info['tools']) - 3} more"
)
summary_lines.extend(
["-" * 40, "✅ Ready to use! Call load_tools() to initialize.", "=" * 60]
)
result["summary"] = "\n".join(summary_lines)
# Print summary to console directly
print(result["summary"])
return result
def _scan_predefined_files(self):
"""
Scan predefined tool files (original behavior).
Returns:
tuple: (all_tools, all_tool_names) where all_tools is a list of tool configs
and all_tool_names is a set of tool names for deduplication
"""
all_tools = []
all_tool_names = set()
# Read tools from each category file
for category, file_path in self.tool_files.items():
try:
# Read the JSON file for this category
tools_in_category = read_json_list(file_path)
all_tools.extend(tools_in_category)
all_tool_names.update([tool["name"] for tool in tools_in_category])
except Exception as e:
warning(
f"Warning: Could not read tools from {category} ({file_path}): {e}"
)
# Also include remote tools
try:
remote_dir = os.path.join(current_dir, "data", "remote_tools")
if os.path.isdir(remote_dir):
remote_tools = []
for fname in os.listdir(remote_dir):
if not fname.lower().endswith(".json"):
continue
fpath = os.path.join(remote_dir, fname)
try:
tools_in_file = read_json_list(fpath)
if isinstance(tools_in_file, dict):
tools_in_file = list(tools_in_file.values())
if isinstance(tools_in_file, list):
remote_tools.extend(tools_in_file)
except Exception as e:
warning(
f"Warning: Could not read remote tools from {fpath}: {e}"
)
if remote_tools:
all_tools.extend(remote_tools)
all_tool_names.update([tool["name"] for tool in remote_tools])
except Exception as e:
warning(f"Warning: Failed to scan remote tools directory: {e}")
return all_tools, all_tool_names
def _scan_all_json_files(self):
"""
Recursively scan all JSON files in the data directory and its subdirectories.
Returns:
tuple: (all_tools, all_tool_names) where all_tools is a list of tool configs
and all_tool_names is a set of tool names for deduplication
"""
all_tools = []
all_tool_names = set()
# Get the data directory path
data_dir = os.path.join(current_dir, "data")
if not os.path.exists(data_dir):
warning(f"Warning: Data directory not found: {data_dir}")
return all_tools, all_tool_names
# Recursively find all JSON files
json_files = []
for root, _dirs, files in os.walk(data_dir):
for file in files:
if file.lower().endswith(".json"):
json_files.append(os.path.join(root, file))
self.logger.debug(f"Found {len(json_files)} JSON files to scan")
# Read tools from each JSON file
for json_file in json_files:
try:
tools_in_file = read_json_list(json_file)
# Handle different data formats
if isinstance(tools_in_file, dict):
# Convert dict of tools to list of tools
tools_in_file = list(tools_in_file.values())
elif not isinstance(tools_in_file, list):
# Skip files that don't contain tool configurations
continue
# Add tools to our collection
for tool in tools_in_file:
if isinstance(tool, dict) and "name" in tool:
all_tools.append(tool)
all_tool_names.add(tool["name"])
self.logger.debug(f"Loaded {len(tools_in_file)} tools from {json_file}")
except Exception as e:
warning(f"Warning: Could not read tools from {json_file}: {e}")
continue
self.logger.info(
f"Scanned {len(json_files)} JSON files, found {len(all_tools)} tools"
)
return all_tools, all_tool_names
[docs]
def refresh_tool_name_desc(
self,
enable_full_desc=False,
include_names=None,
exclude_names=None,
include_categories=None,
exclude_categories=None,
):
"""
Refresh the tool name and description mappings with optional filtering.
This method rebuilds the internal tool dictionary and generates filtered lists of tool names
and descriptions based on the provided filter criteria.
Args:
enable_full_desc (bool, optional): If True, includes full tool JSON as description.
If False, uses "name: description" format. Defaults to False.
include_names (list, optional): List of tool names to include.
exclude_names (list, optional): List of tool names to exclude.
include_categories (list, optional): List of categories to include.
exclude_categories (list, optional): List of categories to exclude.
Returns:
tuple: A tuple containing (tool_name_list, tool_desc_list) after filtering.
"""
tool_name_list = []
tool_desc_list = []
for tool in self.all_tools:
tool_name_list.append(tool["name"])
if enable_full_desc:
tool_desc_list.append(json.dumps(tool))
else:
tool_desc_list.append(tool["name"] + ": " + tool["description"])
self.all_tool_dict[tool["name"]] = tool
# Apply filtering if any filter argument is provided
if any([include_names, exclude_names, include_categories, exclude_categories]):
tool_name_list, tool_desc_list = self.filter_tool_lists(
tool_name_list,
tool_desc_list,
include_names=include_names,
exclude_names=exclude_names,
include_categories=include_categories,
exclude_categories=exclude_categories,
)
self.logger.debug(
f"Number of tools after refresh and filter: {len(tool_name_list)}"
)
return tool_name_list, tool_desc_list
[docs]
def prepare_one_tool_prompt(self, tool):
"""
Prepare a single tool configuration for prompt usage by filtering to essential keys.
Args:
tool (dict): Tool configuration dictionary.
Returns:
dict: Tool configuration with only essential keys for prompting.
"""
valid_keys = ["name", "description", "parameter", "required"]
tool = copy.deepcopy(tool)
for key in list(tool.keys()):
if key not in valid_keys:
del tool[key]
return tool
[docs]
def prepare_tool_prompts(self, tool_list):
"""
Prepare a list of tool configurations for prompt usage.
Args:
tool_list (list): List of tool configuration dictionaries.
Returns:
list: List of tool configurations with only essential keys for prompting.
"""
copied_list = []
for tool in tool_list:
copied_list.append(self.prepare_one_tool_prompt(tool))
return copied_list
[docs]
def remove_keys(self, tool_list, invalid_keys):
"""
Remove specified keys from a list of tool configurations.
Args:
tool_list (list): List of tool configuration dictionaries.
invalid_keys (list): List of keys to remove from each tool configuration.
Returns:
list: Deep copy of tool list with specified keys removed.
"""
copied_list = copy.deepcopy(tool_list)
for tool in copied_list:
# Create a list of keys to avoid modifying the dictionary during iteration
for key in list(tool.keys()):
if key in invalid_keys:
del tool[key]
return copied_list
[docs]
def prepare_tool_examples(self, tool_list):
"""
Prepare tool configurations for example usage by keeping extended set of keys.
This method is similar to prepare_tool_prompts but includes additional keys
useful for examples and documentation.
Args:
tool_list (list): List of tool configuration dictionaries.
Returns:
list: Deep copy of tool list with only example-relevant keys.
"""
valid_keys = [
"name",
"description",
"parameter",
"required",
"query_schema",
"fields",
"label",
"type",
]
copied_list = copy.deepcopy(tool_list)
for tool in copied_list:
# Create a list of keys to avoid modifying the dictionary during iteration
for key in list(tool.keys()):
if key not in valid_keys:
del tool[key]
return copied_list
[docs]
def get_tool_specification_by_names(self, tool_names, format="default"):
"""
Retrieve tool specifications by their names using tool_specification method.
Args:
tool_names (list): List of tool names to retrieve.
format (str, optional): Output format. Options: 'default', 'openai'.
If 'openai', returns OpenAI function calling format. Defaults to 'default'.
Returns:
list: List of tool specifications for the specified names.
Tools not found will be reported but not included in the result.
"""
picked_tool_list = []
for each_name in tool_names:
tool_spec = self.tool_specification(each_name, format=format)
if tool_spec:
picked_tool_list.append(tool_spec)
return picked_tool_list
[docs]
def get_tool_by_name(self, tool_names, format="default"):
"""
Retrieve tool configurations by their names.
Args:
tool_names (list): List of tool names to retrieve.
format (str, optional): Output format. Options: 'default', 'openai'.
If 'openai', returns OpenAI function calling format. Defaults to 'default'.
Returns:
list: List of tool configurations for the specified names.
Tools not found will be reported but not included in the result.
"""
return self.get_tool_specification_by_names(tool_names, format=format)
[docs]
def get_one_tool_by_one_name(self, tool_name, return_prompt=True):
"""
Retrieve a single tool specification by name, optionally prepared for prompting.
This is a convenience method that calls get_one_tool_by_one_name.
Args:
tool_name (str): Name of the tool to retrieve.
return_prompt (bool, optional): If True, returns tool prepared for prompting.
If False, returns full tool configuration. Defaults to True.
Returns:
dict or None: Tool configuration if found, None otherwise.
"""
warning(
"The 'get_one_tool_by_one_name' method is deprecated and will be removed in a future version. "
"Please use 'tool_specification' instead."
)
return self.tool_specification(tool_name, return_prompt=return_prompt)
[docs]
def tool_specification(self, tool_name, return_prompt=False, format="default"):
"""
Retrieve a single tool configuration by name.
Args:
tool_name (str): Name of the tool to retrieve.
return_prompt (bool, optional): If True, returns tool prepared for prompting.
If False, returns full tool configuration. Defaults to False.
format (str, optional): Output format. Options: 'default', 'openai'.
If 'openai', returns OpenAI function calling format. Defaults to 'default'.
Returns:
dict or None: Tool configuration if found, None otherwise.
"""
if tool_name in self.all_tool_dict:
tool_config = self.all_tool_dict[tool_name]
if format == "openai":
parameters = tool_config.get("parameter", {})
if isinstance(parameters, dict):
# 修复 required 字段格式
if "properties" in parameters:
for _prop_name, prop_config in parameters["properties"].items():
if (
isinstance(prop_config, dict)
and "required" in prop_config
):
del prop_config["required"]
if "required" in parameters and not isinstance(
parameters["required"], list
):
if parameters["required"] is True:
required_list = []
if "properties" in parameters:
for prop_name, prop_config in parameters[
"properties"
].items():
if (
isinstance(prop_config, dict)
and prop_config.get("required") is True
):
required_list.append(prop_name)
parameters["required"] = required_list
else:
parameters["required"] = []
return {
"name": tool_config["name"],
"description": tool_config["description"],
"parameters": parameters,
}
elif return_prompt:
return self.prepare_one_tool_prompt(tool_config)
else:
return tool_config
else:
warning(f"Tool name {tool_name} not found in the loaded tools.")
return None
[docs]
def get_tool_description(self, tool_name):
"""
Get the description of a tool by its name.
This is a convenience method that calls get_one_tool_by_one_name.
Args:
tool_name (str): Name of the tool.
Returns:
dict or None: Tool configuration if found, None otherwise.
"""
return self.get_one_tool_by_one_name(tool_name)
[docs]
def get_tool_type_by_name(self, tool_name):
"""
Get the type of a tool by its name.
Args:
tool_name (str): Name of the tool.
Returns:
str: The type of the tool.
Raises:
KeyError: If the tool name is not found in loaded tools.
"""
return self.all_tool_dict[tool_name]["type"]
[docs]
def tool_to_str(self, tool_list):
"""
Convert a list of tool configurations to a formatted string.
Args:
tool_list (list): List of tool configuration dictionaries.
Returns:
str: JSON-formatted string representation of the tools, with each tool
separated by double newlines.
"""
return "\n\n".join(json.dumps(obj, indent=4) for obj in tool_list)
[docs]
def extract_function_call_json(
self, lst, return_message=False, verbose=True, format="llama"
):
"""
Extract function call JSON from input data.
This method delegates to the utility function extract_function_call_json.
Args:
lst: Input data containing function call information.
return_message (bool, optional): Whether to return message along with JSON. Defaults to False.
verbose (bool, optional): Whether to enable verbose output. Defaults to True.
format (str, optional): Format type for extraction. Defaults to 'llama'.
Returns:
dict or tuple: Function call JSON, optionally with message if return_message is True.
"""
return extract_function_call_json(
lst, return_message=return_message, verbose=verbose, format=format
)
[docs]
def call_id_gen(self):
"""
Generate a random call ID for function calls.
Returns:
str: A random 9-character string composed of letters and digits.
"""
return "".join(random.choices(string.ascii_letters + string.digits, k=9))
[docs]
def run(self, fcall_str, return_message=False, verbose=True, format="llama"):
"""
Execute function calls from input string or data.
This method parses function call data, validates it, and executes the corresponding tools.
It supports both single function calls and multiple function calls in a list.
Args:
fcall_str: Input string or data containing function call information.
return_message (bool, optional): Whether to return formatted messages. Defaults to False.
verbose (bool, optional): Whether to enable verbose output. Defaults to True.
format (str, optional): Format type for parsing. Defaults to 'llama'.
Returns:
list or str or None:
- For multiple function calls: List of formatted messages with tool responses
- For single function call: Direct result from the tool
- None: If the input is not a valid function call
"""
if return_message:
function_call_json, message = self.extract_function_call_json(
fcall_str, return_message=return_message, verbose=verbose, format=format
)
else:
function_call_json = self.extract_function_call_json(
fcall_str, return_message=return_message, verbose=verbose, format=format
)
message = "" # Initialize message for cases where return_message=False
if function_call_json is not None:
if isinstance(function_call_json, list):
# return the function call+result message with call id.
call_results = []
for i in range(len(function_call_json)):
call_result = self.run_one_function(function_call_json[i])
call_id = self.call_id_gen()
function_call_json[i]["call_id"] = call_id
call_results.append(
{
"role": "tool",
"content": json.dumps(
{"content": call_result, "call_id": call_id}
),
}
)
revised_messages = [
{
"role": "assistant",
"content": message,
"tool_calls": json.dumps(function_call_json),
}
] + call_results
return revised_messages
else:
return self.run_one_function(function_call_json)
else:
error("Not a function call")
return None
[docs]
def run_one_function(self, function_call_json):
"""
Execute a single function call.
This method validates the function call, initializes the tool if necessary,
and executes it with the provided arguments. If hooks are enabled, it also
applies output hooks to process the result.
Args:
function_call_json (dict): Dictionary containing function name and arguments.
Returns:
str or dict: Result from the tool execution, or error message if validation fails.
"""
check_status, check_message = self.check_function_call(function_call_json)
if check_status is False:
return (
"Invalid function call: " + check_message
) # + " You must correct your invalid function call!"
function_name = function_call_json["name"]
arguments = function_call_json["arguments"]
# Execute the tool
if function_name in self.callable_functions:
result = self.callable_functions[function_name].run(arguments)
else:
if function_name in self.all_tool_dict:
self.logger.debug(
"Initiating callable_function from loaded tool dicts."
)
tool = self.init_tool(
self.all_tool_dict[function_name], add_to_cache=True
)
result = tool.run(arguments)
else:
return f"Tool '{function_name}' not found"
# Apply output hooks if enabled
if self.hook_manager:
context = {
"tool_name": function_name,
"tool_type": (
tool.__class__.__name__ if "tool" in locals() else "unknown"
),
"execution_time": time.time(),
"arguments": arguments,
}
result = self.hook_manager.apply_hooks(
result, function_name, arguments, context
)
return result
[docs]
def toggle_hooks(self, enabled: bool):
"""
Enable or disable output hooks globally.
This method allows runtime control of the hook system. When enabled,
it initializes the HookManager if not already present. When disabled,
it deactivates the HookManager.
Args:
enabled (bool): True to enable hooks, False to disable
"""
self.hooks_enabled = enabled
if enabled and not self.hook_manager:
self.hook_manager = HookManager({}, self)
self.logger.info("Output hooks enabled")
elif self.hook_manager:
self.hook_manager.toggle_hooks(enabled)
self.logger.info(f"Output hooks {'enabled' if enabled else 'disabled'}")
else:
self.logger.debug("Output hooks disabled")
[docs]
def init_tool(self, tool=None, tool_name=None, add_to_cache=True):
"""
Initialize a tool instance from configuration or name.
This method creates a new tool instance using the tool type mappings and
optionally caches it for future use. It handles special cases like the
OpentargetToolDrugNameMatch which requires additional dependencies.
Args:
tool (dict, optional): Tool configuration dictionary. Either this or tool_name must be provided.
tool_name (str, optional): Name of the tool type to initialize. Either this or tool must be provided.
add_to_cache (bool, optional): Whether to cache the initialized tool. Defaults to True.
Returns:
object: Initialized tool instance.
Raises:
KeyError: If the tool type is not found in tool_type_mappings.
"""
global tool_type_mappings
if tool_name is not None:
# Use lazy loading to get the tool class
tool_class = get_tool_class_lazy(tool_name)
if tool_class is None:
raise KeyError(f"Tool type '{tool_name}' not found in registry")
new_tool = tool_class()
else:
tool_type = tool["type"]
tool_name = tool["name"]
# Use lazy loading to get the tool class
tool_class = get_tool_class_lazy(tool_type)
if tool_class is None:
# Fallback to old method if lazy loading fails
if tool_type not in tool_type_mappings:
# Refresh registry and try again
tool_type_mappings = get_tool_registry()
if tool_type not in tool_type_mappings:
raise KeyError(f"Tool type '{tool_type}' not found in registry")
tool_class = tool_type_mappings[tool_type]
if "OpentargetToolDrugNameMatch" == tool_type:
if "FDADrugLabelGetDrugGenericNameTool" not in self.callable_functions:
drug_tool_class = get_tool_class_lazy(
"FDADrugLabelGetDrugGenericNameTool"
)
if drug_tool_class is None:
drug_tool_class = tool_type_mappings[
"FDADrugLabelGetDrugGenericNameTool"
]
self.callable_functions["FDADrugLabelGetDrugGenericNameTool"] = (
drug_tool_class()
)
new_tool = tool_class(
tool_config=tool,
drug_generic_tool=self.callable_functions[
"FDADrugLabelGetDrugGenericNameTool"
],
)
elif "ToolFinderEmbedding" == tool_type:
new_tool = tool_class(tool_config=tool, tooluniverse=self)
elif "ComposeTool" == tool_type:
new_tool = tool_class(tool_config=tool, tooluniverse=self)
elif "ToolFinderLLM" == tool_type:
new_tool = tool_class(tool_config=tool, tooluniverse=self)
elif "ToolFinderKeyword" == tool_type:
new_tool = tool_class(tool_config=tool, tooluniverse=self)
else:
new_tool = tool_class(tool_config=tool)
if add_to_cache:
self.callable_functions[tool_name] = new_tool
return new_tool
[docs]
def check_function_call(self, fcall_str, function_config=None, format="llama"):
"""
Validate a function call against tool configuration.
This method checks if a function call is valid by verifying the function name
exists and the arguments match the expected parameters.
Args:
fcall_str: Function call string or data to validate.
function_config (dict, optional): Specific function configuration to validate against.
If None, uses the loaded tool configuration.
format (str, optional): Format type for parsing. Defaults to 'llama'.
Returns:
tuple: A tuple of (is_valid, message) where:
- is_valid (bool): True if the function call is valid, False otherwise
- message (str): Error message if invalid, empty if valid
"""
function_call_json = self.extract_function_call_json(fcall_str, format=format)
if function_call_json is not None:
if function_config is not None:
return evaluate_function_call(function_config, function_call_json)
function_name = function_call_json["name"]
if function_name not in self.all_tool_dict:
return (
False,
f"Function name {function_name} not found in loaded tools.",
)
return evaluate_function_call(
self.all_tool_dict[function_name], function_call_json
)
else:
return False, "Invalid JSON string of function call"
[docs]
def export_tool_names(self, output_file, category_filter=None):
"""
Export tool names to a text file (one per line).
Args:
output_file (str): Path to the output file
category_filter (list, optional): List of categories to filter by
"""
try:
tools_to_export = []
if category_filter:
# Filter by categories
for category in category_filter:
if category in self.tool_category_dicts:
tools_to_export.extend(
[
tool["name"]
for tool in self.tool_category_dicts[category]
]
)
else:
# Export all tools
tools_to_export = [tool["name"] for tool in self.all_tools]
# Remove duplicates and sort
tools_to_export = sorted(set(tools_to_export))
with open(output_file, "w", encoding="utf-8") as f:
f.write("# ToolUniverse Tool Names\n")
f.write(f"# Generated automatically - {len(tools_to_export)} tools\n")
if category_filter:
f.write(f"# Categories: {', '.join(category_filter)}\n")
f.write("\n")
for tool_name in tools_to_export:
f.write(f"{tool_name}\n")
self.logger.info(
f"Exported {len(tools_to_export)} tool names to {output_file}"
)
return tools_to_export
except Exception as e:
self.logger.error(f"Error exporting tool names to {output_file}: {e}")
return []
[docs]
def get_available_tools(self, category_filter=None, name_only=True):
"""
Get available tools, optionally filtered by category.
Args:
category_filter (list, optional): List of categories to filter by
name_only (bool): If True, return only tool names; if False, return full configs
Returns:
list: List of tool names or tool configurations
"""
if not hasattr(self, "all_tools") or not self.all_tools:
self.logger.warning("No tools loaded. Call load_tools() first.")
return []
if category_filter:
filtered_tools = []
for tool in self.all_tools:
tool_type = tool.get("type", "")
if tool_type in category_filter:
filtered_tools.append(tool)
else:
filtered_tools = self.all_tools
if name_only:
return [tool["name"] for tool in filtered_tools]
else:
return filtered_tools
[docs]
def find_tools_by_pattern(self, pattern, search_in="name", case_sensitive=False):
"""
Find tools matching a pattern in their name or description.
Args:
pattern (str): Pattern to search for
search_in (str): Where to search - 'name', 'description', or 'both'
case_sensitive (bool): Whether search should be case sensitive
Returns:
list: List of matching tool configurations
"""
if not hasattr(self, "all_tools") or not self.all_tools:
self.logger.warning("No tools loaded. Call load_tools() first.")
return []
import re
flags = 0 if case_sensitive else re.IGNORECASE
matching_tools = []
for tool in self.all_tools:
found = False
if search_in in ["name", "both"]:
tool_name = tool.get("name", "")
if re.search(pattern, tool_name, flags):
found = True
if search_in in ["description", "both"] and not found:
tool_desc = tool.get("description", "")
if re.search(pattern, tool_desc, flags):
found = True
if found:
matching_tools.append(tool)
self.logger.info(
f"Found {len(matching_tools)} tools matching pattern '{pattern}'"
)
return matching_tools
[docs]
def load_tools_from_names_list(self, tool_names, clear_existing=True):
"""
Load only specific tools by their names.
Args:
tool_names (list): List of tool names to load
clear_existing (bool): Whether to clear existing tools first
Returns:
int: Number of tools successfully loaded
"""
if clear_existing:
self.all_tools = []
self.all_tool_dict = {}
self.tool_category_dicts = {}
# Use the enhanced load_tools method
original_count = len(self.all_tools)
self.load_tools(include_tools=tool_names)
return len(self.all_tools) - original_count