"""
Python Code Execution Tools for ToolUniverse
This module provides two specialized tools for executing Python code:
1. python_code_executor - Execute Python code snippets safely in a sandboxed environment
2. python_script_runner - Run Python script files in an isolated subprocess
"""
import ast
import io
import os
import signal
import subprocess
import sys
import time
import traceback
from typing import Any, Dict, List, Optional
from .base_tool import BaseTool
from .tool_registry import register_tool
[docs]
class BasePythonExecutor:
    """Common base for the Python execution tools.

    Holds the static security configuration that the sandbox helpers read:
    the builtin whitelist, the default importable modules, and the names
    that the AST screening step rejects.
    """

    # Whitelist of builtin names made available to executed code.
    SAFE_BUILTINS = {
        # Output
        "print",
        # Iteration helpers
        "len", "range", "enumerate", "zip", "map", "filter", "sorted", "reversed",
        # Numeric helpers and reductions
        "sum", "min", "max", "abs", "round", "any", "all",
        # Constructors
        "int", "float", "str", "bool", "list", "dict", "set", "tuple", "slice",
        # Introspection
        "isinstance", "type", "getattr", "setattr", "hasattr", "callable",
        # Needed for import statements
        "__import__",
    }

    # Modules importable by default; __init__ copies this set per instance.
    DEFAULT_ALLOWED_MODULES = {
        # Standard library
        "math", "json", "datetime", "collections", "itertools", "re",
        "typing", "dataclasses", "decimal", "fractions", "statistics", "random",
        # Mathematical computing libraries
        "sympy", "numpy", "scipy", "matplotlib",
    }

    # Names rejected by _check_ast_safety, keyed by the kind of AST node
    # they are matched against.
    FORBIDDEN_AST_NODES = {
        "Import": ["os", "sys", "subprocess", "socket", "urllib", "requests", "http"],
        "Call": ["open", "eval", "exec", "compile", "__import__", "input", "raw_input"],
        "Attribute": ["__import__", "open", "file"],
    }
