import json
import os
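# Compose-tool pipeline for discovering, specifying, implementing, testing,
# evaluating, and saving a new ToolUniverse tool. The public entry points are
# iterative_code_improvement() and compose(); the underscore-prefixed helpers
# implement the individual pipeline stages.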
def _discover_similar_tools(tool_description, call_tool):
"""Discover similar tools"""
similar_tools = []
discovery_methods = [
("Tool_Finder_Keyword", {"description": tool_description, "limit": 5})
]
for method_name, args in discovery_methods:
result = call_tool(method_name, args)
if result and isinstance(result, list):
similar_tools.extend(result)
# Deduplicate
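    # Dicts are hashed by their sorted items; lists/tuples and other unhashable
    # entries are kept as-is rather than dropped.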
seen = set()
deduped_tools = []
for tool in similar_tools:
try:
if isinstance(tool, dict):
tool_tuple = tuple(sorted(tool.items()))
elif isinstance(tool, (list, tuple)):
deduped_tools.append(tool)
continue
else:
tool_tuple = tool
if tool_tuple not in seen:
seen.add(tool_tuple)
deduped_tools.append(tool)
except (TypeError, AttributeError):
deduped_tools.append(tool)
return deduped_tools
def _generate_tool_specification(tool_description, similar_tools, call_tool):
"""Generate tool specification"""
spec_input = {
"tool_description": tool_description,
"tool_category": "general",
"tool_type": "CustomTool",
"similar_tools": json.dumps(similar_tools) if similar_tools else "[]",
"existing_tools_summary": "Available tools: standard ToolUniverse tools",
}
result = call_tool("ToolSpecificationGenerator", spec_input)
if not result or "result" not in result:
raise RuntimeError("ToolSpecificationGenerator returned invalid result")
tool_config = result["result"]
# Ensure tool_config is a dictionary
if isinstance(tool_config, str):
try:
tool_config = json.loads(tool_config)
except json.JSONDecodeError:
raise ValueError(f"Failed to parse tool_config JSON: {tool_config}")
elif not isinstance(tool_config, dict):
raise TypeError(
f"tool_config must be a dictionary, " f"got: {type(tool_config)}"
)
return tool_config
def _generate_implementation(tool_config, call_tool):
"""Generate implementation code for all tool types"""
if "implementation" in tool_config:
return tool_config
impl_input = {
"tool_description": tool_config.get("description", ""),
"tool_parameters": json.dumps(tool_config.get("parameter", {})),
"domain": "general",
"complexity_level": "intermediate",
}
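    # The generator is expected to return {"implementation": {"source_code": ...}};
    # optional "dependencies" and "imports" entries are consumed later by
    # _save_tool_files and _generate_tool_code.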
# Try multiple times to generate implementation
for attempt in range(3):
try:
print(
f"🔄 Attempting to generate implementation code "
f"(attempt {attempt + 1}/3)..."
)
result = call_tool("ToolImplementationGenerator", impl_input)
if result and "result" in result:
result_data = result["result"]
if isinstance(result_data, str):
try:
impl_data = json.loads(result_data)
except json.JSONDecodeError as e:
print(f"⚠️ JSON parsing failed: {e}")
continue
else:
impl_data = result_data
if (
"implementation" in impl_data
and "source_code" in impl_data["implementation"]
):
tool_config["implementation"] = impl_data["implementation"]
print("✅ Successfully generated implementation code")
return tool_config
else:
                    present_fields = list(impl_data.get("implementation", {}).keys())
                    print(
                        f"⚠️ Generated implementation is missing 'source_code'; "
                        f"fields present: {present_fields}"
                    )
else:
print("⚠️ ToolImplementationGenerator returned invalid result")
except Exception as e:
print(
f"❌ Error generating implementation code "
f"(attempt {attempt + 1}/3): {e}"
)
continue
return tool_config
def _generate_test_cases(tool_config, call_tool):
"""Generate test cases"""
test_input = {"tool_config": tool_config}
for attempt in range(5):
try:
result = call_tool("TestCaseGenerator", test_input)
if result and "result" in result:
result_data = result["result"]
if isinstance(result_data, str):
test_data = json.loads(result_data)
else:
test_data = result_data
if "test_cases" in test_data:
test_cases = test_data["test_cases"]
if _validate_test_cases(test_cases, tool_config):
return test_cases
except Exception as e:
print(f"🔧 TestCaseGenerator attempt #{attempt + 1}/5 failed: {e}")
continue
return []
def _validate_test_cases(test_cases, tool_config):
"""Validate test cases"""
if not isinstance(test_cases, list):
return False
tool_name = tool_config.get("name", "")
required_params = tool_config.get("parameter", {}).get("required", [])
for test_case in test_cases:
if not isinstance(test_case, dict):
return False
if test_case.get("name") != tool_name:
return False
args = test_case.get("arguments", {})
missing_params = [p for p in required_params if p not in args]
if missing_params:
return False
return True
def _execute_test_cases(tool_config, test_cases):
"""Execute test cases to validate code functionality"""
print("🧪 Executing test cases to validate code functionality...")
test_results = {
"total_tests": len(test_cases),
"passed_tests": 0,
"failed_tests": 0,
"test_details": [],
"overall_success_rate": 0.0,
}
if not test_cases:
print("⚠️ No test cases to execute")
return test_results
# Dynamic import of generated tool code
# Build tool code file path
    tool_name = tool_config.get("name", "UnknownTool")
    base_filename = f"generated_tool_{tool_name}"
    code_file = f"{base_filename}_code.py"
print("💾 Saving tool files for testing...")
saved_files = _save_tool_files(tool_config, base_filename)
print(f"Saved: {saved_files}")
if os.path.exists(code_file):
        # Dynamically import the generated tool module
import importlib.util
spec = importlib.util.spec_from_file_location(tool_name, code_file)
tool_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(tool_module)
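        # _generate_tool_code emits a module-level function named after the tool
        # in lowercase, so look it up by that convention.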
# Get tool function
tool_function = getattr(tool_module, tool_name.lower(), None)
if tool_function:
print(f"✅ Successfully imported tool: {tool_name}")
# Execute each test case
for i, test_case in enumerate(test_cases):
test_result = {
"test_id": i + 1,
"test_case": test_case,
"status": "unknown",
"result": None,
"error": None,
"execution_time": 0,
}
try:
import time
start_time = time.time()
# Extract test parameters
if isinstance(test_case, dict) and "input" in test_case:
test_args = test_case["input"]
elif isinstance(test_case, dict) and "arguments" in test_case:
test_args = test_case["arguments"]
else:
test_args = test_case
# Execute test
result = tool_function(test_args)
print(f"result: {result}")
execution_time = time.time() - start_time
# Validate result
if result is not None and not isinstance(result, dict):
test_result["status"] = "failed"
test_result["error"] = "Return value is not a dictionary"
elif result is None:
test_result["status"] = "failed"
test_result["error"] = "Return value is None"
else:
test_result["status"] = "passed"
test_result["result"] = result
test_result["execution_time"] = execution_time
except Exception as e:
test_result["status"] = "failed"
test_result["error"] = str(e)
test_result["execution_time"] = 0
# Count results
if test_result["status"] == "passed":
test_results["passed_tests"] += 1
else:
test_results["failed_tests"] += 1
test_results["test_details"].append(test_result)
# Print test results
status_emoji = "✅" if test_result["status"] == "passed" else "❌"
print(f" {status_emoji} Test {i+1}: {test_result['status']}")
if test_result["error"]:
print(f" Error: {test_result['error']}")
# Calculate success rate
test_results["overall_success_rate"] = (
(test_results["passed_tests"] / test_results["total_tests"])
if test_results["total_tests"] > 0
else 0.0
)
passed = test_results["passed_tests"]
total = test_results["total_tests"]
print(f"📊 Test execution completed: {passed}/{total} passed")
print(f"🎯 Success rate: {test_results['overall_success_rate']:.1%}")
else:
print(f"❌ Unable to find tool function: {tool_name.lower()}")
test_results["error"] = f"Tool function not found: {tool_name.lower()}"
else:
print(f"❌ Tool code file does not exist: {code_file}")
test_results["error"] = f"Code file does not exist: {code_file}"
return test_results
def _evaluate_quality(tool_config, test_cases, call_tool):
"""评估代码质量 - 使用增强的CodeQualityAnalyzer + 实际测试执行"""
# 首先执行测试样例来验证功能
test_execution_results = _execute_test_cases(tool_config, test_cases)
# 提取实现代码
implementation_code = ""
if "implementation" in tool_config:
impl = tool_config["implementation"]
print("impl.keys():", impl.keys())
implementation_code = impl["source_code"]
# Build analysis input including test execution results
eval_input = {
"tool_name": tool_config.get("name", "UnknownTool"),
"tool_description": tool_config.get("description", ""),
"tool_parameters": json.dumps(tool_config.get("parameter", {})),
"implementation_code": implementation_code,
"test_cases": json.dumps(test_cases),
"test_execution_results": json.dumps(test_execution_results),
}
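    # CodeQualityAnalyzer is expected to return a JSON payload with an
    # "overall_score" and optional per-aspect "scores"; these drive the
    # iterative improvement loop.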
print("🔍 Using CodeQualityAnalyzer for deep code quality analysis...")
result = call_tool("CodeQualityAnalyzer", eval_input)
print(f"result: {result['result']}")
result_data = result["result"]
parsed_data = json.loads(result_data)
parsed_data["test_execution"] = test_execution_results
return parsed_data
def _expand_test_coverage(tool_config, call_tool):
"""Expand test coverage"""
test_input = {
"tool_config": tool_config,
"focus_areas": ["edge_cases", "boundary_conditions", "error_scenarios"],
}
result = call_tool("TestCaseGenerator", test_input)
if result and "result" in result:
result_data = result["result"]
if isinstance(result_data, str):
try:
test_cases = json.loads(result_data)
if "test_cases" in test_cases:
if "testing" not in tool_config:
tool_config["testing"] = {}
tool_config["testing"]["test_cases"] = test_cases["test_cases"]
return tool_config
except json.JSONDecodeError:
pass
return None
def _optimize_code(tool_config, call_tool, quality_evaluation):
"""General code optimization"""
optimization_input = {
"tool_config": json.dumps(tool_config),
"quality_evaluation": json.dumps(quality_evaluation),
}
result = call_tool("CodeOptimizer", optimization_input)
if result and "result" in result:
result_data = result["result"]
optimized = json.loads(result_data)
# Check return format, CodeOptimizer now returns {"implementation": {...}}
if "implementation" in optimized:
tool_config["implementation"] = optimized["implementation"]
else:
# Compatible with old format
tool_config["implementation"] = optimized
return tool_config
def iterative_code_improvement(
tool_config, call_tool, max_iterations=5, target_score=9.5
):
"""Iteratively improve code implementation until target quality score is reached"""
print("\n🚀 Starting iterative code improvement process")
print(f"Target quality score: {target_score}/10")
print(f"Maximum iterations: {max_iterations}")
current_score = 0
improvement_history = []
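    # Each iteration: regenerate test cases, evaluate quality (including actual
    # test execution), stop early if the target score is reached, otherwise
    # hand the evaluation to CodeOptimizer and try again.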
for iteration in range(max_iterations):
print(f"\n🔄 Iteration {iteration + 1}/{max_iterations}")
print(f"Current quality score: {current_score:.2f}/10")
# Generate test cases and evaluate quality
test_cases = _generate_test_cases(tool_config, call_tool)
print(f"Generated {len(test_cases)} test cases")
print(f"test_cases: {test_cases}")
quality_evaluation = _evaluate_quality(tool_config, test_cases, call_tool)
new_score = quality_evaluation.get("overall_score", 0)
print(f"Quality evaluation result: {new_score:.2f}/10")
if "scores" in quality_evaluation:
for aspect, score in quality_evaluation["scores"].items():
print(f" - {aspect}: {score:.2f}/10")
# Check if target is reached
if new_score >= target_score:
print(f"🎉 Target quality score {target_score}/10 reached!")
improvement_history.append(
{
"iteration": iteration + 1,
"score": new_score,
"improvement": new_score - current_score,
"status": "target_achieved",
}
)
break
# Record improvement
improvement = new_score - current_score
print(f"Improvement: {improvement:+.2f}")
improvement_history.append(
{
"iteration": iteration + 1,
"score": new_score,
"improvement": improvement,
"status": "improved",
}
)
current_score = new_score
tool_config = _optimize_code(tool_config, call_tool, quality_evaluation)
# Final quality evaluation
final_test_cases = _generate_test_cases(tool_config, call_tool)
final_quality = _evaluate_quality(tool_config, final_test_cases, call_tool)
final_score = final_quality.get("overall_score", current_score)
print("🏁 Iterative improvement completed")
print(f"Final quality score: {final_score:.2f}/10")
print(f"Total iterations: {len(improvement_history)}")
print("\n📈 Improvement history:")
for record in improvement_history:
status_emoji = "🎯" if record["status"] == "target_achieved" else "📈"
        print(
            f" {status_emoji} Round {record['iteration']}: {record['score']:.2f}/10 "
            f"(improvement: {record['improvement']:+.2f})"
        )
return tool_config, final_score, improvement_history
def _save_tool_files(tool_config, base_filename):
"""Save tool files"""
# Update configuration
config_to_save = tool_config.copy()
class_name = config_to_save.get("name", "CustomTool")
config_to_save["type"] = class_name
# Extract dependency information
dependencies = []
if (
"implementation" in tool_config
and "dependencies" in tool_config["implementation"]
):
dependencies = tool_config["implementation"]["dependencies"]
# Add dependencies field to configuration
config_to_save["dependencies"] = dependencies
# Remove implementation code
if "implementation" in config_to_save:
del config_to_save["implementation"]
# Save configuration file
config_file = f"{base_filename}_config.json"
with open(config_file, "w", encoding="utf-8") as f:
json.dump(config_to_save, f, indent=2, ensure_ascii=False)
# Generate code file
code_file = f"{base_filename}_code.py"
_generate_tool_code(tool_config, code_file)
return [config_file, code_file]
def _convert_json_to_python(obj):
"""Recursively convert JSON object booleans and types to Python format"""
if isinstance(obj, dict):
result = {}
for key, value in obj.items():
result[key] = _convert_json_to_python(value)
return result
elif isinstance(obj, list):
return [_convert_json_to_python(item) for item in obj]
elif obj == "true":
return True
elif obj == "false":
return False
elif obj == "string":
return str
elif obj == "number":
return float
elif obj == "integer":
return int
elif obj == "object":
return dict
elif obj == "array":
return list
else:
return obj
def _convert_python_types_to_strings(obj):
"""Recursively convert Python type objects to string representations for JSON serialization"""
if isinstance(obj, dict):
result = {}
for key, value in obj.items():
result[key] = _convert_python_types_to_strings(value)
return result
elif isinstance(obj, list):
return [_convert_python_types_to_strings(item) for item in obj]
elif obj is True:
return "True"
elif obj is False:
return "False"
elif obj is str:
return "str"
elif obj is float:
return "float"
elif obj is int:
return "int"
elif obj is dict:
return "dict"
elif obj is list:
return "list"
else:
return obj
def _generate_tool_code(tool_config, code_file):
"""Generate Python code for all tool types using correct register_tool method"""
tool_name = tool_config["name"]
with open(code_file, "w", encoding="utf-8") as f:
# Add dependency instructions comment
if (
"implementation" in tool_config
and "dependencies" in tool_config["implementation"]
):
dependencies = tool_config["implementation"]["dependencies"]
if dependencies:
f.write("# Required packages:\n")
for dep in dependencies:
f.write(f"# pip install {dep}\n")
f.write("\n")
f.write("from typing import Dict, Any\n")
f.write("from src.tooluniverse import register_tool\n\n")
# Import dependencies
if (
"implementation" in tool_config
and "imports" in tool_config["implementation"]
):
for imp in tool_config["implementation"]["imports"]:
f.write(f"{imp}\n")
f.write("\n")
# Generate function implementation directly, no classes
f.write("@register_tool(\n")
f.write(f' "{tool_name}",\n')
f.write(" {\n")
f.write(f' "name": "{tool_name}",\n')
f.write(f' "type": "{tool_name}",\n')
f.write(f' "description": "{tool_config.get("description", "")}",\n')
# Use helper functions to convert JSON booleans and types to Python format
parameter_json = _convert_json_to_python(tool_config.get("parameter", {}))
# Convert Python type objects to string representations
parameter_json_str = _convert_python_types_to_strings(parameter_json)
f.write(f' "parameter": {json.dumps(parameter_json_str, indent=8)},\n')
return_schema_json = _convert_json_to_python(
tool_config.get("return_schema", {})
)
# Convert Python type objects to string representations
return_schema_json_str = _convert_python_types_to_strings(return_schema_json)
        f.write(
            f'        "return_schema": {json.dumps(return_schema_json_str, indent=8)},\n'
        )
# Add dependency information
if (
"implementation" in tool_config
and "dependencies" in tool_config["implementation"]
):
dependencies = tool_config["implementation"]["dependencies"]
            f.write(f'        "dependencies": {json.dumps(dependencies, indent=8)}\n')
        else:
            f.write('        "dependencies": []\n')
        f.write("    }\n")
f.write(")\n")
f.write(
f"def {tool_name.lower()}(arguments: Dict[str, Any]) -> Dict[str, Any]:\n"
)
        f.write(f'    """{tool_config.get("description", "")}"""\n')
        f.write("    try:\n")
# Add source code
if (
"implementation" in tool_config
and "source_code" in tool_config["implementation"]
):
source_code = tool_config["implementation"]["source_code"]
f.write(" # Generated implementation:\n")
for line in source_code.split("\n"):
if line.strip(): # Skip empty lines
f.write(f" {line}\n")
else:
f.write("\n")
# Ensure execute_tool is called and result is returned
f.write(" \n")
f.write(" # Execute the tool and return result\n")
f.write(" return execute_tool(arguments)\n")
else:
# Default implementation
f.write(" result = {\n")
f.write(' "status": "success",\n')
f.write(' "message": "Tool executed successfully",\n')
f.write(' "input": arguments\n')
f.write(" }\n")
f.write(" return result\n")
f.write(" except Exception as e:\n")
f.write(' return {"error": str(e)}\n')
def compose(arguments, tooluniverse, call_tool):
"""General tool discovery and generation system"""
tool_description = arguments["tool_description"]
max_iterations = arguments.get("max_iterations", 2)
arguments.get("save_to_file", True)
print(f"🔍 Starting tool discovery: {tool_description}")
# 1. Discover similar tools
print("📊 Discovering similar tools...")
similar_tools = _discover_similar_tools(tool_description, call_tool)
print(f"Found {len(similar_tools)} similar tools")
# 2. Generate initial tool specification
print("🏗️ Generating tool specification...")
tool_config = _generate_tool_specification(
tool_description, similar_tools, call_tool
)
# 3. Generate implementation for all tools
print("💻 Generating code implementation...")
tool_config = _generate_implementation(tool_config, call_tool)
# 4. Iterative optimization
print("\n🚀 Starting enhanced iterative improvement system...")
target_quality_score = arguments.get("target_quality_score", 8.5)
print(
f"🎯 Enabling iterative improvement, target quality score: {target_quality_score}/10"
)
tool_config, final_quality_score, improvement_history = iterative_code_improvement(
tool_config,
call_tool,
max_iterations=max_iterations,
target_score=target_quality_score,
)
print(
f"🎉 Iterative improvement completed! Final quality score: {final_quality_score:.2f}/10"
)
# 5. Save tool files
print("💾 Saving tool files...")
base_filename = f"generated_tool_{tool_config['name']}"
saved_files = _save_tool_files(tool_config, base_filename)
print(f"Saved: {saved_files}")
print("\n🎉 Tool generation completed!")
print(f"Tool name: {tool_config['name']}")
print(f"Tool type: {tool_config['type']}")
print(f"Final quality: {final_quality_score:.1f}/10")
return {
"tool_config": tool_config,
"quality_score": final_quality_score,
"saved_files": saved_files,
}
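# Example usage (sketch): the call_tool callable below is a hypothetical
# stand-in for the dispatcher that the ToolUniverse runtime normally supplies
# when running this compose tool; the tool description is illustrative only.
#
#   def call_tool(name, args):
#       ...  # dispatch `name` with `args` to the corresponding ToolUniverse tool
#
#   output = compose(
#       {"tool_description": "Convert temperatures between Celsius and Fahrenheit"},
#       tooluniverse=None,  # not referenced directly by compose()
#       call_tool=call_tool,
#   )
#   print(output["quality_score"], output["saved_files"])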