# Source code for tooluniverse.tool_finder_keyword

"""
Keyword-based Tool Finder - An advanced keyword search tool for finding relevant tools.

This tool provides sophisticated keyword matching functionality using natural language
processing techniques including tokenization, stop word removal, stemming, and TF-IDF
scoring for improved relevance ranking. It serves as a robust fallback when
AI-powered search methods are unavailable.
"""

import json
import re
import math
from collections import Counter, defaultdict
from typing import Dict, List
from .base_tool import BaseTool
from .tool_registry import register_tool


@register_tool("ToolFinderKeyword")
class ToolFinderKeyword(BaseTool):
    """
    Advanced keyword-based tool finder that uses sophisticated text
    processing and TF-IDF scoring.

    This class implements natural language processing techniques for tool
    discovery, including:

    - Tokenization and normalization
    - Stop word removal
    - Basic stemming
    - TF-IDF relevance scoring
    - Semantic phrase matching

    The search operates by parsing user queries to extract key terms,
    processing them through an NLP pipeline, and matching them against
    pre-built indices of tool metadata for efficient and relevant tool
    discovery.
    """

    # Common English stop words to filter out
    STOP_WORDS = {
        "a", "an", "and", "are", "as", "at", "be", "by", "for", "from",
        "has", "he", "in", "is", "it", "its", "of", "on", "that", "to",
        "was", "will", "with", "the", "this", "but", "they", "have", "had",
        "what", "said", "each", "which", "their", "time", "up", "use",
        "your", "how", "all", "any", "can", "do", "get", "if", "may",
        "new", "now", "old", "see", "two", "way", "who", "boy", "did",
        "number", "no", "find", "long", "down", "day", "came", "made",
        "part",
    }

    # Simple stemming rules for common suffixes, applied in order;
    # the first matching suffix wins.
    STEMMING_RULES = [
        ("ies", "y"), ("ied", "y"), ("ying", "y"), ("ing", ""),
        ("ly", ""), ("ed", ""), ("ier", "y"), ("iest", "y"),
        ("s", ""), ("es", ""), ("er", ""), ("est", ""),
        ("tion", "t"), ("sion", "s"), ("ness", ""), ("ment", ""),
        ("able", ""), ("ible", ""), ("ful", ""), ("less", ""),
        ("ous", ""), ("ive", ""), ("al", ""), ("ic", ""),
        ("ize", ""), ("ise", ""), ("ate", ""), ("fy", ""), ("ify", ""),
    ]
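    # Illustrative pipeline behavior (a sketch of the methods defined
    # below, not an exhaustive specification):
    #
    #   _tokenize_and_normalize("find tools for searching protein studies")
    #   -> ["tool", "search", "protein", "study"]
    #
    # "find" and "for" are dropped as stop words, and the remaining tokens
    # are stemmed ("tools" -> "tool", "searching" -> "search",
    # "studies" -> "study"). Stems of two characters or fewer are
    # discarded.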
    def __init__(self, tool_config, tooluniverse=None):
        """
        Initialize the Advanced Keyword-based Tool Finder.

        Args:
            tool_config (dict): Configuration dictionary for the tool
            tooluniverse: Reference to the ToolUniverse instance containing
                all tools
        """
        super().__init__(tool_config)
        self.tooluniverse = tooluniverse

        # Extract configuration
        self.name = tool_config.get("name", "ToolFinderKeyword")
        self.description = tool_config.get(
            "description", "Advanced keyword-based tool finder"
        )

        # Tool filtering settings
        self.exclude_tools = tool_config.get(
            "exclude_tools",
            tool_config.get("configs", {}).get(
                "exclude_tools",
                [
                    "Tool_RAG",
                    "Tool_Finder",
                    "Finish",
                    "CallAgent",
                    "ToolFinderLLM",
                    "ToolFinderKeyword",
                ],
            ),
        )
        self.include_categories = tool_config.get("include_categories", None)
        self.exclude_categories = tool_config.get("exclude_categories", None)

        # Initialize tool index for TF-IDF scoring
        self._tool_index = None
        self._document_frequencies = None
        self._total_documents = 0
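    # A minimal construction sketch (the config keys shown are the ones
    # read above; "demo_finder" and the variable "tu" are hypothetical):
    #
    #   finder = ToolFinderKeyword(
    #       tool_config={"name": "demo_finder", "exclude_tools": ["Finish"]},
    #       tooluniverse=tu,  # an existing ToolUniverse instance
    #   )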
    def _tokenize_and_normalize(self, text: str) -> List[str]:
        """
        Tokenize text and apply normalization, including stop word removal
        and stemming.

        Args:
            text (str): Input text to tokenize

        Returns:
            List[str]: List of processed tokens
        """
        if not text:
            return []

        # Convert to lowercase and extract words (alphanumeric sequences)
        tokens = re.findall(r"\b[a-zA-Z][a-zA-Z0-9]*\b", text.lower())

        # Remove stop words
        tokens = [token for token in tokens if token not in self.STOP_WORDS]

        # Apply basic stemming
        stemmed_tokens = []
        for token in tokens:
            stemmed = self._apply_stemming(token)
            if len(stemmed) > 2:  # Keep only meaningful terms
                stemmed_tokens.append(stemmed)

        return stemmed_tokens

    def _apply_stemming(self, word: str) -> str:
        """
        Apply basic stemming rules to reduce a word to its root form.

        Args:
            word (str): Word to stem

        Returns:
            str: Stemmed word
        """
        if len(word) <= 3:
            return word

        for suffix, replacement in self.STEMMING_RULES:
            if word.endswith(suffix) and len(word) > len(suffix) + 2:
                return word[: -len(suffix)] + replacement

        return word

    def _extract_phrases(
        self, tokens: List[str], max_phrase_length: int = 3
    ) -> List[str]:
        """
        Extract meaningful phrases from tokens for better semantic matching.

        Args:
            tokens (List[str]): Tokenized words
            max_phrase_length (int): Maximum length of phrases to extract

        Returns:
            List[str]: List of phrases and individual tokens
        """
        phrases = []

        # Add individual tokens
        phrases.extend(tokens)

        # Add bigrams and trigrams
        for length in range(2, min(max_phrase_length + 1, len(tokens) + 1)):
            for i in range(len(tokens) - length + 1):
                phrase = " ".join(tokens[i : i + length])
                phrases.append(phrase)

        return phrases

    def _build_tool_index(self, tools: List[Dict]) -> None:
        """
        Build a TF-IDF index over all tools to enable efficient relevance
        scoring.

        Args:
            tools (List[Dict]): List of tool configurations
        """
        self._tool_index = {}
        term_doc_count = defaultdict(int)
        self._total_documents = 0

        for tool in tools:
            tool_name = tool.get("name", "")
            if tool_name in self.exclude_tools:
                continue

            # Combine tool metadata for indexing
            searchable_text = " ".join(
                [
                    tool.get("name", ""),
                    tool.get("description", ""),
                    tool.get("type", ""),
                    tool.get("category", ""),
                    # Include parameter names and descriptions
                    " ".join(
                        self._extract_parameter_text(tool.get("parameter", {}))
                    ),
                ]
            )

            # Tokenize and extract phrases
            tokens = self._tokenize_and_normalize(searchable_text)
            phrases = self._extract_phrases(tokens)

            # Build term frequency map for this tool
            term_freq = Counter(phrases)

            self._tool_index[tool_name] = {
                "tool": tool,
                "terms": term_freq,
                "total_terms": len(phrases),
            }

            # Count document frequency for each term
            unique_terms = set(phrases)
            for term in unique_terms:
                term_doc_count[term] += 1

            self._total_documents += 1

        # Store document frequencies
        self._document_frequencies = dict(term_doc_count)

    def _extract_parameter_text(self, parameter_schema: Dict) -> List[str]:
        """
        Extract searchable text from a parameter schema.

        Args:
            parameter_schema (Dict): Tool parameter schema

        Returns:
            List[str]: List of text elements from parameters
        """
        text_elements = []

        if isinstance(parameter_schema, dict):
            properties = parameter_schema.get("properties", {})
            for prop_name, prop_info in properties.items():
                text_elements.append(prop_name)
                if isinstance(prop_info, dict):
                    desc = prop_info.get("description", "")
                    if desc:
                        text_elements.append(desc)

        return text_elements

    def _calculate_tfidf_score(
        self, query_terms: List[str], tool_name: str
    ) -> float:
        """
        Calculate the TF-IDF relevance score for a tool given query terms.

        Args:
            query_terms (List[str]): Processed query terms and phrases
            tool_name (str): Name of the tool to score

        Returns:
            float: TF-IDF relevance score
        """
        if tool_name not in self._tool_index:
            return 0.0

        tool_data = self._tool_index[tool_name]
        tool_terms = tool_data["terms"]
        total_terms = tool_data["total_terms"]

        score = 0.0
        query_term_freq = Counter(query_terms)

        for term, query_freq in query_term_freq.items():
            if term in tool_terms:
                # Term Frequency (TF): occurrences of the term in the tool
                # divided by the total number of terms in the tool
                tf = tool_terms[term] / total_terms

                # Inverse Document Frequency (IDF):
                # log(total docs / docs containing term)
                doc_freq = self._document_frequencies.get(term, 1)
                idf = math.log(self._total_documents / doc_freq)

                # TF-IDF score weighted by query term frequency
                score += tf * idf * math.log(1 + query_freq)

        return score

    def _calculate_exact_match_bonus(self, query: str, tool: Dict) -> float:
        """
        Calculate a bonus score for exact matches in the tool name or key
        phrases.

        Args:
            query (str): Original query string
            tool (Dict): Tool configuration

        Returns:
            float: Exact match bonus score
        """
        query_lower = query.lower()
        tool_name = tool.get("name", "").lower()
        tool_desc = tool.get("description", "").lower()

        bonus = 0.0

        # Exact tool name match
        if query_lower in tool_name or tool_name in query_lower:
            bonus += 2.0

        # Exact phrase matches in the description
        query_words = query_lower.split()
        if len(query_words) > 1:
            query_phrase = " ".join(query_words)
            if query_phrase in tool_desc:
                bonus += 1.5

        # Category or type exact matches
        tool_type = tool.get("type", "").lower()
        tool_category = tool.get("category", "").lower()
        if query_lower in tool_type or query_lower in tool_category:
            bonus += 1.0

        return bonus
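    # A worked scoring sketch (all numbers are hypothetical): with 100
    # indexed tools, if a term appears 2 times among a tool's 50 indexed
    # terms, occurs in 10 tool documents, and appears once in the query,
    # its contribution to the TF-IDF score is:
    #
    #   tf  = 2 / 50        = 0.04
    #   idf = log(100 / 10) ~= 2.3026          (natural log, as math.log)
    #   score += tf * idf * log(1 + 1)
    #          ~= 0.04 * 2.3026 * 0.6931 ~= 0.0638
    #
    # The exact-match bonus (up to 2.0 + 1.5 + 1.0) is then added on top
    # of the TF-IDF score by the caller.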
    def find_tools(
        self,
        message=None,
        picked_tool_names=None,
        rag_num=5,
        return_call_result=False,
        categories=None,
    ):
        """
        Find relevant tools based on a message or pre-selected tool names.

        This method matches the interface of the other tool finders so it
        can serve as a drop-in replacement. It uses keyword-based search
        instead of embedding similarity.

        Args:
            message (str, optional): Query message to find tools for.
                Required if picked_tool_names is None.
            picked_tool_names (list, optional): Pre-selected tool names to
                process. Required if message is None.
            rag_num (int, optional): Number of tools to return after
                filtering. Defaults to 5.
            return_call_result (bool, optional): If True, returns both
                prompts and tool names. Defaults to False.
            categories (list, optional): List of tool categories to filter
                by.

        Returns:
            str or tuple:
                - If return_call_result is False: Tool prompts as a
                  formatted string
                - If return_call_result is True: Tuple of (tool_prompts,
                  tool_names)

        Raises:
            AssertionError: If both message and picked_tool_names are None
        """
        if picked_tool_names is None:
            assert message is not None, (
                "Either message or picked_tool_names must be provided"
            )

            # Use keyword-based tool search (call the JSON search directly
            # to avoid recursion)
            search_result = self._run_json_search(
                {
                    "description": message,
                    "categories": categories,
                    "limit": rag_num,
                }
            )

            # Parse the JSON result to extract tool names
            try:
                result_data = json.loads(search_result)
                if result_data.get("error"):
                    picked_tool_names = []
                else:
                    picked_tool_names = [
                        tool["name"] for tool in result_data.get("tools", [])
                    ]
            except json.JSONDecodeError:
                picked_tool_names = []

        # Filter out special tools (matching the original behavior)
        picked_tool_names_no_special = [
            tool for tool in picked_tool_names if tool not in self.exclude_tools
        ]
        picked_tool_names = picked_tool_names_no_special[:rag_num]

        # Get tool objects and prepare prompts (matching the original
        # behavior)
        picked_tools = self.tooluniverse.get_tool_by_name(picked_tool_names)
        picked_tools_prompt = self.tooluniverse.prepare_tool_prompts(picked_tools)

        if return_call_result:
            return picked_tools_prompt, picked_tool_names
        return picked_tools_prompt
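    # A minimal call sketch (assumes "finder" was constructed as above and
    # its ToolUniverse has tools loaded; the query string is hypothetical):
    #
    #   prompts, names = finder.find_tools(
    #       message="search clinical trial records",
    #       rag_num=3,
    #       return_call_result=True,
    #   )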
    def run(self, arguments):
        """
        Find tools using advanced keyword-based search with NLP processing
        and TF-IDF scoring.

        This method provides a unified interface compatible with the other
        tool finders.

        Args:
            arguments (dict): Dictionary containing:
                - description (str): Search query string (unified parameter
                  name)
                - categories (list, optional): List of categories to filter
                  by
                - limit (int, optional): Maximum number of results to
                  return (default: 10)
                - picked_tool_names (list, optional): Pre-selected tool
                  names to process
                - return_call_result (bool, optional): Whether to return
                  both prompts and names. Defaults to False.

        Returns:
            str or tuple:
                - If return_call_result is False: Tool prompts as a
                  formatted string
                - If return_call_result is True: Tuple of (tool_prompts,
                  tool_names)
        """
        # Extract parameters for compatibility
        description = arguments.get("description", arguments.get("query", ""))
        limit = arguments.get("limit", 10)
        return_call_result = arguments.get("return_call_result", False)
        categories = arguments.get("categories", None)
        picked_tool_names = arguments.get("picked_tool_names", None)

        # Delegate unified interface calls to find_tools. Note that
        # return_call_result defaults to False (never None), so this branch
        # is always taken.
        if return_call_result is not None:
            return self.find_tools(
                message=description,
                picked_tool_names=picked_tool_names,
                rag_num=limit,
                return_call_result=return_call_result,
                categories=categories,
            )

        # Otherwise fall back to the original JSON-based interface for
        # backward compatibility
        return self._run_json_search(arguments)
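    # A minimal run() sketch via the unified interface (the arguments shown
    # are the ones read above; the query string is hypothetical):
    #
    #   result = finder.run({
    #       "description": "find drug interaction databases",
    #       "limit": 5,
    #   })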
    def _run_json_search(self, arguments):
        """
        Original JSON-based search implementation for backward
        compatibility.

        Args:
            arguments (dict): Search arguments

        Returns:
            str: JSON string containing search results with relevance
                scores
        """
        try:
            # Extract arguments with unified parameter names (support both
            # "description" and "query" for compatibility)
            query = arguments.get("description", arguments.get("query", ""))
            categories = arguments.get("categories", None)
            limit = arguments.get("limit", 10)

            if not query:
                return json.dumps(
                    {
                        "error": "Description parameter is required",
                        "query": query,
                        "tools": [],
                    },
                    indent=2,
                )

            # Ensure categories is None or a list (handle validation issue)
            if categories is not None and not isinstance(categories, list):
                categories = None

            # Get all tools from tooluniverse
            if not self.tooluniverse:
                return json.dumps(
                    {
                        "error": "ToolUniverse not available",
                        "query": query,
                        "tools": [],
                    },
                    indent=2,
                )

            all_tools = self.tooluniverse.return_all_loaded_tools()

            # Filter by categories if specified
            if categories:
                filtered_tools = self.tooluniverse.select_tools(
                    include_categories=categories
                )
            else:
                filtered_tools = all_tools

            # Build the search index if not already built or if the tool
            # set has changed
            if self._tool_index is None or self._total_documents != len(
                [
                    t
                    for t in filtered_tools
                    if t.get("name", "") not in self.exclude_tools
                ]
            ):
                self._build_tool_index(filtered_tools)

            # Process the query using NLP techniques
            query_tokens = self._tokenize_and_normalize(query)
            query_phrases = self._extract_phrases(query_tokens)

            if not query_tokens and not query_phrases:
                return json.dumps(
                    {
                        "error": "No meaningful search terms found in query",
                        "query": query,
                        "tools": [],
                    },
                    indent=2,
                )

            # Calculate relevance scores for all tools
            tool_scores = []

            for tool in filtered_tools:
                tool_name = tool.get("name", "")

                # Skip excluded tools
                if tool_name in self.exclude_tools:
                    continue

                # Apply category filters if specified
                tool_category = tool.get("category", "unknown")
                if (
                    self.include_categories
                    and tool_category not in self.include_categories
                ):
                    continue
                if (
                    self.exclude_categories
                    and tool_category in self.exclude_categories
                ):
                    continue

                # Calculate the TF-IDF score
                tfidf_score = self._calculate_tfidf_score(
                    query_phrases, tool_name
                )

                # Calculate the exact match bonus
                exact_bonus = self._calculate_exact_match_bonus(query, tool)

                # Combined relevance score
                total_score = tfidf_score + exact_bonus

                # Only include tools with positive relevance
                if total_score > 0:
                    tool_info = {
                        "name": tool_name,
                        "description": tool.get("description", ""),
                        "type": tool.get("type", ""),
                        "category": tool_category,
                        "parameters": tool.get("parameter", {}),
                        "required": tool.get("required", []),
                        "relevance_score": round(total_score, 4),
                        "tfidf_score": round(tfidf_score, 4),
                        "exact_match_bonus": round(exact_bonus, 4),
                    }
                    tool_scores.append(tool_info)

            # Sort by relevance score (highest first) and limit results
            tool_scores.sort(key=lambda x: x["relevance_score"], reverse=True)
            matching_tools = tool_scores[:limit]

            # Remove internal scoring details from the final output
            for tool in matching_tools:
                tool.pop("tfidf_score", None)
                tool.pop("exact_match_bonus", None)

            return json.dumps(
                {
                    "query": query,
                    "search_method": "Advanced keyword matching (TF-IDF + NLP)",
                    "total_matches": len(matching_tools),
                    "categories_filtered": categories,
                    "processing_info": {
                        "query_tokens": len(query_tokens),
                        "query_phrases": len(query_phrases),
                        "indexed_tools": self._total_documents,
                    },
                    "tools": matching_tools,
                },
                indent=2,
            )

        except Exception as e:
            return json.dumps(
                {
                    "error": f"Advanced keyword search error: {str(e)}",
                    "query": arguments.get("query", ""),
                    "tools": [],
                },
                indent=2,
            )
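# Illustrative shape of the JSON returned by _run_json_search (the keys
# match the json.dumps call above; all values are hypothetical):
#
#   {
#     "query": "protein structure",
#     "search_method": "Advanced keyword matching (TF-IDF + NLP)",
#     "total_matches": 2,
#     "categories_filtered": null,
#     "processing_info": {
#       "query_tokens": 2,
#       "query_phrases": 3,
#       "indexed_tools": 120
#     },
#     "tools": [
#       {"name": "...", "description": "...", "relevance_score": 2.1343, ...}
#     ]
#   }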
# Tool configuration for ToolUniverse registration
# TOOL_CONFIG = {
#     "name": "ToolFinderKeyword",
#     "description": "Advanced keyword-based tool finder using NLP techniques, TF-IDF scoring, and semantic phrase matching for precise tool discovery",
#     "type": "tool_finder_keyword",
#     "category": "tool_finder",
#     "parameter": {
#         "type": "object",
#         "properties": {
#             "query": {
#                 "type": "string",
#                 "description": "Search query describing the desired functionality. Uses advanced NLP processing including tokenization, stop word removal, and stemming.",
#             },
#             "categories": {
#                 "type": "array",
#                 "items": {"type": "string"},
#                 "description": "Optional list of tool categories to filter by",
#             },
#             "limit": {
#                 "type": "integer",
#                 "description": "Maximum number of tools to return, ranked by TF-IDF relevance score (default: 10)",
#                 "default": 10,
#             },
#         },
#         "required": ["query"],
#     },
#     "configs": {
#         "exclude_tools": [
#             "Tool_RAG", "Tool_Finder", "Finish", "CallAgent",
#             "ToolFinderLLM", "ToolFinderKeyword",
#         ],
#         "features": [
#             "tokenization", "stop_word_removal", "stemming",
#             "phrase_extraction", "tfidf_scoring", "exact_match_bonus",
#         ],
#     },
# }
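# A minimal end-to-end demo (a sketch only: the stub below fakes just the
# ToolUniverse methods this module calls, the sample tools are
# hypothetical, and BaseTool is assumed to accept a minimal config dict).
# Run with: python -m tooluniverse.tool_finder_keyword
if __name__ == "__main__":

    class _StubToolUniverse:
        """Fake ToolUniverse exposing only the methods used above."""

        def __init__(self, tools):
            self._tools = tools

        def return_all_loaded_tools(self):
            return self._tools

        def select_tools(self, include_categories=None):
            return [
                t
                for t in self._tools
                if t.get("category") in (include_categories or [])
            ]

        def get_tool_by_name(self, names):
            by_name = {t["name"]: t for t in self._tools}
            return [by_name[n] for n in names if n in by_name]

        def prepare_tool_prompts(self, tools):
            # Simplified prompt rendering for the demo
            return "\n".join(f"{t['name']}: {t['description']}" for t in tools)

    sample_tools = [
        {
            "name": "ProteinLookup",
            "description": "Search protein structures and sequences",
            "type": "search",
            "category": "biology",
            "parameter": {},
        },
        {
            "name": "TrialSearch",
            "description": "Find clinical trial records by condition",
            "type": "search",
            "category": "clinical",
            "parameter": {},
        },
    ]

    finder = ToolFinderKeyword(
        {"name": "demo_finder"}, _StubToolUniverse(sample_tools)
    )
    print(finder.run({"description": "search protein structures", "limit": 2}))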