[docs]
def __init__(self, tool_config: Dict[str, Any]):
"""Initialize the executor with tool configuration."""
self.tool_config = tool_config
self.allowed_modules = set(self.DEFAULT_ALLOWED_MODULES)
# Add custom allowed modules if specified
if "allowed_imports" in tool_config:
self.allowed_modules.update(tool_config["allowed_imports"])
[docs]
def _check_ast_safety(self, code: str) -> tuple[bool, List[str]]:
"""
Check code AST for dangerous operations.
Returns:
(is_safe, warnings)
"""
warnings = []
try:
tree = ast.parse(code)
except SyntaxError as e:
return False, [f"Syntax error: {e.msg} at line {e.lineno}"]
for node in ast.walk(tree):
# Check for forbidden imports
if isinstance(node, ast.Import):
for alias in node.names:
# Check if import is forbidden AND not explicitly allowed
if (
alias.name in self.FORBIDDEN_AST_NODES["Import"]
and alias.name not in self.allowed_modules
):
warnings.append(f"Forbidden import: {alias.name}")
# Check for forbidden function calls
elif isinstance(node, ast.Call):
if isinstance(node.func, ast.Name):
if node.func.id in self.FORBIDDEN_AST_NODES["Call"]:
warnings.append(f"Forbidden function call: {node.func.id}")
elif isinstance(node.func, ast.Attribute):
if node.func.attr in self.FORBIDDEN_AST_NODES["Call"]:
warnings.append(f"Forbidden method call: {node.func.attr}")
# Check for forbidden attribute access
elif isinstance(node, ast.Attribute):
if node.attr in self.FORBIDDEN_AST_NODES["Attribute"]:
warnings.append(f"Forbidden attribute access: {node.attr}")
return len(warnings) == 0, warnings
[docs]
def _create_safe_globals(
self, additional_vars: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Create a safe globals dictionary with restricted builtins."""
# Create restricted builtins
safe_builtins = {}
for name in self.SAFE_BUILTINS:
if hasattr(__builtins__, name):
safe_builtins[name] = getattr(__builtins__, name)
elif hasattr(__builtins__, "__dict__") and name in __builtins__.__dict__:
safe_builtins[name] = __builtins__.__dict__[name]
else:
# Try to get from builtins module directly
try:
import builtins
if hasattr(builtins, name):
safe_builtins[name] = getattr(builtins, name)
except ImportError:
pass
# Create safe __import__ function
def safe_import(name, globals=None, locals=None, fromlist=(), level=0):
"""Safe import function that only allows pre-approved modules."""
if name in self.allowed_modules:
return __import__(name, globals, locals, fromlist, level)
else:
raise ImportError(
f"Module '{name}' is not allowed. Allowed modules: {list(self.allowed_modules)}"
)
safe_builtins["__import__"] = safe_import
# Pre-import allowed modules
safe_modules = {}
for module_name in self.allowed_modules:
try:
safe_modules[module_name] = __import__(module_name)
except ImportError:
pass # Skip modules that can't be imported
globals_dict = {"__builtins__": safe_builtins, **safe_modules}
# Add additional variables
if additional_vars:
globals_dict.update(additional_vars)
return globals_dict
[docs]
def _capture_output(self, func, *args, **kwargs):
"""Capture stdout and stderr during function execution."""
old_stdout = sys.stdout
old_stderr = sys.stderr
stdout_capture = io.StringIO()
stderr_capture = io.StringIO()
sys.stdout = stdout_capture
sys.stderr = stderr_capture
try:
result = func(*args, **kwargs)
stdout_content = stdout_capture.getvalue()
stderr_content = stderr_capture.getvalue()
return result, stdout_content, stderr_content
finally:
sys.stdout = old_stdout
sys.stderr = old_stderr
[docs]
def _handle_timeout(self, signum, frame):
"""Handle execution timeout."""
raise TimeoutError("Code execution timed out")
[docs]
def _execute_with_timeout(self, func, timeout_seconds: int, *args, **kwargs):
"""Execute function with timeout using signal or threading."""
import threading
# Check if we're in the main thread
is_main_thread = threading.current_thread() is threading.main_thread()
# Use threading timeout if not in main thread or on Windows
if not is_main_thread or not hasattr(signal, "SIGALRM"):
# Use threading timeout (works in all threads)
result_container = [None]
exception_container = [None]
def target():
try:
result_container[0] = func(*args, **kwargs)
except Exception as e:
exception_container[0] = e
thread = threading.Thread(target=target)
thread.daemon = True
thread.start()
thread.join(timeout_seconds)
if thread.is_alive():
raise TimeoutError("Code execution timed out")
if exception_container[0]:
raise exception_container[0]
return result_container[0]
# Use signal timeout only in main thread on Unix systems
else:
try:
old_handler = signal.signal(signal.SIGALRM, self._handle_timeout)
signal.alarm(timeout_seconds)
try:
result = func(*args, **kwargs)
return result
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
except (ValueError, AttributeError):
# Fallback to threading if signal fails for any reason
result_container = [None]
exception_container = [None]
def target():
try:
result_container[0] = func(*args, **kwargs)
except Exception as e:
exception_container[0] = e
thread = threading.Thread(target=target)
thread.daemon = True
thread.start()
thread.join(timeout_seconds)
if thread.is_alive():
raise TimeoutError("Code execution timed out")
if exception_container[0]:
raise exception_container[0]
return result_container[0]
[docs]
def _get_package_to_install(self, package: str) -> str:
"""Get the actual package name to install (parent package for submodules)"""
if "." in package:
# For submodules like 'keggtools.keggrest', install the parent package 'keggtools'
return package.split(".")[0]
return package
[docs]
def _check_and_install_dependencies(
self,
dependencies: List[str],
auto_install: bool = False,
require_confirmation: bool = True,
) -> Dict[str, Any]:
"""Check and optionally install missing dependencies with user confirmation."""
if not dependencies:
return {"success": True, "message": "No dependencies to check"}
missing_packages = []
installed_packages = []
print(f"📦 Checking dependencies: {dependencies}")
for package in dependencies:
# Try multiple import strategies
import_success = False
# Strategy 1: Direct package name
try:
__import__(package.replace("-", "_"))
print(f" ✅ {package} is installed (direct import)")
import_success = True
except ImportError:
pass
# Strategy 2: Try common submodule patterns
if not import_success:
patterns = [
package.replace("-", "_"),
package.replace("-", ""),
package.split("-")[0], # For packages like 'keggtools' -> 'kegg'
]
for pattern in patterns:
try:
__import__(pattern)
print(f" ✅ {package} is installed (as {pattern})")
import_success = True
break
except ImportError:
continue
# Strategy 3: Check if it's a submodule (e.g., keggtools.api)
if not import_success and "." in package:
try:
__import__(package)
print(f" ✅ {package} is installed (submodule)")
import_success = True
except ImportError:
pass
# Strategy 4: Check parent package for submodules
if not import_success and "." in package:
parent_package = package.split(".")[0]
try:
parent_module = __import__(parent_package)
# Try to access the submodule
submodule_name = package.split(".")[1]
if hasattr(parent_module, submodule_name):
print(
f" ✅ {package} is available (submodule of {parent_package})"
)
import_success = True
else:
# Try importing the submodule directly
__import__(package)
print(f" ✅ {package} is installed (submodule)")
import_success = True
except ImportError:
pass
if not import_success:
print(f" ❌ {package} is not installed")
missing_packages.append(package)
if not missing_packages:
return {"success": True, "message": "All dependencies are available"}
print(f"\n⚠️ Missing packages: {missing_packages}")
# Get packages to actually install (parent packages for submodules)
packages_to_install = [
self._get_package_to_install(pkg) for pkg in missing_packages
]
packages_to_install = list(set(packages_to_install)) # Remove duplicates
# Handle missing packages
if not auto_install:
if require_confirmation:
print("\n🔐 Security Notice:")
print(
f" The following packages need to be installed: {packages_to_install}"
)
print(f" This will run: pip install {' '.join(packages_to_install)}")
print(" ⚠️ Only install packages from trusted sources!")
# In a real implementation, this would prompt the user
# For now, we'll return a message asking for confirmation
return {
"success": False,
"requires_confirmation": True,
"missing_packages": missing_packages,
"packages_to_install": packages_to_install,
"install_command": f"pip install {' '.join(packages_to_install)}",
"message": "User confirmation required for package installation",
}
else:
return {
"success": False,
"missing_packages": missing_packages,
"packages_to_install": packages_to_install,
"message": "Missing dependencies detected, auto-install disabled",
}
# Auto-install missing packages
print("💿 Installing missing packages...")
for package_to_install in packages_to_install:
try:
print(f" 📥 Installing {package_to_install}...")
result = subprocess.run(
[sys.executable, "-m", "pip", "install", package_to_install],
capture_output=True,
text=True,
timeout=300,
)
if result.returncode == 0:
print(f" ✅ Successfully installed {package_to_install}")
installed_packages.append(package_to_install)
# Verify installation
try:
__import__(package_to_install.replace("-", "_"))
print(f" ✅ {package_to_install} import verified")
except ImportError:
print(
f" ⚠️ {package_to_install} installed but import may need different name"
)
else:
print(
f" ❌ Failed to install {package_to_install}: {result.stderr}"
)
return {
"success": False,
"error": f"Failed to install {package_to_install}: {result.stderr}",
"installed_packages": installed_packages,
}
except Exception as e:
print(f" ❌ Error installing {package_to_install}: {e}")
return {
"success": False,
"error": f"Error installing {package_to_install}: {e}",
"installed_packages": installed_packages,
}
print("✅ All dependencies installed successfully")
return {
"success": True,
"installed_packages": installed_packages,
"message": f"Successfully installed {len(installed_packages)} packages",
}
[docs]
@register_tool("PythonCodeExecutor")
class PythonCodeExecutor(BasePythonExecutor, BaseTool):
    """Tool that executes Python code snippets in-process.

    Snippets go through AST screening, run with restricted builtins, and
    are subject to a timeout.
    """

    def __init__(self, tool_config: Dict[str, Any]):
        """Initialise both base classes with the same tool configuration."""
        # Each base is initialised explicitly, executor first.
        for base in (BasePythonExecutor, BaseTool):
            base.__init__(self, tool_config)
[docs]
def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Execute a Python code snippet with safety checks and a timeout.

    Keys read from ``arguments``:
      * ``code`` (required): source text to execute.
      * ``timeout``: seconds, clamped to the range 1-300 (default 30).
      * ``return_variable``: name whose value is returned (default "result").
      * ``arguments``: extra variables exposed to the code.
      * ``allowed_imports``: extra importable modules for this call only.
      * ``dependencies``, ``auto_install_dependencies``,
        ``require_confirmation``: forwarded to the dependency check.

    The code runs in a single namespace, so functions, lambdas and
    comprehensions defined by the snippet can see its top-level names.
    The result is looked up in that namespace, which also holds the
    pre-imported modules and the variables passed through ``arguments``.

    Returns:
        The success or error response dict built by the formatting helpers,
        or a "requires_confirmation" dict when dependencies are missing.
    """
    # Snapshot the allow-list so per-call "allowed_imports" never leak into
    # later calls on the same tool instance.
    baseline_modules = set(self.allowed_modules)
    try:
        code = arguments.get("code", "")
        if not code:
            return self._format_error_response(
                ValueError("Code parameter is required"),
                "ValueError",
                execution_time=0,
            )
        timeout = arguments.get("timeout", 30)
        timeout = min(max(timeout, 1), 300)  # Clamp between 1-300 seconds
        return_variable = arguments.get("return_variable", "result")
        additional_vars = arguments.get("arguments", {})

        if "allowed_imports" in arguments:
            self.allowed_modules = baseline_modules | set(
                arguments["allowed_imports"]
            )

        is_safe, ast_warnings = self._check_ast_safety(code)
        if not is_safe:
            return self._format_error_response(
                ValueError(
                    f"Code contains forbidden operations: {', '.join(ast_warnings)}"
                ),
                "SecurityError",
                execution_time=0,
            )

        dependencies = arguments.get("dependencies", [])
        auto_install = arguments.get("auto_install_dependencies", False)
        require_confirmation = arguments.get("require_confirmation", True)
        if dependencies:
            dep_result = self._check_and_install_dependencies(
                dependencies, auto_install, require_confirmation
            )
            if not dep_result["success"]:
                if dep_result.get("requires_confirmation"):
                    return {
                        "success": False,
                        "requires_confirmation": True,
                        "missing_packages": dep_result["missing_packages"],
                        "packages_to_install": dep_result.get(
                            "packages_to_install", []
                        ),
                        "install_command": dep_result["install_command"],
                        "message": dep_result["message"],
                    }
                return self._format_error_response(
                    RuntimeError(dep_result.get("error", dep_result["message"])),
                    "DependencyError",
                    execution_time=0,
                )

        # One dict serves as both globals and locals. With two separate
        # dicts, exec() runs the code as if it were a class body, and nested
        # scopes cannot resolve names assigned at the snippet's top level.
        namespace = self._create_safe_globals(additional_vars)

        start_time = time.time()

        def execute_code():
            return self._capture_output(exec, code, namespace)

        try:
            _, stdout, stderr = self._execute_with_timeout(execute_code, timeout)
            execution_time = time.time() - start_time
            final_result = namespace.get(return_variable, None)
            code_lines = len(code.splitlines())
            return self._format_success_response(
                final_result,
                stdout,
                stderr,
                execution_time,
                code_lines,
                ast_warnings,
            )
        except TimeoutError:
            execution_time = time.time() - start_time
            return self._format_error_response(
                TimeoutError(f"Code execution timed out after {timeout} seconds"),
                "TimeoutError",
                execution_time=execution_time,
            )
    except Exception as e:
        return self._format_error_response(e, type(e).__name__, execution_time=0)
    finally:
        self.allowed_modules = baseline_modules
[docs]
@register_tool("PythonScriptRunner")
class PythonScriptRunner(BasePythonExecutor, BaseTool):
    """Tool that runs a Python script file in a separate subprocess with a timeout."""

    def __init__(self, tool_config: Dict[str, Any]):
        """Initialise both base classes with the same tool configuration."""
        # Each base is initialised explicitly, executor first.
        for base in (BasePythonExecutor, BaseTool):
            base.__init__(self, tool_config)
[docs]
def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Run a Python script file in a subprocess with a timeout.

    Keys read from ``arguments``:
      * ``script_path`` (required): path of the script. A relative path is
        resolved against the current working directory of this process,
        not against ``working_directory``.
      * ``script_args``: list of command-line arguments for the script.
      * ``timeout``: seconds before the subprocess is aborted (default 60).
      * ``working_directory``: cwd for the subprocess (default: current cwd).
      * ``env_vars``: extra environment variables for the subprocess.
      * ``dependencies``, ``auto_install_dependencies``,
        ``require_confirmation``: forwarded to the dependency check.

    The subprocess environment is a copy of ``os.environ`` overlaid with
    ``env_vars``, from which ``PYTHONPATH`` and ``PATH`` are then removed.

    Returns:
        The success or error response dict built by the formatting helpers,
        or a "requires_confirmation" dict when dependencies are missing.
    """
    try:
        script_path = arguments.get("script_path", "")
        if not script_path:
            return self._format_error_response(
                ValueError("script_path parameter is required"),
                "ValueError",
                execution_time=0,
            )
        if not os.path.exists(script_path):
            return self._format_error_response(
                FileNotFoundError(f"Script file not found: {script_path}"),
                "FileNotFoundError",
                execution_time=0,
            )
        # The existence check above is relative to this process's cwd, but
        # the subprocess starts in ``working_directory``. Pin the path so
        # both refer to the same file.
        script_path = os.path.abspath(script_path)

        script_args = arguments.get("script_args", [])
        timeout = arguments.get("timeout", 60)
        working_dir = arguments.get("working_directory", os.getcwd())
        env_vars = arguments.get("env_vars", {})

        dependencies = arguments.get("dependencies", [])
        auto_install = arguments.get("auto_install_dependencies", False)
        require_confirmation = arguments.get("require_confirmation", True)
        if dependencies:
            dep_result = self._check_and_install_dependencies(
                dependencies, auto_install, require_confirmation
            )
            if not dep_result["success"]:
                if dep_result.get("requires_confirmation"):
                    return {
                        "success": False,
                        "requires_confirmation": True,
                        "missing_packages": dep_result["missing_packages"],
                        "packages_to_install": dep_result.get(
                            "packages_to_install", []
                        ),
                        "install_command": dep_result["install_command"],
                        "message": dep_result["message"],
                    }
                return self._format_error_response(
                    RuntimeError(dep_result.get("error", dep_result["message"])),
                    "DependencyError",
                    execution_time=0,
                )

        # Build the subprocess environment; the removal happens after the
        # overlay, so caller-supplied PYTHONPATH/PATH values are dropped too.
        restricted_env = os.environ.copy()
        restricted_env.update(env_vars)
        for var in ("PYTHONPATH", "PATH"):
            restricted_env.pop(var, None)

        cmd = [sys.executable, script_path] + script_args

        start_time = time.time()
        try:
            result = subprocess.run(
                cmd,
                cwd=working_dir,
                env=restricted_env,
                capture_output=True,
                text=True,
                timeout=timeout,
            )
            execution_time = time.time() - start_time
            if result.returncode == 0:
                return self._format_success_response(
                    f"Script executed successfully "
                    f"(exit code: {result.returncode})",
                    result.stdout,
                    result.stderr,
                    execution_time,
                    code_lines=0,  # Not easily measurable for external scripts
                )
            return self._format_error_response(
                RuntimeError(f"Script failed with exit code {result.returncode}"),
                "RuntimeError",
                result.stdout,
                result.stderr,
                execution_time,
            )
        except subprocess.TimeoutExpired:
            execution_time = time.time() - start_time
            return self._format_error_response(
                TimeoutError(f"Script execution timed out after {timeout} seconds"),
                "TimeoutError",
                execution_time=execution_time,
            )
    except Exception as e:
        return self._format_error_response(e, type(e).__name__, execution_time=0)