"""
Output Hook System for ToolUniverse
This module provides a comprehensive hook-based output processing system that allows
for intelligent post-processing of tool outputs. The system supports various types
of hooks including summarization, filtering, and transformation hooks.
Key Components:
- HookRule: Defines conditions for when hooks should trigger
- OutputHook: Base class for all output hooks
- SummarizationHook: Specialized hook for output summarization
- HookManager: Manages and coordinates all hooks
The hook system integrates seamlessly with ToolUniverse's existing architecture,
leveraging AgenticTool and ComposeTool for intelligent output processing.
"""
import json
from dataclasses import dataclass
from typing import Dict, Any, List, Optional
from pathlib import Path
from tooluniverse.logging_config import get_logger
_logger = get_logger(__name__)
[docs]
class HookRule:
"""
Defines rules for when hooks should be triggered.
This class evaluates various conditions to determine if a hook should
be applied to a tool's output. Supports multiple condition types including
output length, content type, and tool-specific criteria.
Args:
conditions (Dict[str, Any]): Dictionary containing condition specifications
Attributes:
conditions (Dict[str, Any]): The condition specifications
"""
[docs]
def __init__(self, conditions: Dict[str, Any]):
"""
Initialize the hook rule with conditions.
Args:
conditions (Dict[str, Any]): Condition specifications including
output_length, content_type, tool_type, etc.
"""
self.conditions = conditions
[docs]
def evaluate(
self,
result: Any,
tool_name: str,
arguments: Dict[str, Any],
context: Dict[str, Any],
) -> bool:
"""
Evaluate whether the rule conditions are met.
Args:
result (Any): The tool output to evaluate
tool_name (str): Name of the tool that produced the output
arguments (Dict[str, Any]): Arguments passed to the tool
context (Dict[str, Any]): Additional context information
Returns
bool: True if conditions are met, False otherwise
"""
# Evaluate output length conditions
if "output_length" in self.conditions:
result_str = str(result)
length_condition = self.conditions["output_length"]
threshold = length_condition.get("threshold", 5000)
operator = length_condition.get("operator", ">")
if operator == ">":
return len(result_str) > threshold
elif operator == ">=":
return len(result_str) >= threshold
elif operator == "<":
return len(result_str) < threshold
elif operator == "<=":
return len(result_str) <= threshold
# Evaluate content type conditions
if "content_type" in self.conditions:
content_type = self.conditions["content_type"]
if content_type == "json" and isinstance(result, dict):
return True
elif content_type == "text" and isinstance(result, str):
return True
# Evaluate tool type conditions
if "tool_type" in self.conditions:
tool_type = context.get("tool_type", "")
return tool_type == self.conditions["tool_type"]
# Evaluate tool name conditions
if "tool_name" in self.conditions:
return tool_name == self.conditions["tool_name"]
# If no specific conditions are met, return True for general rules
return True
[docs]
class OutputHook:
"""
Base class for all output hooks.
This abstract base class defines the interface that all output hooks must implement.
Hooks are used to process tool outputs after execution, enabling features like
summarization, filtering, transformation, and validation.
Args:
config (Dict[str, Any]): Hook configuration including name, enabled status,
priority, and conditions
Attributes:
config (Dict[str, Any]): Hook configuration
name (str): Name of the hook
enabled (bool): Whether the hook is enabled
priority (int): Hook priority (lower numbers execute first)
rule (HookRule): Rule for when this hook should trigger
"""
[docs]
def __init__(self, config: Dict[str, Any]):
"""
Initialize the output hook with configuration.
Args:
config (Dict[str, Any]): Hook configuration containing:
- name: Hook identifier
- enabled: Whether hook is active
- priority: Execution priority
- conditions: Trigger conditions
"""
self.config = config
self.name = config.get("name", "unnamed_hook")
self.enabled = config.get("enabled", True)
self.priority = config.get("priority", 1)
self.rule = HookRule(config.get("conditions", {}))
[docs]
def should_trigger(
self,
result: Any,
tool_name: str,
arguments: Dict[str, Any],
context: Dict[str, Any],
) -> bool:
"""
Determine if this hook should be triggered for the given output.
Args:
result (Any): The tool output to evaluate
tool_name (str): Name of the tool that produced the output
arguments (Dict[str, Any]): Arguments passed to the tool
context (Dict[str, Any]): Additional context information
Returns
bool: True if hook should trigger, False otherwise
"""
if not self.enabled:
return False
return self.rule.evaluate(result, tool_name, arguments, context)
[docs]
def process(
self,
result: Any,
tool_name: str | None = None,
arguments: Dict[str, Any] | None = None,
context: Dict[str, Any] | None = None,
) -> Any:
"""
Process the tool output.
This method must be implemented by subclasses to define the specific
processing logic for the hook.
Args:
result (Any): The tool output to process
tool_name (str): Name of the tool that produced the output
arguments (Dict[str, Any]): Arguments passed to the tool
context (Dict[str, Any]): Additional context information
Returns
Any: The processed output
Raises:
NotImplementedError: If not implemented by subclass
"""
raise NotImplementedError("Subclasses must implement process method")
[docs]
@dataclass
class SummarizationHookConfig:
composer_tool: str = "OutputSummarizationComposer"
chunk_size: int = (
30000 # Increased to 30000 to minimize chunk count and improve success rate
)
focus_areas: str = "key_findings_and_results"
max_summary_length: int = 3000
composer_timeout_sec: int = 300
[docs]
def validate(self) -> "SummarizationHookConfig":
# Validate numeric fields; clamp to sensible defaults if invalid
if not isinstance(self.chunk_size, int) or self.chunk_size <= 0:
self.chunk_size = 30000
if not isinstance(self.max_summary_length, int) or self.max_summary_length <= 0:
self.max_summary_length = 3000
if (
not isinstance(self.composer_timeout_sec, int)
or self.composer_timeout_sec <= 0
):
self.composer_timeout_sec = 300
if not isinstance(self.composer_tool, str) or not self.composer_tool:
self.composer_tool = "OutputSummarizationComposer"
return self
[docs]
class SummarizationHook(OutputHook):
"""
Hook for intelligent output summarization using AI.
This hook uses the ToolUniverse's AgenticTool and ComposeTool infrastructure
to provide intelligent summarization of long tool outputs. It supports
chunking large outputs, processing each chunk with AI, and merging results.
Args:
config (Dict[str, Any]): Hook configuration including summarization parameters
tooluniverse: Reference to the ToolUniverse instance
Attributes:
tooluniverse: ToolUniverse instance for tool execution
composer_tool (str): Name of the ComposeTool for summarization
chunk_size (int): Size of chunks for processing large outputs
focus_areas (str): Areas to focus on during summarization
max_summary_length (int): Maximum length of final summary
"""
[docs]
def __init__(self, config: Dict[str, Any] | SummarizationHookConfig, tooluniverse):
"""
Initialize the summarization hook.
Args:
config (Dict[str, Any]): Hook configuration
tooluniverse: ToolUniverse instance for executing summarization tools
"""
super().__init__(config if isinstance(config, dict) else {"hook_config": {}})
self.tooluniverse = tooluniverse
# Normalize input to config dataclass
if isinstance(config, SummarizationHookConfig):
cfg = config
else:
raw = config.get("hook_config", {}) if isinstance(config, dict) else {}
# Breaking change: only support composer_tool going forward
cfg = SummarizationHookConfig(
composer_tool=raw.get("composer_tool", "OutputSummarizationComposer"),
# Default should match SummarizationHookConfig and documentation.
chunk_size=raw.get("chunk_size", 30000),
focus_areas=raw.get("focus_areas", "key_findings_and_results"),
max_summary_length=raw.get("max_summary_length", 3000),
composer_timeout_sec=raw.get("composer_timeout_sec", 300),
)
self.config_obj = cfg.validate()
self.composer_tool = self.config_obj.composer_tool
self.chunk_size = self.config_obj.chunk_size
self.focus_areas = self.config_obj.focus_areas
self.max_summary_length = self.config_obj.max_summary_length
self.composer_timeout_sec = self.config_obj.composer_timeout_sec
[docs]
def process(
self,
result: Any,
tool_name: Optional[str] = None,
arguments: Optional[Dict[str, Any]] = None,
context: Optional[Dict[str, Any]] = None,
) -> Any:
"""
Execute summarization processing using Compose Summarizer Tool.
This method orchestrates the summarization workflow by:
1. Preparing parameters for the Compose Summarizer Tool
2. Calling the tool through ToolUniverse
3. Processing and returning the summarized result
Args:
result (Any): The tool output to summarize
tool_name (str): Name of the tool that produced the output
arguments (Dict[str, Any]): Arguments passed to the tool
context (Dict[str, Any]): Additional context information
Returns
Any: The summarized output, or original output if summarization fails
"""
try:
# Backward-compat: allow calling process(result) only
if tool_name is None:
tool_name = "unknown_tool"
if arguments is None:
arguments = {}
if context is None:
context = {}
# Explicitly preserve None and empty string semantics
if result is None:
return None
if isinstance(result, str) and result == "":
return ""
# Debug: basic context
try:
_len = len(str(result))
except Exception:
_len = -1
_logger.debug(
"SummarizationHook process: tool=%s, result_len=%s, chunk_size=%s, max_summary_length=%s",
tool_name,
_len,
self.chunk_size,
self.max_summary_length,
)
# Check if the required tools are available
if (
self.composer_tool not in self.tooluniverse.callable_functions
and self.composer_tool not in self.tooluniverse.all_tool_dict
):
_logger.warning(
"Summarization tool '%s' not available; returning original output",
self.composer_tool,
)
return result
# Prepare parameters for Compose Summarizer Tool
composer_args = {
"tool_output": str(result),
"query_context": self._extract_query_context(context),
"tool_name": tool_name,
"chunk_size": self.chunk_size,
"focus_areas": self.focus_areas,
"max_summary_length": self.max_summary_length,
}
# Call Compose Summarizer Tool through ToolUniverse
_logger.debug(
"Calling composer tool '%s' (timeout=%ss)",
self.composer_tool,
self.composer_timeout_sec,
)
# Run composer with timeout to avoid hangs
try:
from concurrent.futures import (
ThreadPoolExecutor,
)
def _call_composer():
return self.tooluniverse.run_one_function(
{"name": self.composer_tool, "arguments": composer_args}
)
with ThreadPoolExecutor(max_workers=1) as _pool:
_future = _pool.submit(_call_composer)
composer_result = _future.result(timeout=self.composer_timeout_sec)
except Exception as _e_timeout:
# Timeout or execution error; log and fall back to original output
_logger.warning("Composer execution failed/timeout: %s", _e_timeout)
return result
# Debug: show composer result meta
try:
if isinstance(composer_result, dict):
success = composer_result.get("success", False)
summary_len = len(composer_result.get("summary", ""))
_logger.debug(
"Composer result: success=%s summary_len=%s",
success,
summary_len,
)
except Exception as _e_dbg:
_logger.debug("Debug error inspecting composer_result: %s", _e_dbg)
# Process Compose Tool result
if isinstance(composer_result, dict) and composer_result.get("success"):
return composer_result.get("summary", result)
elif isinstance(composer_result, str):
return composer_result
else:
_logger.warning(
"Compose Summarizer Tool returned unexpected result: %s",
composer_result,
)
return result
except Exception as e:
error_msg = str(e)
_logger.error("Error in summarization hook: %s", error_msg)
# Check if the error is due to missing tools
if "not found" in error_msg.lower() or "ToolOutputSummarizer" in error_msg:
_logger.error(
"Required summarization tools are not available. Please ensure the SMCP server is started with hooks enabled."
)
return result
[docs]
class HookManager:
"""
Manages and coordinates all output hooks.
The HookManager is responsible for loading hook configurations, creating
hook instances, and applying hooks to tool outputs. It provides a unified
interface for hook management and supports dynamic configuration updates.
Args:
config (Dict[str, Any]): Hook manager configuration
tooluniverse: Reference to the ToolUniverse instance
Attributes:
config (Dict[str, Any]): Hook manager configuration
tooluniverse: ToolUniverse instance for tool execution
hooks (List[OutputHook]): List of loaded hook instances
enabled (bool): Whether hook processing is enabled
config_path (str): Path to hook configuration file
"""
[docs]
def __init__(self, config: Dict[str, Any], tooluniverse):
"""
Initialize the hook manager.
Args:
config (Dict[str, Any]): Configuration for hook manager
tooluniverse: ToolUniverse instance for executing tools
"""
self.config = config
self.tooluniverse = tooluniverse
self.hooks: List[OutputHook] = []
self.enabled = True
# Alias for tests that expect hooks_enabled flag
self.hooks_enabled = self.enabled
self.config_path = config.get("config_path", "template/hook_config.json")
self._pending_tools_to_load: List[str] = []
# Cache excluded patterns for performance
self._excluded_patterns_cache = None
self._load_hook_config()
# Validate LLM API keys before loading hooks
if not self._validate_llm_api_keys():
_logger.warning("LLM API keys not available. Hooks will be disabled.")
_logger.info(
"To enable hooks, please set LLM API keys environment variable."
)
# Disable hook processing but still ensure required tools are available
# Tests expect hook-related tools (e.g., ToolOutputSummarizer) to be registered
# in ToolUniverse.callable_functions even when hooks are disabled.
self.enabled = False
try:
# Proactively load tools required by summarization hooks so they are discoverable
all_hook_configs = []
global_hooks = (
self.config.get("hooks", [])
if isinstance(self.config, dict)
else []
)
for hook_cfg in global_hooks:
all_hook_configs.append(hook_cfg)
# Attempt auto-load based on config; if config is empty, fall back to ensuring summarization tools
self._auto_load_hook_tools(all_hook_configs)
# Ensure tools are pre-instantiated into callable_functions if possible
self._ensure_hook_tools_loaded()
except Exception as _e:
# Non-fatal: we still proceed with disabled hooks
_logger.warning("Failed to preload hook tools without API keys: %s", _e)
# Do not proceed to create hook instances when disabled
# Keep hooks list empty and reflect flags
self.hooks_enabled = self.enabled
return
self._load_hooks()
self.hooks_enabled = self.enabled
[docs]
def apply_hooks(
self,
result: Any,
tool_name: str,
arguments: Dict[str, Any],
context: Dict[str, Any],
) -> Any:
"""
Apply all applicable hooks to the tool output.
This method iterates through all loaded hooks, checks if they should
be applied to the current output, and processes the output through
each applicable hook in priority order.
Args:
result (Any): The tool output to process
tool_name (str): Name of the tool that produced the output
arguments (Dict[str, Any]): Arguments passed to the tool
context (Dict[str, Any]): Additional context information
Returns
Any: The processed output after applying all applicable hooks
"""
if not self.enabled:
return result
# Load pending tools if ToolUniverse is now ready
self._load_pending_tools()
# Prevent recursive hook processing
if self._is_hook_tool(tool_name):
return result
# Sort hooks by priority (lower numbers execute first)
sorted_hooks = sorted(self.hooks, key=lambda h: h.priority)
for hook in sorted_hooks:
if not hook.enabled:
continue
# Check if hook is applicable to current tool
if self._is_hook_applicable(hook, tool_name, context):
if hook.should_trigger(result, tool_name, arguments, context):
_logger.debug(
"Applying hook: %s for tool: %s", hook.name, tool_name
)
result = hook.process(result, tool_name, arguments, context)
return result
[docs]
def _validate_llm_api_keys(self) -> bool:
"""
Validate that LLM API keys are available for hook tools.
Returns
bool: True if API keys are available, False otherwise
"""
from .agentic_tool import AgenticTool
if AgenticTool.has_any_api_keys():
_logger.debug("LLM API keys validated successfully")
return True
else:
_logger.error("LLM API key validation failed: No API keys available")
_logger.info("To enable hooks, please set API key environment variables.")
return False
[docs]
def enable_hook(self, hook_name: str):
"""
Enable a specific hook by name.
Args:
hook_name (str): Name of the hook to enable
"""
hook = self.get_hook(hook_name)
if hook:
hook.enabled = True
_logger.info("Enabled hook: %s", hook_name)
else:
_logger.error("Hook not found: %s", hook_name)
[docs]
def disable_hook(self, hook_name: str):
"""
Disable a specific hook by name.
Args:
hook_name (str): Name of the hook to disable
"""
hook = self.get_hook(hook_name)
if hook:
hook.enabled = False
_logger.info("Disabled hook: %s", hook_name)
else:
_logger.error("Hook not found: %s", hook_name)
[docs]
def toggle_hooks(self, enabled: bool):
"""
Enable or disable all hooks globally.
Args:
enabled (bool): True to enable all hooks, False to disable
"""
self.enabled = enabled
self.hooks_enabled = enabled
status = "enabled" if enabled else "disabled"
_logger.info("Hooks %s", status)
# Backward-compat: tests reference enable_hooks/disable_hooks APIs
[docs]
def enable_hooks(self):
"""Enable hooks and (re)load configurations and required tools."""
self.toggle_hooks(True)
# Ensure tools and hooks are ready
self._ensure_hook_tools_loaded()
self._load_hooks()
[docs]
def disable_hooks(self):
"""Disable hooks and clear in-memory hook instances."""
self.toggle_hooks(False)
# Do not destroy config; just clear active hooks
self.hooks = []
[docs]
def reload_config(self, config_path: Optional[str] = None):
"""
Reload hook configuration from file.
Args:
config_path (Optional[str]): Path to configuration file.
If None, uses the current config_path
"""
if config_path:
self.config_path = config_path
self._excluded_patterns_cache = None # Clear cache on reload
self._load_hook_config()
self._load_hooks()
_logger.info("Reloaded hook configuration")
[docs]
def get_hook(self, hook_name: str) -> Optional[OutputHook]:
"""
Get a hook instance by name.
Args:
hook_name (str): Name of the hook to retrieve
Returns
Optional[OutputHook]: Hook instance if found, None otherwise
"""
for hook in self.hooks:
if hook.name == hook_name:
return hook
return None
[docs]
def _load_hook_config(self):
"""
Load hook configuration from file.
This method attempts to load the hook configuration from the specified
file path, handling both package resources and file system paths.
If the config is already provided and not empty, it uses that instead.
"""
# If config is already provided and not empty, use it
if self.config and (
("hooks" in self.config)
or ("tool_specific_hooks" in self.config)
or ("category_hooks" in self.config)
):
return
try:
config_file = self._get_config_file_path()
content = (
config_file.read_text(encoding="utf-8")
if hasattr(config_file, "read_text")
else Path(config_file).read_text(encoding="utf-8")
)
self.config = json.loads(content)
self._excluded_patterns_cache = None # Clear cache on reload
except Exception as e:
print(f"Warning: Could not load hook config: {e}")
if not self.config:
self.config = {}
[docs]
def _get_config_file_path(self):
"""
Get the path to the hook configuration file.
Returns
Path: Path to the configuration file
"""
try:
import importlib.resources as pkg_resources
except ImportError:
import importlib_resources as pkg_resources
try:
data_files = pkg_resources.files("tooluniverse.template")
config_file = data_files / "hook_config.json"
return config_file
except Exception:
# Fallback to file-based path resolution
current_dir = Path(__file__).parent
config_file = current_dir / "template" / "hook_config.json"
return config_file
[docs]
def _load_hooks(self):
"""
Load hook configurations and create hook instances.
This method processes the configuration and creates appropriate
hook instances for global, tool-specific, and category-specific hooks.
It also automatically loads any tools required by the hooks.
"""
self.hooks = []
# Collect all hook configs first to determine required tools
all_hook_configs = []
# Load global hooks
global_hooks = self.config.get("hooks", [])
for hook_config in global_hooks:
all_hook_configs.append(hook_config)
# Load tool-specific hooks
tool_specific_hooks = self.config.get("tool_specific_hooks", {})
for tool_name, tool_hook_config in tool_specific_hooks.items():
if tool_hook_config.get("enabled", True):
tool_hooks = tool_hook_config.get("hooks", [])
for hook_config in tool_hooks:
hook_config["tool_name"] = tool_name
all_hook_configs.append(hook_config)
# Load category-specific hooks
category_hooks = self.config.get("category_hooks", {})
for category_name, category_hook_config in category_hooks.items():
if category_hook_config.get("enabled", True):
category_hooks_list = category_hook_config.get("hooks", [])
for hook_config in category_hooks_list:
hook_config["category"] = category_name
all_hook_configs.append(hook_config)
# Auto-load required tools for hooks
self._auto_load_hook_tools(all_hook_configs)
# Note: Hook tools will be pre-loaded when ToolUniverse.load_tools() is called
# This is handled in the _load_pending_tools method
# Create hook instances
for hook_config in all_hook_configs:
hook = self._create_hook_instance(hook_config)
if hook:
self.hooks.append(hook)
[docs]
def _create_hook_instance(
self, hook_config: Dict[str, Any]
) -> Optional[OutputHook]:
"""
Create a hook instance based on configuration.
This method creates hook instances and applies hook type-specific defaults
from the configuration before initializing the hook.
Args:
hook_config (Dict[str, Any]): Hook configuration
Returns
Optional[OutputHook]: Created hook instance or None if type not supported
"""
hook_type = hook_config.get("type", "SummarizationHook")
# Apply hook type-specific defaults
enhanced_config = self._apply_hook_type_defaults(hook_config)
if hook_type == "SummarizationHook":
return SummarizationHook(enhanced_config, self.tooluniverse)
elif hook_type == "FileSaveHook":
# Merge hook_config with the main config for FileSaveHook
file_save_config = enhanced_config.copy()
file_save_config.update(enhanced_config.get("hook_config", {}))
return FileSaveHook(file_save_config)
else:
_logger.error("Unknown hook type: %s", hook_type)
return None
[docs]
def _apply_hook_type_defaults(self, hook_config: Dict[str, Any]) -> Dict[str, Any]:
"""
Apply hook type-specific default values to hook configuration.
This method merges hook type defaults with individual hook configuration,
ensuring that each hook type gets its appropriate default values.
Args:
hook_config (Dict[str, Any]): Original hook configuration
Returns
Dict[str, Any]: Enhanced configuration with defaults applied
"""
hook_type = hook_config.get("type", "SummarizationHook")
# Get hook type defaults from configuration
hook_type_defaults = self.config.get("hook_type_defaults", {}).get(
hook_type, {}
)
# Create enhanced configuration
enhanced_config = hook_config.copy()
# Apply defaults to hook_config if not already specified
if "hook_config" not in enhanced_config:
enhanced_config["hook_config"] = {}
hook_config_section = enhanced_config["hook_config"]
# Apply defaults for each hook type
if hook_type == "SummarizationHook":
defaults = {
"composer_tool": "OutputSummarizationComposer",
# Default chunk_size is intentionally large; chunk_size controls
# chunking, not whether summarization is needed.
"chunk_size": hook_type_defaults.get("default_chunk_size", 30000),
"focus_areas": hook_type_defaults.get(
"default_focus_areas", "key_findings_and_results"
),
"max_summary_length": hook_type_defaults.get(
"default_max_summary_length", 3000
),
}
# Removed unsupported hook types to avoid confusion
elif hook_type == "FileSaveHook":
defaults = {
"temp_dir": hook_type_defaults.get("default_temp_dir", None),
"file_prefix": hook_type_defaults.get(
"default_file_prefix", "tool_output"
),
"include_metadata": hook_type_defaults.get(
"default_include_metadata", True
),
"auto_cleanup": hook_type_defaults.get("default_auto_cleanup", False),
"cleanup_age_hours": hook_type_defaults.get(
"default_cleanup_age_hours", 24
),
}
else:
defaults = {}
# Apply defaults only if not already specified
for key, default_value in defaults.items():
if key not in hook_config_section:
hook_config_section[key] = default_value
return enhanced_config
[docs]
def _is_hook_applicable(
self, hook: OutputHook, tool_name: str, context: Dict[str, Any]
) -> bool:
"""
Check if a hook is applicable to the current tool.
Args:
hook (OutputHook): Hook instance to check
tool_name (str): Name of the current tool
context (Dict[str, Any]): Execution context
Returns
bool: True if hook is applicable, False otherwise
"""
# Check tool-specific hooks
if "tool_name" in hook.config:
return hook.config["tool_name"] == tool_name
# Check category-specific hooks
if "category" in hook.config:
# This would need to be implemented based on actual tool categorization
# For now, return True to apply category hooks to all tools
return True
# Global hooks apply to all tools
return True
[docs]
class FileSaveHook(OutputHook):
"""
Hook that saves tool outputs to temporary files and returns file information.
This hook saves the tool output to a temporary file and returns information
about the file path, data format, and data structure instead of the original output.
This is useful for handling large outputs or when you need to process outputs
as files rather than in-memory data.
Configuration options:
- temp_dir: Directory to save temporary files (default: system temp)
- file_prefix: Prefix for generated filenames (default: 'tool_output')
- include_metadata: Whether to include metadata in the response (default: True)
- auto_cleanup: Whether to automatically clean up old files (default: False)
- cleanup_age_hours: Age in hours for auto cleanup (default: 24)
"""
[docs]
def __init__(self, config: Dict[str, Any]):
"""
Initialize the FileSaveHook.
Args:
config (Dict[str, Any]): Hook configuration including:
- name: Hook name
- temp_dir: Directory for temporary files
- file_prefix: Prefix for filenames
- include_metadata: Include metadata flag
- auto_cleanup: Auto cleanup flag
- cleanup_age_hours: Cleanup age in hours
"""
super().__init__(config)
# Set default configuration
self.temp_dir = config.get("temp_dir", None)
self.file_prefix = config.get("file_prefix", "tool_output")
self.include_metadata = config.get("include_metadata", True)
self.auto_cleanup = config.get("auto_cleanup", False)
self.cleanup_age_hours = config.get("cleanup_age_hours", 24)
# Import required modules
import tempfile
import os
from datetime import datetime, timedelta
self.tempfile = tempfile
self.os = os
self.datetime = datetime
self.timedelta = timedelta
# Create temp directory if specified
if self.temp_dir:
self.os.makedirs(self.temp_dir, exist_ok=True)
[docs]
def process(
self,
result: Any,
tool_name: str,
arguments: Dict[str, Any],
context: Dict[str, Any],
) -> Dict[str, Any]:
"""
Process the tool output by saving it to a temporary file.
Args:
result (Any): The tool output to process
tool_name (str): Name of the tool that produced the output
arguments (Dict[str, Any]): Arguments passed to the tool
context (Dict[str, Any]): Execution context
Returns
Dict[str, Any]: Dictionary containing file information:
- file_path: Path to the saved file
- data_format: Format of the data (json, text, binary, etc.)
- data_structure: Structure information about the data
- file_size: Size of the file in bytes
- created_at: Timestamp when file was created
- metadata: Additional metadata (if include_metadata is True)
"""
try:
# Determine data format and structure
data_format, data_structure = self._analyze_data(result)
# Generate filename
timestamp = self.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{self.file_prefix}_{tool_name}_{timestamp}.{data_format}"
# Save to temporary file
if self.temp_dir:
file_path = self.os.path.join(self.temp_dir, filename)
else:
# Use system temp directory
temp_fd, file_path = self.tempfile.mkstemp(
suffix=f"_{filename}", prefix=self.file_prefix, dir=self.temp_dir
)
self.os.close(temp_fd)
# Write data to file
self._write_data_to_file(result, file_path, data_format)
# Get file size
file_size = self.os.path.getsize(file_path)
# Prepare response
response = {
"file_path": file_path,
"data_format": data_format,
"data_structure": data_structure,
"file_size": file_size,
"created_at": self.datetime.now().isoformat(),
"tool_name": tool_name,
"original_arguments": arguments,
}
# Add metadata if requested
if self.include_metadata:
response["metadata"] = {
"hook_name": self.name,
"hook_type": "FileSaveHook",
"processing_time": self.datetime.now().isoformat(),
"context": context,
}
# Perform auto cleanup if enabled
if self.auto_cleanup:
self._cleanup_old_files()
return response
except Exception as e:
# Return error information instead of failing
return {
"status": "error",
"error": f"Failed to save output to file: {str(e)}",
"original_output": str(result),
"tool_name": tool_name,
"hook_name": self.name,
}
[docs]
def _analyze_data(self, data: Any) -> tuple[str, str]:
"""
Analyze the data to determine its format and structure.
Args:
data (Any): The data to analyze
Returns
tuple[str, str]: (data_format, data_structure)
"""
if isinstance(data, dict):
return "json", f"dict with {len(data)} keys"
elif isinstance(data, list):
return "json", f"list with {len(data)} items"
elif isinstance(data, str):
if data.strip().startswith("{") or data.strip().startswith("["):
return "json", "JSON string"
else:
return "txt", f"text with {len(data)} characters"
elif isinstance(data, (int, float)):
return "json", "numeric value"
elif isinstance(data, bool):
return "json", "boolean value"
else:
return "bin", f"binary data of type {type(data).__name__}"
[docs]
def _write_data_to_file(self, data: Any, file_path: str, data_format: str) -> None:
"""
Write data to file in the appropriate format.
Args:
data (Any): The data to write
file_path (str): Path to the file
data_format (str): Format of the data
"""
if data_format == "json":
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
elif data_format == "txt":
with open(file_path, "w", encoding="utf-8") as f:
f.write(str(data))
else:
# For binary or other formats, write as string
with open(file_path, "w", encoding="utf-8") as f:
f.write(str(data))
[docs]
def _cleanup_old_files(self) -> None:
"""
Clean up old files based on the cleanup_age_hours setting.
"""
if not self.temp_dir:
return
try:
current_time = self.datetime.now()
cutoff_time = current_time - self.timedelta(hours=self.cleanup_age_hours)
for filename in self.os.listdir(self.temp_dir):
if filename.startswith(self.file_prefix):
file_path = self.os.path.join(self.temp_dir, filename)
file_time = self.datetime.fromtimestamp(
self.os.path.getmtime(file_path)
)
if file_time < cutoff_time:
self.os.remove(file_path)
except Exception as e:
# Log error but don't fail the hook
print(f"Warning: Failed to cleanup old files: {e}")