"""
Output Hook System for ToolUniverse
This module provides a comprehensive hook-based output processing system that allows
for intelligent post-processing of tool outputs. The system currently ships
summarization and file-saving hooks, and carries configuration defaults for
filtering, formatting, validation, and logging hook types.
Key Components:
- HookRule: Defines conditions for when hooks should trigger
- OutputHook: Base class for all output hooks
- SummarizationHook: Specialized hook for AI-based output summarization
- FileSaveHook: Hook that saves outputs to temporary files and returns file info
- HookManager: Manages and coordinates all hooks
The hook system integrates seamlessly with ToolUniverse's existing architecture,
leveraging AgenticTool and ComposeTool for intelligent output processing.
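Example:
A minimal usage sketch (assumes ``tu`` is an initialized ToolUniverse instance
with LLM API keys configured; ``my_tool`` and ``long_output`` are placeholders)::
    from tooluniverse.output_hook import HookManager
    hook_config = {"hooks": [{
        "name": "summarize_long_outputs",
        "type": "SummarizationHook",
        "conditions": {"output_length": {"threshold": 5000, "operator": ">"}},
    }]}
    manager = HookManager(hook_config, tu)
    processed = manager.apply_hooks(long_output, "my_tool", {}, {"arguments": {}})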
"""
import json
from typing import Dict, Any, List, Optional
from pathlib import Path
class HookRule:
"""
Defines rules for when hooks should be triggered.
This class evaluates various conditions to determine if a hook should
be applied to a tool's output. Supports multiple condition types including
output length, content type, and tool-specific criteria.
Args:
conditions (Dict[str, Any]): Dictionary containing condition specifications
Attributes:
conditions (Dict[str, Any]): The condition specifications
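Example (illustrative; ``some_tool`` is a placeholder tool name)::
    rule = HookRule({"output_length": {"threshold": 5000, "operator": ">"}})
    rule.evaluate("x" * 6000, "some_tool", {}, {})  # True: output exceeds 5000 chars
    rule.evaluate("short", "some_tool", {}, {})     # False: below the threshold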
"""
def __init__(self, conditions: Dict[str, Any]):
"""
Initialize the hook rule with conditions.
Args:
conditions (Dict[str, Any]): Condition specifications including
output_length, content_type, tool_type, etc.
"""
self.conditions = conditions
def evaluate(
self,
result: Any,
tool_name: str,
arguments: Dict[str, Any],
context: Dict[str, Any],
) -> bool:
"""
Evaluate whether the rule conditions are met.
Args:
result (Any): The tool output to evaluate
tool_name (str): Name of the tool that produced the output
arguments (Dict[str, Any]): Arguments passed to the tool
context (Dict[str, Any]): Additional context information
Returns:
bool: True if conditions are met, False otherwise
"""
# Evaluate output length conditions
if "output_length" in self.conditions:
result_str = str(result)
length_condition = self.conditions["output_length"]
threshold = length_condition.get("threshold", 5000)
operator = length_condition.get("operator", ">")
if operator == ">":
return len(result_str) > threshold
elif operator == ">=":
return len(result_str) >= threshold
elif operator == "<":
return len(result_str) < threshold
elif operator == "<=":
return len(result_str) <= threshold
# Evaluate content type conditions
if "content_type" in self.conditions:
content_type = self.conditions["content_type"]
if content_type == "json" and isinstance(result, dict):
return True
elif content_type == "text" and isinstance(result, str):
return True
# Evaluate tool type conditions
if "tool_type" in self.conditions:
tool_type = context.get("tool_type", "")
return tool_type == self.conditions["tool_type"]
# Evaluate tool name conditions
if "tool_name" in self.conditions:
return tool_name == self.conditions["tool_name"]
# No recognized conditions configured (or none reached a decision); default to True for general rules
return True
class OutputHook:
"""
Base class for all output hooks.
This abstract base class defines the interface that all output hooks must implement.
Hooks are used to process tool outputs after execution, enabling features like
summarization, filtering, transformation, and validation.
Args:
config (Dict[str, Any]): Hook configuration including name, enabled status,
priority, and conditions
Attributes:
config (Dict[str, Any]): Hook configuration
name (str): Name of the hook
enabled (bool): Whether the hook is enabled
priority (int): Hook priority (lower numbers execute first)
rule (HookRule): Rule for when this hook should trigger
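Example:
A minimal subclass sketch (``UppercaseHook`` is hypothetical and only
illustrates the required interface)::
    class UppercaseHook(OutputHook):
        def process(self, result, tool_name, arguments, context):
            return str(result).upper()
    hook = UppercaseHook({"name": "uppercase", "priority": 5,
                          "conditions": {"content_type": "text"}})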
"""
def __init__(self, config: Dict[str, Any]):
"""
Initialize the output hook with configuration.
Args:
config (Dict[str, Any]): Hook configuration containing:
- name: Hook identifier
- enabled: Whether hook is active
- priority: Execution priority
- conditions: Trigger conditions
"""
self.config = config
self.name = config.get("name", "unnamed_hook")
self.enabled = config.get("enabled", True)
self.priority = config.get("priority", 1)
self.rule = HookRule(config.get("conditions", {}))
def should_trigger(
self,
result: Any,
tool_name: str,
arguments: Dict[str, Any],
context: Dict[str, Any],
) -> bool:
"""
Determine if this hook should be triggered for the given output.
Args:
result (Any): The tool output to evaluate
tool_name (str): Name of the tool that produced the output
arguments (Dict[str, Any]): Arguments passed to the tool
context (Dict[str, Any]): Additional context information
Returns:
bool: True if hook should trigger, False otherwise
"""
if not self.enabled:
return False
return self.rule.evaluate(result, tool_name, arguments, context)
def process(
self,
result: Any,
tool_name: str,
arguments: Dict[str, Any],
context: Dict[str, Any],
) -> Any:
"""
Process the tool output.
This method must be implemented by subclasses to define the specific
processing logic for the hook.
Args:
result (Any): The tool output to process
tool_name (str): Name of the tool that produced the output
arguments (Dict[str, Any]): Arguments passed to the tool
context (Dict[str, Any]): Additional context information
Returns:
Any: The processed output
Raises:
NotImplementedError: If not implemented by subclass
"""
raise NotImplementedError("Subclasses must implement process method")
class SummarizationHook(OutputHook):
"""
Hook for intelligent output summarization using AI.
This hook uses the ToolUniverse's AgenticTool and ComposeTool infrastructure
to provide intelligent summarization of long tool outputs. It supports
chunking large outputs, processing each chunk with AI, and merging results.
Args:
config (Dict[str, Any]): Hook configuration including summarization parameters
tooluniverse: Reference to the ToolUniverse instance
Attributes:
tooluniverse: ToolUniverse instance for tool execution
composer_tool_name (str): Name of the ComposeTool for summarization
chunk_size (int): Size of chunks for processing large outputs
focus_areas (str): Areas to focus on during summarization
max_summary_length (int): Maximum length of final summary
composer_timeout_sec (int): Timeout in seconds for composer calls before
falling back to the original output
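Example hook configuration (illustrative values)::
    {
        "name": "summarize_long_outputs",
        "type": "SummarizationHook",
        "enabled": True,
        "priority": 1,
        "conditions": {"output_length": {"threshold": 5000, "operator": ">"}},
        "hook_config": {
            "composer_tool": "OutputSummarizationComposer",
            "chunk_size": 2000,
            "focus_areas": "key_findings_and_results",
            "max_summary_length": 3000,
            "composer_timeout_sec": 20
        }
    }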
"""
def __init__(self, config: Dict[str, Any], tooluniverse):
"""
Initialize the summarization hook.
Args:
config (Dict[str, Any]): Hook configuration
tooluniverse: ToolUniverse instance for executing summarization tools
"""
super().__init__(config)
self.tooluniverse = tooluniverse
hook_config = config.get("hook_config", {})
self.composer_tool_name = hook_config.get(
"composer_tool", "OutputSummarizationComposer"
)
self.chunk_size = hook_config.get("chunk_size", 2000)
self.focus_areas = hook_config.get("focus_areas", "key_findings_and_results")
self.max_summary_length = hook_config.get("max_summary_length", 3000)
# Optional timeout to prevent hangs in composer / LLM calls
# If the composer does not return within this window, we gracefully fall back
self.composer_timeout_sec = hook_config.get("composer_timeout_sec", 20)
def process(
self,
result: Any,
tool_name: str,
arguments: Dict[str, Any],
context: Dict[str, Any],
) -> Any:
"""
Execute summarization processing using Compose Summarizer Tool.
This method orchestrates the summarization workflow by:
1. Preparing parameters for the Compose Summarizer Tool
2. Calling the tool through ToolUniverse
3. Processing and returning the summarized result
Args:
result (Any): The tool output to summarize
tool_name (str): Name of the tool that produced the output
arguments (Dict[str, Any]): Arguments passed to the tool
context (Dict[str, Any]): Additional context information
Returns:
Any: The summarized output, or original output if summarization fails
"""
try:
# Debug: basic context
try:
_len = len(str(result))
except Exception:
_len = -1
import sys as _sys
print(
f"[SummarizationHook] process: tool={tool_name}, result_len={_len}, "
f"chunk_size={self.chunk_size}, max_summary_length={self.max_summary_length}",
file=_sys.stderr,
flush=True,
)
# Check if the required tools are available
if (
self.composer_tool_name not in self.tooluniverse.callable_functions
and self.composer_tool_name not in self.tooluniverse.all_tool_dict
):
print(
f"❌ SummarizationHook: {self.composer_tool_name} tool is not available."
)
print(
" This usually means the output_summarization tools are not loaded."
)
print(" Returning original output without summarization.")
return result
# Prepare parameters for Compose Summarizer Tool
composer_args = {
"tool_output": str(result),
"query_context": self._extract_query_context(context),
"tool_name": tool_name,
"chunk_size": self.chunk_size,
"focus_areas": self.focus_areas,
"max_summary_length": self.max_summary_length,
}
# Call Compose Summarizer Tool through ToolUniverse
print(
f"[SummarizationHook] calling composer tool: {self.composer_tool_name} (timeout={self.composer_timeout_sec}s)",
file=_sys.stderr,
flush=True,
)
# Run composer with timeout to avoid hangs
try:
from concurrent.futures import (
ThreadPoolExecutor,
)
def _call_composer():
return self.tooluniverse.run_one_function(
{"name": self.composer_tool_name, "arguments": composer_args}
)
with ThreadPoolExecutor(max_workers=1) as _pool:
_future = _pool.submit(_call_composer)
composer_result = _future.result(timeout=self.composer_timeout_sec)
except Exception as _e_timeout:
# Timeout or execution error; log and fall back to original output
print(
f"[SummarizationHook] composer execution failed/timeout: {_e_timeout}",
file=_sys.stderr,
flush=True,
)
return result
# Debug: show composer result meta
try:
if isinstance(composer_result, dict):
success = composer_result.get("success", False)
summary_len = len(composer_result.get("summary", ""))
print(
f"[SummarizationHook] composer_result: success={success} summary_len={summary_len}",
file=_sys.stderr,
flush=True,
)
except Exception as _e_dbg:
print(
f"[SummarizationHook] debug error inspecting composer_result: {_e_dbg}",
file=_sys.stderr,
flush=True,
)
# Process Compose Tool result
if isinstance(composer_result, dict) and composer_result.get("success"):
return composer_result.get("summary", result)
elif isinstance(composer_result, str):
return composer_result
else:
print(
f"Warning: Compose Summarizer Tool returned unexpected result: {composer_result}"
)
return result
except Exception as e:
error_msg = str(e)
import sys as _sys
print(
f"Error in summarization hook: {error_msg}",
file=_sys.stderr,
flush=True,
)
# Check if the error is due to missing tools
if "not found" in error_msg.lower() or "ToolOutputSummarizer" in error_msg:
print(
"❌ SummarizationHook: Required summarization tools are not available."
)
print(" Please ensure the SMCP server is started with hooks enabled.")
return result
def _extract_query_context(self, context: Dict[str, Any]) -> str:
"""
Extract query context from execution context.
This method attempts to identify the original user query or intent
from the context information to provide better summarization.
Args:
context (Dict[str, Any]): Execution context containing arguments and metadata
Returns:
str: Extracted query context or fallback description
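Example (illustrative)::
    self._extract_query_context({"arguments": {"query": "EGFR inhibitors"}})
    # -> "EGFR inhibitors"
    self._extract_query_context({"arguments": {}, "tool_name": "my_tool"})
    # -> "Tool execution: my_tool"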
"""
arguments = context.get("arguments", {})
# Common query parameter names
query_keys = ["query", "question", "input", "text", "search_term", "prompt"]
for key in query_keys:
if key in arguments:
return str(arguments[key])
# If no explicit query found, return tool name as context
return f"Tool execution: {context.get('tool_name', 'unknown')}"
class HookManager:
"""
Manages and coordinates all output hooks.
The HookManager is responsible for loading hook configurations, creating
hook instances, and applying hooks to tool outputs. It provides a unified
interface for hook management and supports dynamic configuration updates.
Args:
config (Dict[str, Any]): Hook manager configuration
tooluniverse: Reference to the ToolUniverse instance
Attributes:
config (Dict[str, Any]): Hook manager configuration
tooluniverse: ToolUniverse instance for tool execution
hooks (List[OutputHook]): List of loaded hook instances
enabled (bool): Whether hook processing is enabled
config_path (str): Path to hook configuration file
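Example:
A minimal management sketch (assumes ``tu`` is an initialized ToolUniverse
instance with LLM API keys configured; the hook name is a placeholder)::
    manager = HookManager({}, tu)  # falls back to the packaged hook_config.json
    manager.disable_hook("summarize_long_outputs")  # turn off one hook by name
    manager.toggle_hooks(False)                     # disable all hook processing
    manager.reload_config()                         # re-read the config file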
"""
def __init__(self, config: Dict[str, Any], tooluniverse):
"""
Initialize the hook manager.
Args:
config (Dict[str, Any]): Configuration for hook manager
tooluniverse: ToolUniverse instance for executing tools
"""
self.config = config
self.tooluniverse = tooluniverse
self.hooks: List[OutputHook] = []
self.enabled = True
self.config_path = config.get("config_path", "template/hook_config.json")
self._pending_tools_to_load: List[str] = []
self._load_hook_config()
# Validate LLM API keys before loading hooks
if not self._validate_llm_api_keys():
print("⚠️ Warning: LLM API keys not available. Hooks will be disabled.")
print(
" To enable hooks, please set AZURE_OPENAI_API_KEY environment variable."
)
self.enabled = False
return
self._load_hooks()
def apply_hooks(
self,
result: Any,
tool_name: str,
arguments: Dict[str, Any],
context: Dict[str, Any],
) -> Any:
"""
Apply all applicable hooks to the tool output.
This method iterates through all loaded hooks, checks if they should
be applied to the current output, and processes the output through
each applicable hook in priority order.
Args:
result (Any): The tool output to process
tool_name (str): Name of the tool that produced the output
arguments (Dict[str, Any]): Arguments passed to the tool
context (Dict[str, Any]): Additional context information
Returns:
Any: The processed output after applying all applicable hooks
"""
if not self.enabled:
return result
# Load pending tools if ToolUniverse is now ready
self._load_pending_tools()
# Prevent recursive hook processing
if self._is_hook_tool(tool_name):
return result
# Sort hooks by priority (lower numbers execute first)
sorted_hooks = sorted(self.hooks, key=lambda h: h.priority)
for hook in sorted_hooks:
if not hook.enabled:
continue
# Check if hook is applicable to current tool
if self._is_hook_applicable(hook, tool_name, context):
if hook.should_trigger(result, tool_name, arguments, context):
print(f"🔧 Applying hook: {hook.name} for tool: {tool_name}")
result = hook.process(result, tool_name, arguments, context)
return result
def _validate_llm_api_keys(self) -> bool:
"""
Validate that LLM API keys are available for hook tools.
Returns:
bool: True if API keys are available, False otherwise
"""
from .agentic_tool import AgenticTool
if AgenticTool.has_any_api_keys():
print("✅ LLM API keys validated successfully")
return True
else:
print("❌ LLM API key validation failed: No API keys available")
print(" To enable hooks, please set API key environment variables.")
return False
def enable_hook(self, hook_name: str):
"""
Enable a specific hook by name.
Args:
hook_name (str): Name of the hook to enable
"""
hook = self.get_hook(hook_name)
if hook:
hook.enabled = True
print(f"✅ Enabled hook: {hook_name}")
else:
print(f"❌ Hook not found: {hook_name}")
def disable_hook(self, hook_name: str):
"""
Disable a specific hook by name.
Args:
hook_name (str): Name of the hook to disable
"""
hook = self.get_hook(hook_name)
if hook:
hook.enabled = False
print(f"❌ Disabled hook: {hook_name}")
else:
print(f"❌ Hook not found: {hook_name}")
def toggle_hooks(self, enabled: bool):
"""
Enable or disable all hooks globally.
Args:
enabled (bool): True to enable all hooks, False to disable
"""
self.enabled = enabled
status = "enabled" if enabled else "disabled"
print(f"🔧 Hooks {status}")
def reload_config(self, config_path: Optional[str] = None):
"""
Reload hook configuration from file.
Args:
config_path (Optional[str]): Path to configuration file.
If None, uses the current config_path
"""
if config_path:
self.config_path = config_path
self._load_hook_config()
self._load_hooks()
print("🔄 Reloaded hook configuration")
def get_hook(self, hook_name: str) -> Optional[OutputHook]:
"""
Get a hook instance by name.
Args:
hook_name (str): Name of the hook to retrieve
Returns:
Optional[OutputHook]: Hook instance if found, None otherwise
"""
for hook in self.hooks:
if hook.name == hook_name:
return hook
return None
def _load_hook_config(self):
"""
Load hook configuration from file.
This method attempts to load the hook configuration from the specified
file path, handling both package resources and file system paths.
If the config is already provided and not empty, it uses that instead.
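Illustrative layout of hook_config.json (key names mirror those read by
_load_hooks and _apply_hook_type_defaults; tool and category names are
placeholders)::
    {
        "hooks": [{"name": "summarizer", "type": "SummarizationHook"}],
        "tool_specific_hooks": {"my_tool": {"enabled": true, "hooks": []}},
        "category_hooks": {"search": {"enabled": true, "hooks": []}},
        "hook_type_defaults": {"SummarizationHook": {"default_chunk_size": 2000}}
    }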
"""
# If config is already provided and not empty, use it
if self.config and (
("hooks" in self.config)
or ("tool_specific_hooks" in self.config)
or ("category_hooks" in self.config)
):
return
try:
config_file = self._get_config_file_path()
if hasattr(config_file, "read_text"):
content = config_file.read_text(encoding="utf-8")
else:
with open(config_file, "r", encoding="utf-8") as f:
content = f.read()
self.config = json.loads(content)
except Exception as e:
print(f"Warning: Could not load hook config: {e}")
if not self.config:
self.config = {}
def _get_config_file_path(self):
"""
Get the path to the hook configuration file.
Returns:
Path: Path to the configuration file
"""
try:
import importlib.resources as pkg_resources
except ImportError:
import importlib_resources as pkg_resources
try:
data_files = pkg_resources.files("tooluniverse.template")
config_file = data_files / "hook_config.json"
return config_file
except Exception:
# Fallback to file-based path resolution
current_dir = Path(__file__).parent
config_file = current_dir / "template" / "hook_config.json"
return config_file
def _load_hooks(self):
"""
Load hook configurations and create hook instances.
This method processes the configuration and creates appropriate
hook instances for global, tool-specific, and category-specific hooks.
It also automatically loads any tools required by the hooks.
"""
self.hooks = []
# Collect all hook configs first to determine required tools
all_hook_configs = []
# Load global hooks
global_hooks = self.config.get("hooks", [])
for hook_config in global_hooks:
all_hook_configs.append(hook_config)
# Load tool-specific hooks
tool_specific_hooks = self.config.get("tool_specific_hooks", {})
for tool_name, tool_hook_config in tool_specific_hooks.items():
if tool_hook_config.get("enabled", True):
tool_hooks = tool_hook_config.get("hooks", [])
for hook_config in tool_hooks:
hook_config["tool_name"] = tool_name
all_hook_configs.append(hook_config)
# Load category-specific hooks
category_hooks = self.config.get("category_hooks", {})
for category_name, category_hook_config in category_hooks.items():
if category_hook_config.get("enabled", True):
category_hooks_list = category_hook_config.get("hooks", [])
for hook_config in category_hooks_list:
hook_config["category"] = category_name
all_hook_configs.append(hook_config)
# Auto-load required tools for hooks
self._auto_load_hook_tools(all_hook_configs)
# Ensure hook tools are loaded
self._ensure_hook_tools_loaded()
# Create hook instances
for hook_config in all_hook_configs:
hook = self._create_hook_instance(hook_config)
if hook:
self.hooks.append(hook)
def _auto_load_hook_tools(self, hook_configs: List[Dict[str, Any]]):
"""
Automatically load tools required by hooks.
This method analyzes hook configurations to determine which tools
are needed and automatically loads them into the ToolUniverse.
Args:
hook_configs (List[Dict[str, Any]]): List of hook configurations
"""
required_tools = set()
for hook_config in hook_configs:
hook_type = hook_config.get("type", "SummarizationHook")
hook_config_section = hook_config.get("hook_config", {})
# Determine required tools based on hook type
if hook_type == "SummarizationHook":
composer_tool = hook_config_section.get(
"composer_tool", "OutputSummarizationComposer"
)
required_tools.add(composer_tool)
# Also need the agentic tool for summarization
required_tools.add("ToolOutputSummarizer")
elif hook_type == "FilteringHook":
# Add filtering-related tools if any
pass
elif hook_type == "FormattingHook":
# Add formatting-related tools if any
pass
elif hook_type == "ValidationHook":
# Add validation-related tools if any
pass
elif hook_type == "LoggingHook":
# Add logging-related tools if any
pass
# Load required tools
if required_tools:
tools_to_load = []
for tool in required_tools:
# Map tool names to their categories (avoid queuing the same category twice)
if tool in ["OutputSummarizationComposer", "ToolOutputSummarizer"]:
if "output_summarization" not in tools_to_load:
tools_to_load.append("output_summarization")
# Add more mappings as needed
if tools_to_load:
try:
# Ensure ComposeTool is available
from .compose_tool import ComposeTool
from .tool_registry import register_external_tool
register_external_tool("ComposeTool", ComposeTool)
# Check if ToolUniverse is fully initialized
if hasattr(self.tooluniverse, "all_tools"):
# Load the tools and verify they were loaded
self.tooluniverse.load_tools(tools_to_load)
# Verify that the required tools are actually available
missing_tools = []
for tool in required_tools:
if (
tool not in self.tooluniverse.callable_functions
and tool not in self.tooluniverse.all_tool_dict
):
missing_tools.append(tool)
if missing_tools:
print(
f"⚠️ Warning: Some hook tools could not be loaded: {missing_tools}"
)
print(" This may cause summarization hooks to fail.")
else:
print(
f"🔧 Auto-loaded hook tools: {', '.join(tools_to_load)}"
)
else:
# Store tools to load later when ToolUniverse is ready
self._pending_tools_to_load = tools_to_load
print(
f"🔧 Hook tools queued for loading: {', '.join(tools_to_load)}"
)
except Exception as e:
print(f"⚠️ Warning: Could not auto-load hook tools: {e}")
print(" This will cause summarization hooks to fail.")
def _ensure_hook_tools_loaded(self):
"""
Ensure that tools required by hooks are loaded.
This method is called during HookManager initialization to make sure that
the necessary tools (like output_summarization tools) are available.
"""
try:
# Ensure ComposeTool is available
from .compose_tool import ComposeTool
from .tool_registry import register_external_tool
register_external_tool("ComposeTool", ComposeTool)
# Load output_summarization tools if not already loaded
if (
not hasattr(self.tooluniverse, "tool_category_dicts")
or "output_summarization" not in self.tooluniverse.tool_category_dicts
):
print("🔧 Loading output_summarization tools for hooks")
self.tooluniverse.load_tools(["output_summarization"])
# Verify the tools were loaded
missing_tools = []
required_tools = ["ToolOutputSummarizer", "OutputSummarizationComposer"]
for tool in required_tools:
if (
hasattr(self.tooluniverse, "callable_functions")
and tool not in self.tooluniverse.callable_functions
and hasattr(self.tooluniverse, "all_tool_dict")
and tool not in self.tooluniverse.all_tool_dict
):
missing_tools.append(tool)
if missing_tools:
print(
f"⚠️ Warning: Some hook tools could not be loaded: {missing_tools}"
)
print(" This may cause summarization hooks to fail")
else:
print(f"✅ Hook tools loaded successfully: {required_tools}")
else:
print("🔧 Output_summarization tools already loaded")
except Exception as e:
print(f"❌ Error loading hook tools: {e}")
print(" This will cause summarization hooks to fail")
def _load_pending_tools(self):
"""
Load any pending tools that were queued during initialization.
This method is called when hooks are applied to ensure that any tools
that couldn't be loaded during HookManager initialization are loaded
once the ToolUniverse is fully ready.
"""
if self._pending_tools_to_load and hasattr(self.tooluniverse, "all_tools"):
try:
self.tooluniverse.load_tools(self._pending_tools_to_load)
print(
f"🔧 Loaded pending hook tools: {', '.join(self._pending_tools_to_load)}"
)
self._pending_tools_to_load = [] # Clear the pending list
except Exception as e:
print(f"⚠️ Warning: Could not load pending hook tools: {e}")
def _is_hook_tool(self, tool_name: str) -> bool:
"""
Check if a tool is a hook-related tool that should not be processed by hooks.
This prevents recursive hook processing where hook tools (like ToolOutputSummarizer)
produce output that would trigger more hook processing.
Args:
tool_name (str): Name of the tool to check
Returns:
bool: True if the tool is a hook tool and should be excluded from hook processing
"""
hook_tool_names = [
"ToolOutputSummarizer",
"OutputSummarizationComposer",
# Add more hook tool names as needed
]
return tool_name in hook_tool_names
def _create_hook_instance(
self, hook_config: Dict[str, Any]
) -> Optional[OutputHook]:
"""
Create a hook instance based on configuration.
This method creates hook instances and applies hook type-specific defaults
from the configuration before initializing the hook.
Args:
hook_config (Dict[str, Any]): Hook configuration
Returns:
Optional[OutputHook]: Created hook instance or None if type not supported
"""
hook_type = hook_config.get("type", "SummarizationHook")
# Apply hook type-specific defaults
enhanced_config = self._apply_hook_type_defaults(hook_config)
if hook_type == "SummarizationHook":
return SummarizationHook(enhanced_config, self.tooluniverse)
elif hook_type == "FileSaveHook":
# Merge hook_config with the main config for FileSaveHook
file_save_config = enhanced_config.copy()
file_save_config.update(enhanced_config.get("hook_config", {}))
return FileSaveHook(file_save_config)
else:
print(f"Unknown hook type: {hook_type}")
return None
def _apply_hook_type_defaults(self, hook_config: Dict[str, Any]) -> Dict[str, Any]:
"""
Apply hook type-specific default values to hook configuration.
This method merges hook type defaults with individual hook configuration,
ensuring that each hook type gets its appropriate default values.
Args:
hook_config (Dict[str, Any]): Original hook configuration
Returns:
Dict[str, Any]: Enhanced configuration with defaults applied
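Example (illustrative)::
    # with self.config["hook_type_defaults"] = {"SummarizationHook": {"default_chunk_size": 1000}}
    enhanced = self._apply_hook_type_defaults({"type": "SummarizationHook", "name": "s"})
    enhanced["hook_config"]["chunk_size"]   # -> 1000 (from hook_type_defaults)
    enhanced["hook_config"]["focus_areas"]  # -> "key_findings_and_results" (built-in default)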
"""
hook_type = hook_config.get("type", "SummarizationHook")
# Get hook type defaults from configuration
hook_type_defaults = self.config.get("hook_type_defaults", {}).get(
hook_type, {}
)
# Create enhanced configuration
enhanced_config = hook_config.copy()
# Apply defaults to hook_config if not already specified
if "hook_config" not in enhanced_config:
enhanced_config["hook_config"] = {}
hook_config_section = enhanced_config["hook_config"]
# Apply defaults for each hook type
if hook_type == "SummarizationHook":
defaults = {
"composer_tool": "OutputSummarizationComposer",
"chunk_size": hook_type_defaults.get("default_chunk_size", 2000),
"focus_areas": hook_type_defaults.get(
"default_focus_areas", "key_findings_and_results"
),
"max_summary_length": hook_type_defaults.get(
"default_max_summary_length", 3000
),
}
elif hook_type == "FilteringHook":
defaults = {
"replacement_text": hook_type_defaults.get(
"default_replacement_text", "[REDACTED]"
),
"preserve_structure": hook_type_defaults.get(
"default_preserve_structure", True
),
"log_filtered_items": hook_type_defaults.get(
"default_log_filtered_items", False
),
}
elif hook_type == "FormattingHook":
defaults = {
"indent_size": hook_type_defaults.get("default_indent_size", 2),
"sort_keys": hook_type_defaults.get("default_sort_keys", True),
"pretty_print": hook_type_defaults.get("default_pretty_print", True),
"max_line_length": hook_type_defaults.get(
"default_max_line_length", 100
),
}
elif hook_type == "ValidationHook":
defaults = {
"strict_mode": hook_type_defaults.get("default_strict_mode", False),
"error_action": hook_type_defaults.get("default_error_action", "warn"),
}
elif hook_type == "LoggingHook":
defaults = {
"log_level": hook_type_defaults.get("default_log_level", "INFO"),
"log_format": hook_type_defaults.get("default_log_format", "simple"),
"max_log_size": hook_type_defaults.get("default_max_log_size", 1000),
}
elif hook_type == "FileSaveHook":
defaults = {
"temp_dir": hook_type_defaults.get("default_temp_dir", None),
"file_prefix": hook_type_defaults.get(
"default_file_prefix", "tool_output"
),
"include_metadata": hook_type_defaults.get(
"default_include_metadata", True
),
"auto_cleanup": hook_type_defaults.get("default_auto_cleanup", False),
"cleanup_age_hours": hook_type_defaults.get(
"default_cleanup_age_hours", 24
),
}
else:
defaults = {}
# Apply defaults only if not already specified
for key, default_value in defaults.items():
if key not in hook_config_section:
hook_config_section[key] = default_value
return enhanced_config
def _is_hook_applicable(
self, hook: OutputHook, tool_name: str, context: Dict[str, Any]
) -> bool:
"""
Check if a hook is applicable to the current tool.
Args:
hook (OutputHook): Hook instance to check
tool_name (str): Name of the current tool
context (Dict[str, Any]): Execution context
Returns:
bool: True if hook is applicable, False otherwise
"""
# Check tool-specific hooks
if "tool_name" in hook.config:
return hook.config["tool_name"] == tool_name
# Check category-specific hooks
if "category" in hook.config:
# This would need to be implemented based on actual tool categorization
# For now, return True to apply category hooks to all tools
return True
# Global hooks apply to all tools
return True
class FileSaveHook(OutputHook):
"""
Hook that saves tool outputs to temporary files and returns file information.
This hook saves the tool output to a temporary file and returns information
about the file path, data format, and data structure instead of the original output.
This is useful for handling large outputs or when you need to process outputs
as files rather than in-memory data.
Configuration options:
- temp_dir: Directory to save temporary files (default: system temp)
- file_prefix: Prefix for generated filenames (default: 'tool_output')
- include_metadata: Whether to include metadata in the response (default: True)
- auto_cleanup: Whether to automatically clean up old files (default: False)
- cleanup_age_hours: Age in hours for auto cleanup (default: 24)
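Example return value from process() (illustrative; path, size, and timestamps
will vary)::
    {
        "file_path": "/tmp/tool_output_my_tool_20240101_120000.json",
        "data_format": "json",
        "data_structure": "dict with 3 keys",
        "file_size": 1234,
        "created_at": "2024-01-01T12:00:00",
        "tool_name": "my_tool",
        "original_arguments": {"limit": 10},
        "metadata": {"hook_name": "file_save", "hook_type": "FileSaveHook",
                     "processing_time": "2024-01-01T12:00:00", "context": {}}
    }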
"""
def __init__(self, config: Dict[str, Any]):
"""
Initialize the FileSaveHook.
Args:
config (Dict[str, Any]): Hook configuration including:
- name: Hook name
- temp_dir: Directory for temporary files
- file_prefix: Prefix for filenames
- include_metadata: Include metadata flag
- auto_cleanup: Auto cleanup flag
- cleanup_age_hours: Cleanup age in hours
"""
super().__init__(config)
# Set default configuration
self.temp_dir = config.get("temp_dir", None)
self.file_prefix = config.get("file_prefix", "tool_output")
self.include_metadata = config.get("include_metadata", True)
self.auto_cleanup = config.get("auto_cleanup", False)
self.cleanup_age_hours = config.get("cleanup_age_hours", 24)
# Import required modules
import tempfile
import os
from datetime import datetime, timedelta
self.tempfile = tempfile
self.os = os
self.datetime = datetime
self.timedelta = timedelta
# Create temp directory if specified
if self.temp_dir:
self.os.makedirs(self.temp_dir, exist_ok=True)
def process(
self,
result: Any,
tool_name: str,
arguments: Dict[str, Any],
context: Dict[str, Any],
) -> Dict[str, Any]:
"""
Process the tool output by saving it to a temporary file.
Args:
result (Any): The tool output to process
tool_name (str): Name of the tool that produced the output
arguments (Dict[str, Any]): Arguments passed to the tool
context (Dict[str, Any]): Execution context
Returns:
Dict[str, Any]: Dictionary containing file information:
- file_path: Path to the saved file
- data_format: Format of the data (json, text, binary, etc.)
- data_structure: Structure information about the data
- file_size: Size of the file in bytes
- created_at: Timestamp when file was created
- metadata: Additional metadata (if include_metadata is True)
"""
try:
# Determine data format and structure
data_format, data_structure = self._analyze_data(result)
# Generate filename
timestamp = self.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{self.file_prefix}_{tool_name}_{timestamp}.{data_format}"
# Save to temporary file
if self.temp_dir:
file_path = self.os.path.join(self.temp_dir, filename)
else:
# Use system temp directory
temp_fd, file_path = self.tempfile.mkstemp(
suffix=f"_{filename}", prefix=self.file_prefix, dir=self.temp_dir
)
self.os.close(temp_fd)
# Write data to file
self._write_data_to_file(result, file_path, data_format)
# Get file size
file_size = self.os.path.getsize(file_path)
# Prepare response
response = {
"file_path": file_path,
"data_format": data_format,
"data_structure": data_structure,
"file_size": file_size,
"created_at": self.datetime.now().isoformat(),
"tool_name": tool_name,
"original_arguments": arguments,
}
# Add metadata if requested
if self.include_metadata:
response["metadata"] = {
"hook_name": self.name,
"hook_type": "FileSaveHook",
"processing_time": self.datetime.now().isoformat(),
"context": context,
}
# Perform auto cleanup if enabled
if self.auto_cleanup:
self._cleanup_old_files()
return response
except Exception as e:
# Return error information instead of failing
return {
"error": f"Failed to save output to file: {str(e)}",
"original_output": str(result),
"tool_name": tool_name,
"hook_name": self.name,
}
def _analyze_data(self, data: Any) -> tuple[str, str]:
"""
Analyze the data to determine its format and structure.
Args:
data (Any): The data to analyze
Returns:
tuple[str, str]: (data_format, data_structure)
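Example (illustrative)::
    self._analyze_data({"a": 1, "b": 2})  # -> ("json", "dict with 2 keys")
    self._analyze_data("plain text")      # -> ("txt", "text with 10 characters")
    self._analyze_data(True)              # -> ("json", "boolean value")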
"""
if isinstance(data, dict):
return "json", f"dict with {len(data)} keys"
elif isinstance(data, list):
return "json", f"list with {len(data)} items"
elif isinstance(data, str):
if data.strip().startswith("{") or data.strip().startswith("["):
return "json", "JSON string"
else:
return "txt", f"text with {len(data)} characters"
# Note: check bool before int/float, since bool is a subclass of int in Python
elif isinstance(data, bool):
return "json", "boolean value"
elif isinstance(data, (int, float)):
return "json", "numeric value"
else:
return "bin", f"binary data of type {type(data).__name__}"
def _write_data_to_file(self, data: Any, file_path: str, data_format: str) -> None:
"""
Write data to file in the appropriate format.
Args:
data (Any): The data to write
file_path (str): Path to the file
data_format (str): Format of the data
"""
if data_format == "json":
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
elif data_format == "txt":
with open(file_path, "w", encoding="utf-8") as f:
f.write(str(data))
else:
# For binary or other formats, write as string
with open(file_path, "w", encoding="utf-8") as f:
f.write(str(data))
def _cleanup_old_files(self) -> None:
"""
Clean up old files based on the cleanup_age_hours setting.
"""
if not self.temp_dir:
return
try:
current_time = self.datetime.now()
cutoff_time = current_time - self.timedelta(hours=self.cleanup_age_hours)
for filename in self.os.listdir(self.temp_dir):
if filename.startswith(self.file_prefix):
file_path = self.os.path.join(self.temp_dir, filename)
file_time = self.datetime.fromtimestamp(
self.os.path.getmtime(file_path)
)
if file_time < cutoff_time:
self.os.remove(file_path)
except Exception as e:
# Log error but don't fail the hook
print(f"Warning: Failed to cleanup old files: {e}")