"""
Tool Metadata Generation Pipeline
Generates comprehensive metadata for a list of tools by extracting details from their configuration files
"""
def compose(arguments, tooluniverse, call_tool):
"""
Main composition function for Tool Metadata Generation
Args:
        arguments (dict): Input arguments. Supported keys: 'tool_configs' (list of
            tool config JSONs), 'tool_type_mappings' (dict mapping categories to
            non-API tool types, e.g. {'Database': ['XMLTool']}; merged with
            built-in defaults), 'add_existing_tooluniverse_labels' (bool, default
            True), and 'max_new_tooluniverse_labels' (int, default 0; values <= 0
            drop all newly generated tags).
tooluniverse: ToolUniverse instance
call_tool: Function to call other tools
Returns:
list: List of tool metadata dictionaries (JSON-compatible)
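    Example:
        An illustrative call (the config fields and tool name shown are
        assumptions, not a fixed schema):
            compose(
                {"tool_configs": [{"name": "my_tool", "type": "XMLTool"}]},
                tooluniverse,
                call_tool,
            )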
"""
import json
import warnings
import uuid
from collections import Counter
def _parse_agent_output(output, tool_name="Unknown Tool"):
"""Helper to parse varied agent outputs (JSON string, wrapped dict) into a dict."""
if isinstance(output, str):
try:
return json.loads(output)
except json.JSONDecodeError:
print(
f"Failed to parse JSON string from {tool_name}; received: {output[:200]}"
)
return {} # Return empty dict on failure to prevent crash
if isinstance(output, dict) and "success" in output and "result" in output:
# Handle wrapped output like {'success': True, 'result': '{...}'}
inner_result = output.get("result")
if isinstance(inner_result, str) and inner_result.strip():
try:
return json.loads(inner_result)
except json.JSONDecodeError:
print(
f"Failed to parse inner result JSON from {tool_name}; using empty metadata."
)
return {}
elif isinstance(inner_result, dict):
return inner_result # Result is already a dict
else:
return {} # No valid inner result
return {}
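    # Illustrative payloads _parse_agent_output normalizes (example values, not
    # actual agent outputs):
    #   '{"tags": ["genomics"]}'                     -> {"tags": ["genomics"]}
    #   {"success": True, "result": '{"tags": []}'}  -> {"tags": []}
    #   {"success": True, "result": {"tags": []}}    -> {"tags": []}
    #   {"tags": []} (unwrapped dict) or "not json"  -> {}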
DEFAULT_TOOL_TYPE_MAPPINGS = {
"Embedding Store": ["EmbeddingDatabase"],
"Database": ["XMLTool", "DatasetTool"],
"Scientific Software Package": ["PackageTool"],
"AI Agent": ["AgenticTool"],
"ML Model": [
"ADMETAITool",
"AlphaFoldRESTTool",
"boltz2_docking",
"compute_depmap24q2_gene_correlations",
"run_compass_prediction",
"run_pinnacle_ppi_retrieval",
"run_transcriptformer_embedding_retrieval",
"get_abstract_from_patent_app_number",
"get_claims_from_patent_app_number",
"get_full_text_from_patent_app_number",
],
"Human Expert Feedback": [
"mcp_auto_loader_human_expert",
"consult_human_expert",
"get_expert_response",
"get_expert_status",
"list_pending_expert_requests",
"submit_expert_response",
],
"MCP": ["MCPAutoLoaderTool", "MCPClientTool", "MCPProxyTool"],
"Compositional Tool": ["ComposeTool"],
"Tool Finder Tool": [
"ToolFinderEmbedding",
"ToolFinderLLM",
"ToolFinderKeyword",
],
"Special Tool": ["Finish", "CallAgent"],
}
# Step 0: Parse inputs and set up variables
tool_configs = arguments.get("tool_configs", [])
    # Copy so the merge below does not mutate the caller's dict
    tool_type_mappings = dict(arguments.get("tool_type_mappings", {}))
add_existing_tooluniverse_labels = arguments.get(
"add_existing_tooluniverse_labels", True
)
max_new_tooluniverse_labels = arguments.get("max_new_tooluniverse_labels", 0)
# Merge tool type mappings with defaults, prioritizing user-provided mappings
for key, value in DEFAULT_TOOL_TYPE_MAPPINGS.items():
if key not in tool_type_mappings:
tool_type_mappings[key] = value
    warnings.warn(
        "Augmenting the provided tool_type_mappings with the default "
        "tool_type_mappings to ensure compatibility with existing ToolUniverse "
        "tools. The default tool_type_mappings are:\n"
        + json.dumps(DEFAULT_TOOL_TYPE_MAPPINGS, indent=4),
        stacklevel=2,
    )
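    # Merge semantics, illustratively: a caller passing {"Database": ["MySQLTool"]}
    # keeps that entry verbatim (user-provided keys win), while every other default
    # category above is added alongside it.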
# Add existing ToolUniverse labels if specified
tool_labels_set = set()
if add_existing_tooluniverse_labels:
# Load existing standardized tool metadata (list of dicts each containing a 'tags' field)
# Use importlib.resources to avoid absolute paths so this works inside the installed package.
try:
try:
from importlib import resources as importlib_resources # Py3.9+
except ImportError: # pragma: no cover
import importlib_resources # type: ignore
# Access the JSON file inside the package (tooluniverse/website_data/v3_standardized_tags.json)
json_path = importlib_resources.files("tooluniverse.website_data").joinpath(
"v3_standardized_tags.json"
)
with json_path.open("r", encoding="utf-8") as f:
existing_metadata_list = json.load(f)
if isinstance(existing_metadata_list, list):
for item in existing_metadata_list:
if isinstance(item, dict):
tags = item.get("tags", [])
if isinstance(tags, list):
for tag in tags:
if isinstance(tag, str) and tag.strip():
tool_labels_set.add(tag.strip())
        except Exception as e:
            # Fail gracefully; downstream logic simply proceeds without enrichment
            print(f"Failed to load existing ToolUniverse labels: {e}")
if not tool_configs:
return []
# Step 1: Generate detailed metadata for each tool
all_tool_metadata = []
for tool_config in tool_configs:
tool_config_str = json.dumps(tool_config)
try:
metadata_params = {
"tool_config": tool_config_str,
"tool_type_mappings": tool_type_mappings,
}
generated_metadata = {}
for _ in range(5): # Retry up to 5 times
raw_output = call_tool("ToolMetadataGenerator", metadata_params)
generated_metadata = _parse_agent_output(
raw_output, "ToolMetadataGenerator"
)
if generated_metadata: # If the result is not empty, break
break
            # Enrich tags using LabelGenerator; non-empty label lists replace the generated tags
try:
# Prepare inputs for LabelGenerator
tool_name = (
tool_config.get("name") or generated_metadata.get("name") or ""
)
tool_description = (
tool_config.get("description")
or generated_metadata.get("description")
or ""
)
# The parameter schema may be nested under parameter->properties
param_properties = tool_config.get("parameter", {}).get(
"properties", {}
)
                # Convert parameters to a JSON-like string representation for the
                # LabelGenerator prompt via safe string construction
def _stringify_params(props):
parts = []
for k, v in props.items():
if isinstance(v, dict):
type_val = v.get("type", "unknown")
desc_val = v.get("description", "")
parts.append(
f"\"{k}\": {{ 'type': '{type_val}', 'description': '{desc_val}' }}"
)
else:
parts.append(f'"{k}": ' + repr(v))
return "{" + ", ".join(parts) + "}"
tool_parameters_str = _stringify_params(param_properties)
category = (
tool_config.get("category")
or tool_config.get("type")
or generated_metadata.get("category")
or ""
)
label_params = {
"tool_name": tool_name,
"tool_description": tool_description,
"tool_parameters": tool_parameters_str,
"category": category,
"existing_labels": json.dumps(list(tool_labels_set)),
}
label_result = call_tool("LabelGenerator", label_params)
label_result = _parse_agent_output(label_result, "LabelGenerator")
                # _parse_agent_output always returns a dict; extract labels defensively
labels = []
if isinstance(label_result, dict):
labels = label_result.get("labels", [])
# Replace tags
if labels:
generated_metadata["tags"] = labels
except Exception as tag_exc:
print(
f"Label generation failed for tool {tool_config.get('name', 'N/A')}: {tag_exc}"
)
all_tool_metadata.append(generated_metadata)
except Exception as e:
print(
f"Failed to generate metadata for tool {tool_config.get('name', 'N/A')}: {e}"
)
            # Append an error object so downstream steps can detect and skip this tool
all_tool_metadata.append(
{
"error": f"Metadata generation failed for {tool_config.get('name', 'N/A')}",
"details": str(e),
}
)
# Step 2: Validate schema
validated_metadata = []
schema_template = {
"id": "",
"name": "",
"description": "",
"detailed_description": "",
"toolType": "api",
"tags": [],
"category": "",
"lab": "",
"source": "",
"version": "v1.0.0",
"reviewed": False,
"isValidated": False,
"usageStats": "0 uses",
"capabilities": [],
"limitations": [],
"parameters": {},
"inputSchema": {},
"exampleInput": {},
"apiEndpoints": [],
}
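    # Illustrative coercion under this template: {"tags": "genomics", "reviewed": 1}
    # yields tags=[] (a str is not a list) and reviewed=True (truthy int); every
    # missing key is filled with its default value.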
for metadata in all_tool_metadata:
if "error" in metadata:
validated_metadata.append(metadata)
continue
validated_item = {}
for key, default_value in schema_template.items():
value = metadata.get(key, default_value)
if not isinstance(value, type(default_value)):
# Attempt to gracefully handle simple type mismatches or reset
if isinstance(default_value, list) and not isinstance(value, list):
value = []
elif isinstance(default_value, dict) and not isinstance(value, dict):
value = {}
elif isinstance(default_value, str) and not isinstance(value, str):
value = str(value) if value is not None else ""
elif isinstance(default_value, bool) and not isinstance(value, bool):
value = bool(value)
else:
value = default_value # Fallback to default if type is complex/unexpected
validated_item[key] = value
validated_metadata.append(validated_item)
all_tool_metadata = validated_metadata
# Step 3: Standardize sources and tags using ToolMetadataStandardizer
try:
source_list = []
for tool in all_tool_metadata:
if "error" not in tool and tool.get("source"):
source_list.append(tool.get("source"))
# Standardize sources
if source_list:
standardizer_params = {"metadata_list": list(set(source_list))}
standardized_sources_map = call_tool(
"ToolMetadataStandardizer", standardizer_params
)
standardized_sources_map = _parse_agent_output(
standardized_sources_map, "ToolMetadataStandardizer"
)
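            # Expected standardizer output shape (names are illustrative):
            #   {"Open Targets": ["opentargets", "Open Targets GraphQL API"]}
            # i.e. canonical source name -> raw variants it consolidates.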
print("Standardized sources mapping:", standardized_sources_map)
# Create a reverse map for easy lookup
source_to_standard_map = {}
for standard_name, raw_names in standardized_sources_map.items():
for raw_name in raw_names:
source_to_standard_map[raw_name] = standard_name
# Update the source in each metadata object
for tool_metadata in all_tool_metadata:
if "error" not in tool_metadata:
original_source = tool_metadata.get("source")
if original_source in source_to_standard_map:
tool_metadata["source"] = source_to_standard_map[
original_source
]
except Exception as e:
print(f"An error occurred during source standardization: {e}")
try:
# Step 4: Standardize tags, with an optional second pass to meet label limits
all_raw_tags = []
for tool in all_tool_metadata:
if "error" not in tool and isinstance(tool.get("tags"), list):
all_raw_tags.extend(tool.get("tags", []))
# Filter out existing labels before standardization
tags_to_standardize = [
tag for tag in set(all_raw_tags) if tag not in tool_labels_set
]
        if max_new_tooluniverse_labels <= 0:
            # If no new labels are allowed, skip standardization and just remove new tags
            for tool_metadata in all_tool_metadata:
                if "error" not in tool_metadata and isinstance(
                    tool_metadata.get("tags"), list
                ):
                    original_tags = tool_metadata.get("tags", [])
                    filtered_tags = [
                        tag for tag in original_tags if tag in tool_labels_set
                    ]
                    tool_metadata["tags"] = sorted(set(filtered_tags))
            # Assign random UUIDs before the early return so this path matches Step 6
            for tool_metadata in all_tool_metadata:
                if "error" not in tool_metadata:
                    tool_metadata["id"] = str(uuid.uuid4())
            # Return early since no further tag processing is needed
            return all_tool_metadata
tag_to_standard_map = {}
if tags_to_standardize:
# Iteratively standardize tags for up to 5 passes to meet the label limit.
current_tags_to_standardize = list(set(tags_to_standardize))
# This map will store the final standardized version for each original raw tag.
tag_to_standard_map = {tag: tag for tag in tags_to_standardize}
for i in range(5): # Loop for up to 5 standardization passes
num_tags = len(current_tags_to_standardize)
# If the number of tags is within the limit, no more standardization is needed.
if (
max_new_tooluniverse_labels > 0
and num_tags <= max_new_tooluniverse_labels
):
print(
f"Tag count ({num_tags}) is within the limit ({max_new_tooluniverse_labels}). Stopping standardization."
)
break
print(f"Pass {i+1}: Standardizing {num_tags} tags.")
                # Set the limit for the standardizer tool. max_new_tooluniverse_labels
                # is always > 0 here (the <= 0 case returned early above), so the
                # fallback of 150 is purely defensive.
limit = (
max_new_tooluniverse_labels
if max_new_tooluniverse_labels > 0
else 150
)
standardizer_params = {
"metadata_list": current_tags_to_standardize,
"limit": limit,
}
print(f"Pass {i+1} input tags: ", current_tags_to_standardize)
# Call the standardizer tool and parse the output, with retries.
pass_output_map = {}
for _ in range(5): # Retry up to 5 times
raw_output = call_tool(
"ToolMetadataStandardizer", standardizer_params
)
pass_output_map = _parse_agent_output(
raw_output, "ToolMetadataStandardizer"
)
if pass_output_map: # If the result is not empty, break
break
print(f"Pass {i+1} standardized tags mapping:", pass_output_map)
# Create a reverse map for the current pass for easy lookup.
# Maps a tag from the input list to its new standardized version.
pass_reverse_map = {}
for standard_tag, raw_tags_in_pass in pass_output_map.items():
for raw_tag in raw_tags_in_pass:
pass_reverse_map[raw_tag] = standard_tag
# Update the final mapping by chaining the new standardization.
# For each original tag, find its current mapping and see if it was further standardized in this pass.
for original_tag, current_standard_tag in tag_to_standard_map.items():
# If the current standard tag was part of this pass's input and got re-mapped, update it.
if current_standard_tag in pass_reverse_map:
tag_to_standard_map[original_tag] = pass_reverse_map[
current_standard_tag
]
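                # Chaining, illustratively: if pass 1 mapped "RNA-seq analysis" ->
                # "Transcriptomics" and this pass maps "Transcriptomics" -> "Omics",
                # the original tag now resolves to "Omics".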
# The new set of tags for the next pass are the keys of the current pass's output.
current_tags_to_standardize = sorted(list(pass_output_map.keys()))
# If the standardizer returns an empty map, it means no further consolidation is possible.
if not current_tags_to_standardize:
print("No further tag consolidation possible. Stopping.")
break
# Update tags in each metadata object using the final mapping
for tool_metadata in all_tool_metadata:
if "error" not in tool_metadata and isinstance(
tool_metadata.get("tags"), list
):
original_tags = tool_metadata.get("tags", [])
# For each original tag, use its standardized version if available, otherwise keep the original.
# This correctly handles tags that were already in tool_labels_set and thus not standardized.
standardized_tags = {
tag_to_standard_map.get(tag, tag) for tag in original_tags
}
tool_metadata["tags"] = sorted(list(standardized_tags))
except Exception as e:
print(f"An error occurred during tag standardization: {e}")
# Step 5: Remove tags that occur only once across the entire dataset,
# but only for tags that are new (not pre-existing in tooluniverse)
try:
# Flatten the list of all new tags from all tools, ignoring error entries
all_new_tags_flat = [
tag
for tool_metadata in all_tool_metadata
if "error" not in tool_metadata
and isinstance(tool_metadata.get("tags"), list)
for tag in tool_metadata.get("tags", [])
if tag not in tool_labels_set
]
if all_new_tags_flat:
# Count the frequency of each new tag
new_tag_counts = Counter(all_new_tags_flat)
# Identify new tags that appear more than once
new_tags_to_keep = {
tag for tag, count in new_tag_counts.items() if count > 1
}
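            # Illustratively: a new tag appearing on a single tool is dropped, while
            # a new tag shared by two or more tools survives; pre-existing
            # ToolUniverse tags are never removed in this step.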
# Filter the tags in each tool's metadata
for tool_metadata in all_tool_metadata:
if "error" not in tool_metadata and isinstance(
tool_metadata.get("tags"), list
):
original_tags = tool_metadata.get("tags", [])
# Keep all pre-existing tags, and only new tags that appear more than once
filtered_tags = [
tag
for tag in original_tags
if tag in tool_labels_set or tag in new_tags_to_keep
]
tool_metadata["tags"] = sorted(list(set(filtered_tags)))
except Exception as e:
print(f"An error occurred during single-occurrence tag removal: {e}")
    # Step 6: Assign a fresh random UUID to each tool's 'id' field
for tool_metadata in all_tool_metadata:
if "error" not in tool_metadata:
tool_metadata["id"] = str(uuid.uuid4())
return all_tool_metadata