Source code for tooluniverse.output_hook

"""
Output Hook System for ToolUniverse

This module provides a hook-based output processing system for intelligent
post-processing of tool outputs. The system supports several hook types,
including summarization, filtering, and transformation.

Key Components:
- HookRule: Defines conditions for when hooks should trigger
- OutputHook: Base class for all output hooks
- SummarizationHook: Specialized hook for output summarization
- HookManager: Manages and coordinates all hooks

The hook system integrates seamlessly with ToolUniverse's existing architecture,
leveraging AgenticTool and ComposeTool for intelligent output processing.
"""

import json
from dataclasses import dataclass
from typing import Dict, Any, List, Optional
from pathlib import Path
from tooluniverse.logging_config import get_logger

_logger = get_logger(__name__)


class HookRule:
    """
    Defines rules for when hooks should be triggered.

    This class evaluates various conditions to determine if a hook should be
    applied to a tool's output. Supports multiple condition types including
    output length, content type, and tool-specific criteria.

    Args:
        conditions (Dict[str, Any]): Dictionary containing condition specifications

    Attributes:
        conditions (Dict[str, Any]): The condition specifications
    """
    def __init__(self, conditions: Dict[str, Any]):
        """
        Initialize the hook rule with conditions.

        Args:
            conditions (Dict[str, Any]): Condition specifications including
                output_length, content_type, tool_type, etc.
        """
        self.conditions = conditions
    def evaluate(
        self,
        result: Any,
        tool_name: str,
        arguments: Dict[str, Any],
        context: Dict[str, Any],
    ) -> bool:
        """
        Evaluate whether the rule conditions are met.

        Args:
            result (Any): The tool output to evaluate
            tool_name (str): Name of the tool that produced the output
            arguments (Dict[str, Any]): Arguments passed to the tool
            context (Dict[str, Any]): Additional context information

        Returns:
            bool: True if conditions are met, False otherwise
        """
        # Evaluate output length conditions
        if "output_length" in self.conditions:
            result_str = str(result)
            length_condition = self.conditions["output_length"]
            threshold = length_condition.get("threshold", 5000)
            operator = length_condition.get("operator", ">")

            if operator == ">":
                return len(result_str) > threshold
            elif operator == ">=":
                return len(result_str) >= threshold
            elif operator == "<":
                return len(result_str) < threshold
            elif operator == "<=":
                return len(result_str) <= threshold

        # Evaluate content type conditions
        if "content_type" in self.conditions:
            content_type = self.conditions["content_type"]
            if content_type == "json" and isinstance(result, dict):
                return True
            elif content_type == "text" and isinstance(result, str):
                return True

        # Evaluate tool type conditions
        if "tool_type" in self.conditions:
            tool_type = context.get("tool_type", "")
            return tool_type == self.conditions["tool_type"]

        # Evaluate tool name conditions
        if "tool_name" in self.conditions:
            return tool_name == self.conditions["tool_name"]

        # If no specific conditions are met, return True for general rules
        return True
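
# --- Illustrative usage sketch (not part of the module source) ----------------
# A hedged example of how the condition keys handled in evaluate() behave for a
# simple output-length rule; the threshold and tool name are arbitrary.
_length_rule = HookRule({"output_length": {"threshold": 100, "operator": ">"}})
assert _length_rule.evaluate("x" * 500, "some_tool", {}, {}) is True
assert _length_rule.evaluate("short", "some_tool", {}, {}) is False
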
class OutputHook:
    """
    Base class for all output hooks.

    This abstract base class defines the interface that all output hooks must
    implement. Hooks are used to process tool outputs after execution, enabling
    features like summarization, filtering, transformation, and validation.

    Args:
        config (Dict[str, Any]): Hook configuration including name, enabled status,
            priority, and conditions

    Attributes:
        config (Dict[str, Any]): Hook configuration
        name (str): Name of the hook
        enabled (bool): Whether the hook is enabled
        priority (int): Hook priority (lower numbers execute first)
        rule (HookRule): Rule for when this hook should trigger
    """
    def __init__(self, config: Dict[str, Any]):
        """
        Initialize the output hook with configuration.

        Args:
            config (Dict[str, Any]): Hook configuration containing:
                - name: Hook identifier
                - enabled: Whether hook is active
                - priority: Execution priority
                - conditions: Trigger conditions
        """
        self.config = config
        self.name = config.get("name", "unnamed_hook")
        self.enabled = config.get("enabled", True)
        self.priority = config.get("priority", 1)
        self.rule = HookRule(config.get("conditions", {}))
    def should_trigger(
        self,
        result: Any,
        tool_name: str,
        arguments: Dict[str, Any],
        context: Dict[str, Any],
    ) -> bool:
        """
        Determine if this hook should be triggered for the given output.

        Args:
            result (Any): The tool output to evaluate
            tool_name (str): Name of the tool that produced the output
            arguments (Dict[str, Any]): Arguments passed to the tool
            context (Dict[str, Any]): Additional context information

        Returns:
            bool: True if hook should trigger, False otherwise
        """
        if not self.enabled:
            return False
        return self.rule.evaluate(result, tool_name, arguments, context)
    def process(
        self,
        result: Any,
        tool_name: str | None = None,
        arguments: Dict[str, Any] | None = None,
        context: Dict[str, Any] | None = None,
    ) -> Any:
        """
        Process the tool output.

        This method must be implemented by subclasses to define the specific
        processing logic for the hook.

        Args:
            result (Any): The tool output to process
            tool_name (str): Name of the tool that produced the output
            arguments (Dict[str, Any]): Arguments passed to the tool
            context (Dict[str, Any]): Additional context information

        Returns:
            Any: The processed output

        Raises:
            NotImplementedError: If not implemented by subclass
        """
        raise NotImplementedError("Subclasses must implement process method")
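
# --- Illustrative sketch (not part of the module source) ----------------------
# The smallest useful OutputHook subclass only overrides process(); the name,
# priority, and trigger conditions come from the config dict handled by
# OutputHook.__init__ above. The truncation behavior here is an arbitrary
# example, not something the library ships.
class _TruncatingHook(OutputHook):
    """Example hook that truncates any output longer than 200 characters."""

    def process(self, result, tool_name=None, arguments=None, context=None):
        text = str(result)
        if len(text) > 200:
            return text[:200] + "..."
        return result


_demo_hook = _TruncatingHook(
    {
        "name": "truncate_demo",
        "enabled": True,
        "priority": 5,
        "conditions": {"output_length": {"threshold": 200, "operator": ">"}},
    }
)
# _demo_hook.should_trigger("x" * 500, "any_tool", {}, {})  -> True
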
@dataclass
class SummarizationHookConfig:
    composer_tool: str = "OutputSummarizationComposer"
    # Increased to 30000 to minimize chunk count and improve success rate
    chunk_size: int = 30000
    focus_areas: str = "key_findings_and_results"
    max_summary_length: int = 3000
    composer_timeout_sec: int = 60
    def validate(self) -> "SummarizationHookConfig":
        # Validate numeric fields; clamp to sensible defaults if invalid
        if not isinstance(self.chunk_size, int) or self.chunk_size <= 0:
            self.chunk_size = 30000
        if not isinstance(self.max_summary_length, int) or self.max_summary_length <= 0:
            self.max_summary_length = 3000
        if (
            not isinstance(self.composer_timeout_sec, int)
            or self.composer_timeout_sec <= 0
        ):
            self.composer_timeout_sec = 60
        if not isinstance(self.composer_tool, str) or not self.composer_tool:
            self.composer_tool = "OutputSummarizationComposer"
        return self
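
# --- Illustrative sketch (not part of the module source) ----------------------
# validate() clamps invalid numeric values back to the defaults defined above
# instead of raising, so a config assembled from untrusted input is always usable.
_cfg = SummarizationHookConfig(chunk_size=-1, composer_timeout_sec=0).validate()
assert _cfg.chunk_size == 30000
assert _cfg.composer_timeout_sec == 60
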
class SummarizationHook(OutputHook):
    """
    Hook for intelligent output summarization using AI.

    This hook uses ToolUniverse's AgenticTool and ComposeTool infrastructure to
    provide intelligent summarization of long tool outputs. It supports chunking
    large outputs, processing each chunk with AI, and merging the results.

    Args:
        config (Dict[str, Any]): Hook configuration including summarization parameters
        tooluniverse: Reference to the ToolUniverse instance

    Attributes:
        tooluniverse: ToolUniverse instance for tool execution
        composer_tool (str): Name of the ComposeTool for summarization
        chunk_size (int): Size of chunks for processing large outputs
        focus_areas (str): Areas to focus on during summarization
        max_summary_length (int): Maximum length of final summary
    """
    def __init__(self, config: Dict[str, Any] | SummarizationHookConfig, tooluniverse):
        """
        Initialize the summarization hook.

        Args:
            config (Dict[str, Any]): Hook configuration
            tooluniverse: ToolUniverse instance for executing summarization tools
        """
        super().__init__(config if isinstance(config, dict) else {"hook_config": {}})
        self.tooluniverse = tooluniverse

        # Normalize input to config dataclass
        if isinstance(config, SummarizationHookConfig):
            cfg = config
        else:
            raw = config.get("hook_config", {}) if isinstance(config, dict) else {}
            # Breaking change: only support composer_tool going forward
            cfg = SummarizationHookConfig(
                composer_tool=raw.get("composer_tool", "OutputSummarizationComposer"),
                chunk_size=raw.get("chunk_size", 2000),
                focus_areas=raw.get("focus_areas", "key_findings_and_results"),
                max_summary_length=raw.get("max_summary_length", 3000),
                composer_timeout_sec=raw.get("composer_timeout_sec", 60),
            )

        self.config_obj = cfg.validate()
        self.composer_tool = self.config_obj.composer_tool
        self.chunk_size = self.config_obj.chunk_size
        self.focus_areas = self.config_obj.focus_areas
        self.max_summary_length = self.config_obj.max_summary_length
        self.composer_timeout_sec = self.config_obj.composer_timeout_sec
    def process(
        self,
        result: Any,
        tool_name: Optional[str] = None,
        arguments: Optional[Dict[str, Any]] = None,
        context: Optional[Dict[str, Any]] = None,
    ) -> Any:
        """
        Execute summarization processing using the Compose Summarizer Tool.

        This method orchestrates the summarization workflow by:
        1. Preparing parameters for the Compose Summarizer Tool
        2. Calling the tool through ToolUniverse
        3. Processing and returning the summarized result

        Args:
            result (Any): The tool output to summarize
            tool_name (str): Name of the tool that produced the output
            arguments (Dict[str, Any]): Arguments passed to the tool
            context (Dict[str, Any]): Additional context information

        Returns:
            Any: The summarized output, or original output if summarization fails
        """
        try:
            # Backward-compat: allow calling process(result) only
            if tool_name is None:
                tool_name = "unknown_tool"
            if arguments is None:
                arguments = {}
            if context is None:
                context = {}

            # Explicitly preserve None and empty string semantics
            if result is None:
                return None
            if isinstance(result, str) and result == "":
                return ""

            # Debug: basic context
            try:
                _len = len(str(result))
            except Exception:
                _len = -1
            _logger.debug(
                "SummarizationHook process: tool=%s, result_len=%s, chunk_size=%s, max_summary_length=%s",
                tool_name,
                _len,
                self.chunk_size,
                self.max_summary_length,
            )

            # Check if the required tools are available
            if (
                self.composer_tool not in self.tooluniverse.callable_functions
                and self.composer_tool not in self.tooluniverse.all_tool_dict
            ):
                _logger.warning(
                    "Summarization tool '%s' not available; returning original output",
                    self.composer_tool,
                )
                return result

            # Prepare parameters for the Compose Summarizer Tool
            composer_args = {
                "tool_output": str(result),
                "query_context": self._extract_query_context(context),
                "tool_name": tool_name,
                "chunk_size": self.chunk_size,
                "focus_areas": self.focus_areas,
                "max_summary_length": self.max_summary_length,
            }

            # Call the Compose Summarizer Tool through ToolUniverse
            _logger.debug(
                "Calling composer tool '%s' (timeout=%ss)",
                self.composer_tool,
                self.composer_timeout_sec,
            )

            # Run composer with timeout to avoid hangs
            try:
                from concurrent.futures import ThreadPoolExecutor

                def _call_composer():
                    return self.tooluniverse.run_one_function(
                        {"name": self.composer_tool, "arguments": composer_args}
                    )

                with ThreadPoolExecutor(max_workers=1) as _pool:
                    _future = _pool.submit(_call_composer)
                    composer_result = _future.result(timeout=self.composer_timeout_sec)
            except Exception as _e_timeout:
                # Timeout or execution error; log and fall back to original output
                _logger.warning("Composer execution failed/timeout: %s", _e_timeout)
                return result

            # Debug: show composer result meta
            try:
                if isinstance(composer_result, dict):
                    success = composer_result.get("success", False)
                    summary_len = len(composer_result.get("summary", ""))
                    _logger.debug(
                        "Composer result: success=%s summary_len=%s",
                        success,
                        summary_len,
                    )
            except Exception as _e_dbg:
                _logger.debug("Debug error inspecting composer_result: %s", _e_dbg)

            # Process the Compose Tool result
            if isinstance(composer_result, dict) and composer_result.get("success"):
                return composer_result.get("summary", result)
            elif isinstance(composer_result, str):
                return composer_result
            else:
                _logger.warning(
                    "Compose Summarizer Tool returned unexpected result: %s",
                    composer_result,
                )
                return result

        except Exception as e:
            error_msg = str(e)
            _logger.error("Error in summarization hook: %s", error_msg)

            # Check if the error is due to missing tools
            if "not found" in error_msg.lower() or "ToolOutputSummarizer" in error_msg:
                _logger.error(
                    "Required summarization tools are not available. "
                    "Please ensure the SMCP server is started with hooks enabled."
                )
            return result
    def _extract_query_context(self, context: Dict[str, Any]) -> str:
        """
        Extract query context from execution context.

        This method attempts to identify the original user query or intent from
        the context information to provide better summarization.

        Args:
            context (Dict[str, Any]): Execution context containing arguments
                and metadata

        Returns:
            str: Extracted query context or fallback description
        """
        arguments = context.get("arguments", {})

        # Common query parameter names
        query_keys = ["query", "question", "input", "text", "search_term", "prompt"]
        for key in query_keys:
            if key in arguments:
                return str(arguments[key])

        # If no explicit query found, return tool name as context
        return f"Tool execution: {context.get('tool_name', 'unknown')}"
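
# --- Illustrative sketch (not part of the module source) ----------------------
# The dict form accepted by SummarizationHook.__init__: summarization settings
# live under "hook_config" and are normalized into SummarizationHookConfig.
# Actually running the hook needs a live ToolUniverse with the
# OutputSummarizationComposer / ToolOutputSummarizer tools loaded, so the calls
# below are indicative only.
_summarization_hook_cfg = {
    "name": "summarize_long_outputs",
    "enabled": True,
    "priority": 1,
    "conditions": {"output_length": {"threshold": 5000, "operator": ">"}},
    "hook_config": {
        "composer_tool": "OutputSummarizationComposer",
        "chunk_size": 30000,
        "focus_areas": "key_findings_and_results",
        "max_summary_length": 3000,
        "composer_timeout_sec": 60,
    },
}
# hook = SummarizationHook(_summarization_hook_cfg, tooluniverse)
# summary = hook.process(long_result, "some_tool", {}, {"arguments": {}})
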
class HookManager:
    """
    Manages and coordinates all output hooks.

    The HookManager is responsible for loading hook configurations, creating
    hook instances, and applying hooks to tool outputs. It provides a unified
    interface for hook management and supports dynamic configuration updates.

    Args:
        config (Dict[str, Any]): Hook manager configuration
        tooluniverse: Reference to the ToolUniverse instance

    Attributes:
        config (Dict[str, Any]): Hook manager configuration
        tooluniverse: ToolUniverse instance for tool execution
        hooks (List[OutputHook]): List of loaded hook instances
        enabled (bool): Whether hook processing is enabled
        config_path (str): Path to hook configuration file
    """
    def __init__(self, config: Dict[str, Any], tooluniverse):
        """
        Initialize the hook manager.

        Args:
            config (Dict[str, Any]): Configuration for hook manager
            tooluniverse: ToolUniverse instance for executing tools
        """
        self.config = config
        self.tooluniverse = tooluniverse
        self.hooks: List[OutputHook] = []
        self.enabled = True
        # Alias for tests that expect hooks_enabled flag
        self.hooks_enabled = self.enabled
        self.config_path = config.get("config_path", "template/hook_config.json")
        self._pending_tools_to_load: List[str] = []

        self._load_hook_config()

        # Validate LLM API keys before loading hooks
        if not self._validate_llm_api_keys():
            _logger.warning("LLM API keys not available. Hooks will be disabled.")
            _logger.info(
                "To enable hooks, please set LLM API keys environment variable."
            )
            # Disable hook processing but still ensure required tools are available.
            # Tests expect hook-related tools (e.g., ToolOutputSummarizer) to be
            # registered in ToolUniverse.callable_functions even when hooks are disabled.
            self.enabled = False
            try:
                # Proactively load tools required by summarization hooks so they are discoverable
                all_hook_configs = []
                global_hooks = (
                    self.config.get("hooks", []) if isinstance(self.config, dict) else []
                )
                for hook_cfg in global_hooks:
                    all_hook_configs.append(hook_cfg)
                # Attempt auto-load based on config; if config is empty, fall back to
                # ensuring summarization tools
                self._auto_load_hook_tools(all_hook_configs)
                # Ensure tools are pre-instantiated into callable_functions if possible
                self._ensure_hook_tools_loaded()
            except Exception as _e:
                # Non-fatal: we still proceed with disabled hooks
                _logger.warning("Failed to preload hook tools without API keys: %s", _e)
            # Do not proceed to create hook instances when disabled.
            # Keep hooks list empty and reflect flags.
            self.hooks_enabled = self.enabled
            return

        self._load_hooks()
        self.hooks_enabled = self.enabled
    def apply_hooks(
        self,
        result: Any,
        tool_name: str,
        arguments: Dict[str, Any],
        context: Dict[str, Any],
    ) -> Any:
        """
        Apply all applicable hooks to the tool output.

        This method iterates through all loaded hooks, checks if they should be
        applied to the current output, and processes the output through each
        applicable hook in priority order.

        Args:
            result (Any): The tool output to process
            tool_name (str): Name of the tool that produced the output
            arguments (Dict[str, Any]): Arguments passed to the tool
            context (Dict[str, Any]): Additional context information

        Returns:
            Any: The processed output after applying all applicable hooks
        """
        if not self.enabled:
            return result

        # Load pending tools if ToolUniverse is now ready
        self._load_pending_tools()

        # Prevent recursive hook processing
        if self._is_hook_tool(tool_name):
            return result

        # Sort hooks by priority (lower numbers execute first)
        sorted_hooks = sorted(self.hooks, key=lambda h: h.priority)

        for hook in sorted_hooks:
            if not hook.enabled:
                continue

            # Check if hook is applicable to current tool
            if self._is_hook_applicable(hook, tool_name, context):
                if hook.should_trigger(result, tool_name, arguments, context):
                    _logger.debug(
                        "Applying hook: %s for tool: %s", hook.name, tool_name
                    )
                    result = hook.process(result, tool_name, arguments, context)

        return result
    def _validate_llm_api_keys(self) -> bool:
        """
        Validate that LLM API keys are available for hook tools.

        Returns:
            bool: True if API keys are available, False otherwise
        """
        from .agentic_tool import AgenticTool

        if AgenticTool.has_any_api_keys():
            _logger.debug("LLM API keys validated successfully")
            return True
        else:
            _logger.error("LLM API key validation failed: No API keys available")
            _logger.info("To enable hooks, please set API key environment variables.")
            return False
    def enable_hook(self, hook_name: str):
        """
        Enable a specific hook by name.

        Args:
            hook_name (str): Name of the hook to enable
        """
        hook = self.get_hook(hook_name)
        if hook:
            hook.enabled = True
            _logger.info("Enabled hook: %s", hook_name)
        else:
            _logger.error("Hook not found: %s", hook_name)
    def disable_hook(self, hook_name: str):
        """
        Disable a specific hook by name.

        Args:
            hook_name (str): Name of the hook to disable
        """
        hook = self.get_hook(hook_name)
        if hook:
            hook.enabled = False
            _logger.info("Disabled hook: %s", hook_name)
        else:
            _logger.error("Hook not found: %s", hook_name)
    def toggle_hooks(self, enabled: bool):
        """
        Enable or disable all hooks globally.

        Args:
            enabled (bool): True to enable all hooks, False to disable
        """
        self.enabled = enabled
        self.hooks_enabled = enabled
        status = "enabled" if enabled else "disabled"
        _logger.info("Hooks %s", status)
    # Backward-compat: tests reference enable_hooks/disable_hooks APIs
    def enable_hooks(self):
        """Enable hooks and (re)load configurations and required tools."""
        self.toggle_hooks(True)
        # Ensure tools and hooks are ready
        self._ensure_hook_tools_loaded()
        self._load_hooks()
    def disable_hooks(self):
        """Disable hooks and clear in-memory hook instances."""
        self.toggle_hooks(False)
        # Do not destroy config; just clear active hooks
        self.hooks = []
    def reload_config(self, config_path: Optional[str] = None):
        """
        Reload hook configuration from file.

        Args:
            config_path (Optional[str]): Path to configuration file.
                If None, uses the current config_path
        """
        if config_path:
            self.config_path = config_path

        self._load_hook_config()
        self._load_hooks()
        _logger.info("Reloaded hook configuration")
    def get_hook(self, hook_name: str) -> Optional[OutputHook]:
        """
        Get a hook instance by name.

        Args:
            hook_name (str): Name of the hook to retrieve

        Returns:
            Optional[OutputHook]: Hook instance if found, None otherwise
        """
        for hook in self.hooks:
            if hook.name == hook_name:
                return hook
        return None
    def _load_hook_config(self):
        """
        Load hook configuration from file.

        This method attempts to load the hook configuration from the specified
        file path, handling both package resources and file system paths.
        If the config is already provided and not empty, it uses that instead.
        """
        # If config is already provided and not empty, use it
        if self.config and (
            ("hooks" in self.config)
            or ("tool_specific_hooks" in self.config)
            or ("category_hooks" in self.config)
        ):
            return

        try:
            config_file = self._get_config_file_path()
            if hasattr(config_file, "read_text"):
                content = config_file.read_text(encoding="utf-8")
            else:
                with open(config_file, "r", encoding="utf-8") as f:
                    content = f.read()
            self.config = json.loads(content)
        except Exception as e:
            print(f"Warning: Could not load hook config: {e}")
            if not self.config:
                self.config = {}

    def _get_config_file_path(self):
        """
        Get the path to the hook configuration file.

        Returns:
            Path: Path to the configuration file
        """
        try:
            import importlib.resources as pkg_resources
        except ImportError:
            import importlib_resources as pkg_resources

        try:
            data_files = pkg_resources.files("tooluniverse.template")
            config_file = data_files / "hook_config.json"
            return config_file
        except Exception:
            # Fallback to file-based path resolution
            current_dir = Path(__file__).parent
            config_file = current_dir / "template" / "hook_config.json"
            return config_file

    def _load_hooks(self):
        """
        Load hook configurations and create hook instances.

        This method processes the configuration and creates appropriate hook
        instances for global, tool-specific, and category-specific hooks.
        It also automatically loads any tools required by the hooks.
        """
        self.hooks = []

        # Collect all hook configs first to determine required tools
        all_hook_configs = []

        # Load global hooks
        global_hooks = self.config.get("hooks", [])
        for hook_config in global_hooks:
            all_hook_configs.append(hook_config)

        # Load tool-specific hooks
        tool_specific_hooks = self.config.get("tool_specific_hooks", {})
        for tool_name, tool_hook_config in tool_specific_hooks.items():
            if tool_hook_config.get("enabled", True):
                tool_hooks = tool_hook_config.get("hooks", [])
                for hook_config in tool_hooks:
                    hook_config["tool_name"] = tool_name
                    all_hook_configs.append(hook_config)

        # Load category-specific hooks
        category_hooks = self.config.get("category_hooks", {})
        for category_name, category_hook_config in category_hooks.items():
            if category_hook_config.get("enabled", True):
                category_hooks_list = category_hook_config.get("hooks", [])
                for hook_config in category_hooks_list:
                    hook_config["category"] = category_name
                    all_hook_configs.append(hook_config)

        # Auto-load required tools for hooks
        self._auto_load_hook_tools(all_hook_configs)

        # Note: Hook tools will be pre-loaded when ToolUniverse.load_tools() is called.
        # This is handled in the _load_pending_tools method.

        # Create hook instances
        for hook_config in all_hook_configs:
            hook = self._create_hook_instance(hook_config)
            if hook:
                self.hooks.append(hook)

    def _auto_load_hook_tools(self, hook_configs: List[Dict[str, Any]]):
        """
        Automatically load tools required by hooks.

        This method analyzes hook configurations to determine which tools are
        needed and automatically loads them into the ToolUniverse.

        Args:
            hook_configs (List[Dict[str, Any]]): List of hook configurations
        """
        required_tools = set()

        for hook_config in hook_configs:
            hook_type = hook_config.get("type", "SummarizationHook")
            hook_config_section = hook_config.get("hook_config", {})

            # Determine required tools based on hook type
            if hook_type == "SummarizationHook":
                composer_tool = hook_config_section.get(
                    "composer_tool", "OutputSummarizationComposer"
                )
                required_tools.add(composer_tool)
                # Also need the agentic tool for summarization
                required_tools.add("ToolOutputSummarizer")
            elif hook_type == "FilteringHook":
                # Add filtering-related tools if any
                pass
            elif hook_type == "FormattingHook":
                # Add formatting-related tools if any
                pass
            elif hook_type == "ValidationHook":
                # Add validation-related tools if any
                pass
            elif hook_type == "LoggingHook":
                # Add logging-related tools if any
                pass

        # Load required tools
        if required_tools:
            tools_to_load = []
            for tool in required_tools:
                # Map tool names to their categories
                if tool in ["OutputSummarizationComposer", "ToolOutputSummarizer"]:
                    tools_to_load.append("output_summarization")
                # Add more mappings as needed

            if tools_to_load:
                try:
                    # Ensure ComposeTool is available
                    from .compose_tool import ComposeTool
                    from .tool_registry import register_external_tool

                    register_external_tool("ComposeTool", ComposeTool)

                    # Check if ToolUniverse is fully initialized
                    if hasattr(self.tooluniverse, "all_tools"):
                        # Load the tools and verify they were loaded
                        self.tooluniverse.load_tools(tools_to_load)

                        # Verify that the required tools are actually available
                        missing_tools = []
                        for tool in required_tools:
                            if (
                                tool not in self.tooluniverse.callable_functions
                                and tool not in self.tooluniverse.all_tool_dict
                            ):
                                missing_tools.append(tool)

                        if missing_tools:
                            _logger.warning(
                                "Some hook tools could not be loaded: %s", missing_tools
                            )
                            _logger.info("This may cause summarization hooks to fail.")
                        else:
                            _logger.info(
                                "Auto-loaded hook tools: %s",
                                ", ".join(tools_to_load),
                            )
                    else:
                        # Store tools to load later when ToolUniverse is ready
                        self._pending_tools_to_load = tools_to_load
                        _logger.info(
                            "Hook tools queued for loading: %s",
                            ", ".join(tools_to_load),
                        )
                except Exception as e:
                    _logger.warning("Could not auto-load hook tools: %s", e)
                    _logger.info("This will cause summarization hooks to fail.")

    def _ensure_hook_tools_loaded(self):
        """
        Ensure that tools required by hooks are loaded.

        This method is called during HookManager initialization to make sure
        that the necessary tools (like output_summarization tools) are available.
        """
        try:
            # Ensure ComposeTool is available
            from .compose_tool import ComposeTool
            from .tool_registry import register_external_tool

            register_external_tool("ComposeTool", ComposeTool)

            # Load output_summarization tools if not already loaded
            if (
                not hasattr(self.tooluniverse, "tool_category_dicts")
                or "output_summarization" not in self.tooluniverse.tool_category_dicts
            ):
                _logger.info("Loading output_summarization tools for hooks")
                self.tooluniverse.load_tools(["output_summarization"])

            # Pre-instantiate hook tools to ensure they're available in callable_functions
            required_tools = ["ToolOutputSummarizer", "OutputSummarizationComposer"]
            for tool_name in required_tools:
                if (
                    tool_name in self.tooluniverse.all_tool_dict
                    and tool_name not in self.tooluniverse.callable_functions
                ):
                    try:
                        self.tooluniverse.init_tool(
                            self.tooluniverse.all_tool_dict[tool_name],
                            add_to_cache=True,
                        )
                        _logger.debug("Pre-loaded hook tool: %s", tool_name)
                    except Exception as e:
                        _logger.warning(
                            "Failed to pre-load hook tool %s: %s", tool_name, e
                        )

            # Verify the tools were loaded
            missing_tools = []
            for tool in required_tools:
                if (
                    hasattr(self.tooluniverse, "callable_functions")
                    and tool not in self.tooluniverse.callable_functions
                    and hasattr(self.tooluniverse, "all_tool_dict")
                    and tool not in self.tooluniverse.all_tool_dict
                ):
                    missing_tools.append(tool)

            if missing_tools:
                _logger.warning(
                    "Some hook tools could not be loaded: %s", missing_tools
                )
                _logger.info("This may cause summarization hooks to fail")
            else:
                _logger.info("Hook tools loaded successfully: %s", required_tools)
        except Exception as e:
            _logger.error("Error loading hook tools: %s", e)
            _logger.info("This will cause summarization hooks to fail")

    def _load_pending_tools(self):
        """
        Load any pending tools that were queued during initialization.

        This method is called when hooks are applied to ensure that any tools
        that couldn't be loaded during HookManager initialization are loaded
        once the ToolUniverse is fully ready.
        """
        if self._pending_tools_to_load and hasattr(self.tooluniverse, "all_tools"):
            try:
                self.tooluniverse.load_tools(self._pending_tools_to_load)
                _logger.info(
                    "Loaded pending hook tools: %s",
                    ", ".join(self._pending_tools_to_load),
                )
                self._pending_tools_to_load = []  # Clear the pending list
            except Exception as e:
                _logger.warning("Could not load pending hook tools: %s", e)

        # Pre-load hook tools if they're available but not instantiated
        self._ensure_hook_tools_loaded()

    def _is_hook_tool(self, tool_name: str) -> bool:
        """
        Check if a tool is a hook-related tool that should not be processed by hooks.

        This prevents recursive hook processing where hook tools (like
        ToolOutputSummarizer) produce output that would trigger more hook processing.

        Args:
            tool_name (str): Name of the tool to check

        Returns:
            bool: True if the tool is a hook tool and should be excluded from
                hook processing
        """
        hook_tool_names = [
            "ToolOutputSummarizer",
            "OutputSummarizationComposer",
            # Add more hook tool names as needed
        ]
        return tool_name in hook_tool_names

    def _create_hook_instance(
        self, hook_config: Dict[str, Any]
    ) -> Optional[OutputHook]:
        """
        Create a hook instance based on configuration.

        This method creates hook instances and applies hook type-specific
        defaults from the configuration before initializing the hook.

        Args:
            hook_config (Dict[str, Any]): Hook configuration

        Returns:
            Optional[OutputHook]: Created hook instance or None if type not supported
        """
        hook_type = hook_config.get("type", "SummarizationHook")

        # Apply hook type-specific defaults
        enhanced_config = self._apply_hook_type_defaults(hook_config)

        if hook_type == "SummarizationHook":
            return SummarizationHook(enhanced_config, self.tooluniverse)
        elif hook_type == "FileSaveHook":
            # Merge hook_config with the main config for FileSaveHook
            file_save_config = enhanced_config.copy()
            file_save_config.update(enhanced_config.get("hook_config", {}))
            return FileSaveHook(file_save_config)
        else:
            _logger.error("Unknown hook type: %s", hook_type)
            return None

    def _apply_hook_type_defaults(self, hook_config: Dict[str, Any]) -> Dict[str, Any]:
        """
        Apply hook type-specific default values to hook configuration.

        This method merges hook type defaults with individual hook configuration,
        ensuring that each hook type gets its appropriate default values.

        Args:
            hook_config (Dict[str, Any]): Original hook configuration

        Returns:
            Dict[str, Any]: Enhanced configuration with defaults applied
        """
        hook_type = hook_config.get("type", "SummarizationHook")

        # Get hook type defaults from configuration
        hook_type_defaults = self.config.get("hook_type_defaults", {}).get(
            hook_type, {}
        )

        # Create enhanced configuration
        enhanced_config = hook_config.copy()

        # Apply defaults to hook_config if not already specified
        if "hook_config" not in enhanced_config:
            enhanced_config["hook_config"] = {}

        hook_config_section = enhanced_config["hook_config"]

        # Apply defaults for each hook type
        if hook_type == "SummarizationHook":
            defaults = {
                "composer_tool": "OutputSummarizationComposer",
                "chunk_size": hook_type_defaults.get("default_chunk_size", 2000),
                "focus_areas": hook_type_defaults.get(
                    "default_focus_areas", "key_findings_and_results"
                ),
                "max_summary_length": hook_type_defaults.get(
                    "default_max_summary_length", 3000
                ),
            }
        # Removed unsupported hook types to avoid confusion
        elif hook_type == "FileSaveHook":
            defaults = {
                "temp_dir": hook_type_defaults.get("default_temp_dir", None),
                "file_prefix": hook_type_defaults.get(
                    "default_file_prefix", "tool_output"
                ),
                "include_metadata": hook_type_defaults.get(
                    "default_include_metadata", True
                ),
                "auto_cleanup": hook_type_defaults.get("default_auto_cleanup", False),
                "cleanup_age_hours": hook_type_defaults.get(
                    "default_cleanup_age_hours", 24
                ),
            }
        else:
            defaults = {}

        # Apply defaults only if not already specified
        for key, default_value in defaults.items():
            if key not in hook_config_section:
                hook_config_section[key] = default_value

        return enhanced_config

    def _is_hook_applicable(
        self, hook: OutputHook, tool_name: str, context: Dict[str, Any]
    ) -> bool:
        """
        Check if a hook is applicable to the current tool.

        Args:
            hook (OutputHook): Hook instance to check
            tool_name (str): Name of the current tool
            context (Dict[str, Any]): Execution context

        Returns:
            bool: True if hook is applicable, False otherwise
        """
        # Check tool-specific hooks
        if "tool_name" in hook.config:
            return hook.config["tool_name"] == tool_name

        # Check category-specific hooks
        if "category" in hook.config:
            # This would need to be implemented based on actual tool categorization.
            # For now, return True to apply category hooks to all tools.
            return True

        # Global hooks apply to all tools
        return True
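
# --- Illustrative sketch (not part of the module source) ----------------------
# A configuration of the shape _load_hooks() walks: global "hooks",
# "tool_specific_hooks" keyed by tool name, "category_hooks" keyed by category,
# and optional "hook_type_defaults".  HookManager itself needs a live
# ToolUniverse instance (and LLM API keys for hooks to stay enabled), so the
# commented calls are indicative only.
_manager_config = {
    "hooks": [
        {
            "name": "global_summarizer",
            "type": "SummarizationHook",
            "enabled": True,
            "priority": 1,
            "conditions": {"output_length": {"threshold": 5000, "operator": ">"}},
        }
    ],
    "tool_specific_hooks": {},
    "category_hooks": {},
    "hook_type_defaults": {
        "SummarizationHook": {
            "default_chunk_size": 2000,
            "default_max_summary_length": 3000,
        }
    },
}
# manager = HookManager(_manager_config, tooluniverse)
# processed = manager.apply_hooks(raw_result, "some_tool", {}, {"tool_type": ""})
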
class FileSaveHook(OutputHook):
    """
    Hook that saves tool outputs to temporary files and returns file information.

    This hook saves the tool output to a temporary file and returns information
    about the file path, data format, and data structure instead of the original
    output. This is useful for handling large outputs or when you need to process
    outputs as files rather than in-memory data.

    Configuration options:
    - temp_dir: Directory to save temporary files (default: system temp)
    - file_prefix: Prefix for generated filenames (default: 'tool_output')
    - include_metadata: Whether to include metadata in the response (default: True)
    - auto_cleanup: Whether to automatically clean up old files (default: False)
    - cleanup_age_hours: Age in hours for auto cleanup (default: 24)
    """
    def __init__(self, config: Dict[str, Any]):
        """
        Initialize the FileSaveHook.

        Args:
            config (Dict[str, Any]): Hook configuration including:
                - name: Hook name
                - temp_dir: Directory for temporary files
                - file_prefix: Prefix for filenames
                - include_metadata: Include metadata flag
                - auto_cleanup: Auto cleanup flag
                - cleanup_age_hours: Cleanup age in hours
        """
        super().__init__(config)

        # Set default configuration
        self.temp_dir = config.get("temp_dir", None)
        self.file_prefix = config.get("file_prefix", "tool_output")
        self.include_metadata = config.get("include_metadata", True)
        self.auto_cleanup = config.get("auto_cleanup", False)
        self.cleanup_age_hours = config.get("cleanup_age_hours", 24)

        # Import required modules
        import tempfile
        import os
        from datetime import datetime, timedelta

        self.tempfile = tempfile
        self.os = os
        self.datetime = datetime
        self.timedelta = timedelta

        # Create temp directory if specified
        if self.temp_dir:
            self.os.makedirs(self.temp_dir, exist_ok=True)
    def process(
        self,
        result: Any,
        tool_name: str,
        arguments: Dict[str, Any],
        context: Dict[str, Any],
    ) -> Dict[str, Any]:
        """
        Process the tool output by saving it to a temporary file.

        Args:
            result (Any): The tool output to process
            tool_name (str): Name of the tool that produced the output
            arguments (Dict[str, Any]): Arguments passed to the tool
            context (Dict[str, Any]): Execution context

        Returns:
            Dict[str, Any]: Dictionary containing file information:
                - file_path: Path to the saved file
                - data_format: Format of the data (json, text, binary, etc.)
                - data_structure: Structure information about the data
                - file_size: Size of the file in bytes
                - created_at: Timestamp when file was created
                - metadata: Additional metadata (if include_metadata is True)
        """
        try:
            # Determine data format and structure
            data_format, data_structure = self._analyze_data(result)

            # Generate filename
            timestamp = self.datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{self.file_prefix}_{tool_name}_{timestamp}.{data_format}"

            # Save to temporary file
            if self.temp_dir:
                file_path = self.os.path.join(self.temp_dir, filename)
            else:
                # Use system temp directory
                temp_fd, file_path = self.tempfile.mkstemp(
                    suffix=f"_{filename}", prefix=self.file_prefix, dir=self.temp_dir
                )
                self.os.close(temp_fd)

            # Write data to file
            self._write_data_to_file(result, file_path, data_format)

            # Get file size
            file_size = self.os.path.getsize(file_path)

            # Prepare response
            response = {
                "file_path": file_path,
                "data_format": data_format,
                "data_structure": data_structure,
                "file_size": file_size,
                "created_at": self.datetime.now().isoformat(),
                "tool_name": tool_name,
                "original_arguments": arguments,
            }

            # Add metadata if requested
            if self.include_metadata:
                response["metadata"] = {
                    "hook_name": self.name,
                    "hook_type": "FileSaveHook",
                    "processing_time": self.datetime.now().isoformat(),
                    "context": context,
                }

            # Perform auto cleanup if enabled
            if self.auto_cleanup:
                self._cleanup_old_files()

            return response

        except Exception as e:
            # Return error information instead of failing
            return {
                "error": f"Failed to save output to file: {str(e)}",
                "original_output": str(result),
                "tool_name": tool_name,
                "hook_name": self.name,
            }
    def _analyze_data(self, data: Any) -> tuple[str, str]:
        """
        Analyze the data to determine its format and structure.

        Args:
            data (Any): The data to analyze

        Returns:
            tuple[str, str]: (data_format, data_structure)
        """
        if isinstance(data, dict):
            return "json", f"dict with {len(data)} keys"
        elif isinstance(data, list):
            return "json", f"list with {len(data)} items"
        elif isinstance(data, str):
            if data.strip().startswith("{") or data.strip().startswith("["):
                return "json", "JSON string"
            else:
                return "txt", f"text with {len(data)} characters"
        # Check bool before int/float: bool is a subclass of int, so the numeric
        # branch would otherwise shadow it.
        elif isinstance(data, bool):
            return "json", "boolean value"
        elif isinstance(data, (int, float)):
            return "json", "numeric value"
        else:
            return "bin", f"binary data of type {type(data).__name__}"

    def _write_data_to_file(self, data: Any, file_path: str, data_format: str) -> None:
        """
        Write data to file in the appropriate format.

        Args:
            data (Any): The data to write
            file_path (str): Path to the file
            data_format (str): Format of the data
        """
        if data_format == "json":
            with open(file_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
        elif data_format == "txt":
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(str(data))
        else:
            # For binary or other formats, write as string
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(str(data))

    def _cleanup_old_files(self) -> None:
        """
        Clean up old files based on the cleanup_age_hours setting.
        """
        if not self.temp_dir:
            return

        try:
            current_time = self.datetime.now()
            cutoff_time = current_time - self.timedelta(hours=self.cleanup_age_hours)

            for filename in self.os.listdir(self.temp_dir):
                if filename.startswith(self.file_prefix):
                    file_path = self.os.path.join(self.temp_dir, filename)
                    file_time = self.datetime.fromtimestamp(
                        self.os.path.getmtime(file_path)
                    )
                    if file_time < cutoff_time:
                        self.os.remove(file_path)
        except Exception as e:
            # Log error but don't fail the hook
            print(f"Warning: Failed to cleanup old files: {e}")
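
# --- Illustrative sketch (not part of the module source) ----------------------
# FileSaveHook replaces the tool output with a dict describing the saved file.
# With temp_dir omitted it falls back to the system temp directory; the hook
# name and prefix below are arbitrary example values.
_file_hook = FileSaveHook(
    {
        "name": "save_to_disk",
        "enabled": True,
        "priority": 10,
        "conditions": {},
        "file_prefix": "tool_output",
        "include_metadata": True,
    }
)
# info = _file_hook.process({"hits": [1, 2, 3]}, "some_tool", {}, {})
# info keys: file_path, data_format, data_structure, file_size, created_at, ...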