Source code for tooluniverse.compose_scripts.multi_agent_literature_search

"""
Multi-Agent Literature Search Compose Function
==============================================

This module implements the compose function for the multi-agent literature search system.
"""

import json
from datetime import datetime


def _format_papers_for_summary_v2(papers):
    """Format papers for summary generation.

    Renders at most the first 10 paper records as a numbered, Markdown-style
    listing (title, authors, year, venue and the first 200 characters of the
    abstract), one entry per paper.

    The records come from several different search tools, so the function is
    tolerant of the shapes those tools produce: fields that are missing or
    ``None`` fall back to a placeholder, ``authors`` may be a list of names,
    a list of ``{"name": ...}`` dicts or a single string, and entries that are
    not dicts are ignored.

    Args:
        papers (list[dict] | None): Paper records to format.

    Returns:
        str: The formatted listing, or ``"No papers found."`` when there is
        nothing to format.
    """
    if not papers:
        return "No papers found."

    # Ignore anything that is not a record instead of crashing on ``.get``.
    records = [paper for paper in papers if isinstance(paper, dict)][:10]
    if not records:
        return "No papers found."

    def field(paper, key, default):
        # A key that is present but None should behave like a missing key.
        value = paper.get(key)
        return default if value is None else value

    formatted_papers = []
    for i, paper in enumerate(records, 1):
        title = field(paper, "title", "No title")
        year = field(paper, "year", "Unknown year")
        venue = field(paper, "venue", field(paper, "journal", "Unknown venue"))

        abstract = field(paper, "abstract", "No abstract available")
        if not isinstance(abstract, str):
            abstract = str(abstract)
        ellipsis = "..." if len(abstract) > 200 else ""

        authors = paper.get("authors") or []
        if isinstance(authors, str):
            # Joining a plain string would split it into single characters.
            authors_str = authors
        else:
            if not isinstance(authors, (list, tuple)):
                authors = [authors]
            names = []
            for author in authors:
                if isinstance(author, dict):
                    author = author.get("name")
                if author:
                    names.append(str(author))
            authors_str = ", ".join(names) if names else "Unknown authors"

        formatted_papers.append(
            f"\n{i}. **{title}**\n"
            f"   Authors: {authors_str}\n"
            f"   Year: {year}\n"
            f"   Venue: {venue}\n"
            f"   Abstract: {abstract[:200]}{ellipsis}\n"
        )

    return "\n".join(formatted_papers)


def _json_object(raw):
    """Coerce an agent return value or ``result`` payload into a dict.

    Args:
        raw: A dict (returned unchanged), a JSON document as ``str``/``bytes``,
            or anything else.

    Returns:
        dict | None: The dict itself or the decoded JSON object; ``None`` when
        ``raw`` cannot be decoded or does not decode to a JSON object.
    """
    if isinstance(raw, dict):
        return raw
    try:
        decoded = json.loads(raw)
    except (TypeError, ValueError):  # JSONDecodeError is a ValueError
        return None
    return decoded if isinstance(decoded, dict) else None


def _extract_papers(search_result, tool_name):
    """Pull the papers out of a literature search tool's return value.

    Handles the return formats compose() encounters: a bare list, a dict
    carrying ``papers`` (when ``success`` is truthy) or ``results``, or a JSON
    string encoding either of those. Tool-reported errors and undecodable
    strings are logged and yield an empty list.
    """
    if isinstance(search_result, list):
        # Most literature search tools return arrays directly
        return search_result

    if isinstance(search_result, dict):
        if search_result.get("success"):
            return search_result.get("papers", [])
        if search_result.get("results"):
            return search_result.get("results", [])
        if "error" in search_result:
            print(f" ⚠️ {tool_name} returned error: {search_result.get('error')}")
        return []

    if isinstance(search_result, str):
        try:
            parsed = json.loads(search_result)
        except json.JSONDecodeError:
            print(f" ⚠️ {tool_name} returned invalid JSON string")
            return []
        if isinstance(parsed, list):
            return parsed
        if isinstance(parsed, dict):
            return parsed.get("papers", parsed.get("results", []))

    return []


def compose(arguments, tooluniverse, call_tool, stream_callback=None):
    """
    Multi-agent literature search compose function

    Runs the following pipeline: IntentAnalyzerAgent turns the query into
    search plans; then, for up to ``max_iterations`` rounds,
    KeywordExtractorAgent refines each pending plan's keywords, every keyword
    is searched with the first literature tool that returns papers,
    ResultSummarizerAgent summarizes each completed plan, and the plans are
    scored. The loop stops once the average score reaches
    ``quality_threshold``; otherwise QualityCheckerAgent suggestions are
    applied before the next round. Finally OverallSummaryAgent produces an
    overall summary.

    Args:
        arguments (dict): Input parameters from the tool call. Reads
            ``query`` (required), ``max_iterations`` (default 3) and
            ``quality_threshold`` (default 0.7).
        tooluniverse (ToolUniverse): Reference to the ToolUniverse instance.
            ``force_full_discovery()`` and ``load_tools()`` are called on it
            when present.
        call_tool (function): Function to call other tools, invoked as
            ``call_tool(tool_name, tool_arguments)``.
        stream_callback (callable, optional): Callback function for streaming
            output. It receives one JSON document per event (followed by a
            newline) with ``type``, ``timestamp`` and ``data`` keys.

    Returns:
        dict: ``{"success": True, "results": {...}}`` with the collected
        papers, per-plan summaries, the overall summary and search metadata,
        or ``{"success": False, "error": <message>}`` on failure. Exceptions
        raised while searching are caught and reported in the error message.
    """
    query = arguments.get("query", "")
    max_iterations = arguments.get("max_iterations", 3)
    quality_threshold = arguments.get("quality_threshold", 0.7)

    # Make sure the tools are loaded. This is best effort: a failure is only
    # logged. (The log messages are runtime output and are kept verbatim.)
    print("🔧 确保工具已加载...")
    try:
        if hasattr(tooluniverse, "force_full_discovery"):
            tooluniverse.force_full_discovery()
        if hasattr(tooluniverse, "load_tools"):
            tooluniverse.load_tools()
        print("✅ 工具加载完成")
    except Exception as e:
        print(f"⚠️ 工具加载失败: {e}")

    if not query:
        return {"success": False, "error": "Query parameter is required"}

    # Helper function to emit stream events
    def emit_event(event_type, data=None):
        if stream_callback:
            event = {
                "type": event_type,
                "timestamp": datetime.now().isoformat(),
                "data": data or {},
            }
            stream_callback(json.dumps(event) + "\n")

    print(f"🚀 Starting Multi-Agent Literature Search for: '{query}'")
    print("=" * 60)

    # Emit start event
    emit_event("search_start", {"query": query, "max_iterations": max_iterations})

    try:
        # Step 1: Analyze intent and create search plans
        print("🤖 Step 1: Analyzing user intent...")
        emit_event(
            "agent_start", {"agent": "IntentAnalyzerAgent", "step": "intent_analysis"}
        )

        intent_result = call_tool("IntentAnalyzerAgent", {"user_query": query})
        print(f"🔍 IntentAnalyzerAgent raw result: {intent_result}")
        print(f"🔍 IntentAnalyzerAgent result type: {type(intent_result)}")

        # Handle both string and dict results
        if isinstance(intent_result, str):
            print(f"🔍 Parsing string result: {intent_result[:200]}...")
            try:
                intent_result = json.loads(intent_result)
                print(f"🔍 Parsed successfully: {intent_result}")
            except json.JSONDecodeError as e:
                print(f"❌ JSON parse error: {e}")
                emit_event(
                    "agent_error",
                    {"agent": "IntentAnalyzerAgent", "error": "Invalid JSON response"},
                )
                return {
                    "success": False,
                    "error": f"Intent analysis failed: Invalid JSON response - {intent_result}",
                }

        print(f"🔍 Final intent_result: {intent_result}")

        # Anything that is not a JSON object by now (None, a list, a number)
        # is reported the same way as an undecodable response.
        if not isinstance(intent_result, dict):
            emit_event(
                "agent_error",
                {"agent": "IntentAnalyzerAgent", "error": "Invalid JSON response"},
            )
            return {
                "success": False,
                "error": f"Intent analysis failed: Invalid JSON response - {intent_result}",
            }

        if not intent_result.get("success"):
            print(
                f"❌ Intent analysis failed: {intent_result.get('error', 'Unknown error')}"
            )
            emit_event(
                "agent_error",
                {
                    "agent": "IntentAnalyzerAgent",
                    "error": intent_result.get("error", "Unknown error"),
                },
            )
            return {
                "success": False,
                "error": f"Intent analysis failed: {intent_result.get('error', 'Unknown error')}",
            }

        content = intent_result.get("result", "{}")
        print(f"🔍 Content to parse: {content}")
        try:
            # The payload may already be a parsed object.
            analysis = content if isinstance(content, dict) else json.loads(content)
            print(f"🔍 Parsed analysis: {analysis}")
        except (TypeError, ValueError) as e:  # JSONDecodeError is a ValueError
            print(f"❌ Analysis JSON parse error: {e}")
            analysis = {}
        if not isinstance(analysis, dict):
            analysis = {}

        user_intent = analysis.get("user_intent", "")
        search_plans_data = analysis.get("search_plans", [])
        print(f"🔍 User intent: '{user_intent}'")
        print(f"🔍 Search plans data: {search_plans_data}")

        print(f"✅ User Intent: {user_intent}")
        print(f"📋 Created {len(search_plans_data)} search plans")
        emit_event(
            "agent_complete",
            {
                "agent": "IntentAnalyzerAgent",
                "user_intent": user_intent,
                "plans_count": len(search_plans_data),
            },
        )

        # Initialize search plans
        search_plans = []
        for i, plan_data in enumerate(search_plans_data):
            plan = {
                "plan_id": f"plan_{i+1}",
                "title": plan_data.get("title", ""),
                "description": plan_data.get("description", ""),
                "keywords": plan_data.get("keywords", []),
                "priority": plan_data.get("priority", 1),
                "status": "pending",
                "results": [],
                "summary": "",
                "quality_score": 0.0,
            }
            search_plans.append(plan)

        # Iterative search process
        current_iteration = 0
        is_complete = False
        # Defined up front so the final report still works when the loop body
        # never runs (max_iterations <= 0).
        avg_quality = 0.0
        emit_event(
            "plans_created",
            {
                "plans": [
                    {"id": p["plan_id"], "title": p["title"]} for p in search_plans
                ]
            },
        )

        # Tools are tried in this order; the first one returning papers wins.
        search_tools = (
            "ArXiv_search_papers",
            "PubMed_search_articles",
            "EuropePMC_search_articles",
            "SemanticScholar_search_papers",
            "openalex_literature_search",
        )

        while not is_complete and current_iteration < max_iterations:
            current_iteration += 1
            print(f"\n🔄 Iteration {current_iteration}")
            print("-" * 40)
            emit_event(
                "iteration_start",
                {"iteration": current_iteration, "max_iterations": max_iterations},
            )

            # Step 2: Extract keywords for pending plans
            pending_plans = [p for p in search_plans if p["status"] == "pending"]

            for plan in pending_plans:
                print(f"🔍 Extracting keywords for '{plan['title']}'")
                emit_event(
                    "agent_start",
                    {
                        "agent": "KeywordExtractorAgent",
                        "plan_id": plan["plan_id"],
                        "plan_title": plan["title"],
                    },
                )

                keywords_result = _json_object(
                    call_tool(
                        "KeywordExtractorAgent",
                        {
                            "plan_title": plan["title"],
                            "plan_description": plan["description"],
                            "current_keywords": ", ".join(
                                str(keyword) for keyword in plan["keywords"]
                            ),
                        },
                    )
                )

                refined_data = None
                if keywords_result is not None and keywords_result.get("success"):
                    refined_data = _json_object(keywords_result.get("result", "{}"))

                if keywords_result is None or (
                    keywords_result.get("success") and refined_data is None
                ):
                    # Undecodable response or payload: keep the plan's current
                    # keywords rather than aborting the whole search.
                    emit_event(
                        "agent_error",
                        {
                            "agent": "KeywordExtractorAgent",
                            "plan_id": plan["plan_id"],
                            "error": "Invalid JSON response",
                        },
                    )
                    continue

                if refined_data is not None:
                    plan["keywords"] = refined_data.get(
                        "refined_keywords", plan["keywords"]
                    )
                    print(f"✅ Refined keywords: {plan['keywords']}")
                    emit_event(
                        "agent_complete",
                        {
                            "agent": "KeywordExtractorAgent",
                            "plan_id": plan["plan_id"],
                            "keywords": plan["keywords"],
                        },
                    )
                else:
                    emit_event(
                        "agent_error",
                        {
                            "agent": "KeywordExtractorAgent",
                            "plan_id": plan["plan_id"],
                            "error": "Keyword extraction failed",
                        },
                    )

            # Step 3: Execute parallel searches
            print("🔎 Executing parallel searches...")
            emit_event("search_start", {"plans_count": len(pending_plans)})

            for plan in pending_plans:
                plan["status"] = "in_progress"
                all_results = []
                emit_event(
                    "plan_search_start",
                    {"plan_id": plan["plan_id"], "plan_title": plan["title"]},
                )

                # Search each keyword
                for keyword in plan["keywords"]:
                    print(f" Searching: {keyword}")

                    # Try multiple literature search tools
                    for tool_name in search_tools:
                        try:
                            emit_event(
                                "tool_search_start",
                                {
                                    "tool": tool_name,
                                    "keyword": keyword,
                                    "plan_id": plan["plan_id"],
                                },
                            )

                            search_result = call_tool(
                                tool_name, {"query": keyword, "limit": 2}
                            )
                            print(
                                f" 🔍 {tool_name} result type: {type(search_result)}"
                            )
                            print(f" 🔍 {tool_name} result: {search_result}")

                            # Handle different return formats
                            papers = _extract_papers(search_result, tool_name)

                            if papers:
                                kept = papers[:2]  # Limit per tool
                                all_results.extend(kept)

                                # Emit individual paper events for real-time display
                                for paper in kept:
                                    emit_event(
                                        "paper",
                                        {
                                            "paper": paper,
                                            "tool": tool_name,
                                            "keyword": keyword,
                                            "plan_id": plan["plan_id"],
                                        },
                                    )

                                emit_event(
                                    "tool_search_complete",
                                    {
                                        "tool": tool_name,
                                        "keyword": keyword,
                                        "plan_id": plan["plan_id"],
                                        "papers_found": len(kept),
                                    },
                                )
                                break  # Found results, move to next keyword
                            else:
                                emit_event(
                                    "tool_search_complete",
                                    {
                                        "tool": tool_name,
                                        "keyword": keyword,
                                        "plan_id": plan["plan_id"],
                                        "papers_found": 0,
                                    },
                                )

                        except Exception as e:
                            # One failing tool must not stop the others.
                            print(f" ⚠️ {tool_name} failed: {e}")
                            emit_event(
                                "tool_search_error",
                                {
                                    "tool": tool_name,
                                    "keyword": keyword,
                                    "plan_id": plan["plan_id"],
                                    "error": str(e),
                                },
                            )
                            continue

                plan["results"] = all_results
                plan["status"] = "completed"
                print(f"✅ Found {len(all_results)} results for '{plan['title']}'")
                emit_event(
                    "plan_search_complete",
                    {
                        "plan_id": plan["plan_id"],
                        "plan_title": plan["title"],
                        "papers_found": len(all_results),
                    },
                )

            # Step 4: Summarize results
            print("📝 Summarizing results...")
            emit_event(
                "summarization_start",
                {
                    "plans_count": len(
                        [
                            p
                            for p in search_plans
                            if p["status"] == "completed" and not p["summary"]
                        ]
                    )
                },
            )

            for plan in search_plans:
                if plan["status"] == "completed" and not plan["summary"]:
                    papers_text = _format_papers_for_summary_v2(plan["results"])

                    emit_event(
                        "agent_start",
                        {
                            "agent": "ResultSummarizerAgent",
                            "plan_id": plan["plan_id"],
                            "plan_title": plan["title"],
                        },
                    )

                    summary_result = call_tool(
                        "ResultSummarizerAgent",
                        {
                            "plan_title": plan["title"],
                            "plan_description": plan["description"],
                            "paper_count": str(len(plan["results"])),
                            "papers_text": papers_text,
                        },
                    )

                    # Handle both string and dict results
                    if isinstance(summary_result, str):
                        # ResultSummarizerAgent returns string directly when return_json=false
                        plan["summary"] = summary_result
                        print(f"✅ Generated summary for '{plan['title']}'")
                        emit_event(
                            "agent_complete",
                            {
                                "agent": "ResultSummarizerAgent",
                                "plan_id": plan["plan_id"],
                                "summary_length": len(plan["summary"]),
                            },
                        )
                    elif isinstance(summary_result, dict):
                        if summary_result.get("success"):
                            plan["summary"] = summary_result.get(
                                "result", "Summary generation failed."
                            )
                            print(f"✅ Generated summary for '{plan['title']}'")
                            emit_event(
                                "agent_complete",
                                {
                                    "agent": "ResultSummarizerAgent",
                                    "plan_id": plan["plan_id"],
                                    "summary_length": len(plan["summary"]),
                                },
                            )
                        else:
                            emit_event(
                                "agent_error",
                                {
                                    "agent": "ResultSummarizerAgent",
                                    "plan_id": plan["plan_id"],
                                    "error": "Summary generation failed",
                                },
                            )
                    else:
                        emit_event(
                            "agent_error",
                            {
                                "agent": "ResultSummarizerAgent",
                                "plan_id": plan["plan_id"],
                                "error": "Unexpected result type",
                            },
                        )

            # Step 5: Check quality and decide next steps
            print("🔍 Checking result quality...")
            emit_event("quality_check_start", {"iteration": current_iteration})

            # Calculate quality scores
            for plan in search_plans:
                plan["quality_score"] = _calculate_quality_score(plan)

            avg_quality = (
                sum(plan["quality_score"] for plan in search_plans) / len(search_plans)
                if search_plans
                else 0.0
            )
            print(f"📊 Average quality score: {avg_quality:.2f}")

            emit_event(
                "quality_check_complete",
                {
                    "avg_quality": avg_quality,
                    "quality_threshold": quality_threshold,
                    "iteration": current_iteration,
                },
            )

            if avg_quality >= quality_threshold:
                is_complete = True
                print("✅ Quality threshold met. Search complete!")
                emit_event(
                    "search_complete",
                    {"reason": "quality_threshold_met", "avg_quality": avg_quality},
                )
            else:
                if current_iteration < max_iterations:
                    print("🔄 Quality below threshold. Planning next iteration...")
                    emit_event(
                        "agent_start",
                        {"agent": "QualityCheckerAgent", "step": "quality_improvement"},
                    )

                    # Get improvement suggestions
                    plans_analysis = _format_plans_for_analysis(search_plans)
                    improvement_result = _json_object(
                        call_tool(
                            "QualityCheckerAgent", {"plans_analysis": plans_analysis}
                        )
                    )

                    improvement_data = None
                    if improvement_result is not None and improvement_result.get(
                        "success"
                    ):
                        improvement_data = _json_object(
                            improvement_result.get("result", "{}")
                        )

                    if improvement_result is None or (
                        improvement_result.get("success") and improvement_data is None
                    ):
                        # Undecodable suggestions: go on to the next iteration
                        # with the plans unchanged.
                        emit_event(
                            "agent_error",
                            {
                                "agent": "QualityCheckerAgent",
                                "error": "Invalid JSON response",
                            },
                        )
                        continue

                    if improvement_data is not None:
                        _apply_improvements(search_plans, improvement_data)
                        emit_event(
                            "agent_complete",
                            {
                                "agent": "QualityCheckerAgent",
                                "improvements_applied": len(
                                    improvement_data.get("improvements", [])
                                ),
                                "new_plans": len(improvement_data.get("new_plans", [])),
                            },
                        )
                    else:
                        emit_event(
                            "agent_error",
                            {
                                "agent": "QualityCheckerAgent",
                                "error": "Quality improvement failed",
                            },
                        )
                else:
                    print(
                        "⚠️ Max iterations reached. Search complete with current results."
                    )
                    is_complete = True
                    emit_event(
                        "search_complete",
                        {
                            "reason": "max_iterations_reached",
                            "avg_quality": avg_quality,
                        },
                    )

        # Step 6: Generate overall summary
        print("\n📊 Generating overall summary...")
        emit_event(
            "overall_summary_start",
            {"total_papers": sum(len(plan["results"]) for plan in search_plans)},
        )

        plan_summaries = _format_plan_summaries(search_plans)
        total_papers = sum(len(plan["results"]) for plan in search_plans)

        emit_event(
            "agent_start", {"agent": "OverallSummaryAgent", "step": "overall_summary"}
        )

        overall_summary_result = _json_object(
            call_tool(
                "OverallSummaryAgent",
                {
                    "user_query": query,
                    "user_intent": user_intent,
                    "total_papers": str(total_papers),
                    "total_plans": str(len(search_plans)),
                    "iterations": str(current_iteration),
                    "plan_summaries": plan_summaries,
                },
            )
        )

        if overall_summary_result is None:
            emit_event(
                "agent_error",
                {"agent": "OverallSummaryAgent", "error": "Invalid JSON response"},
            )
            overall_summary_result = {
                "success": False,
                "error": "Invalid JSON response",
            }

        overall_summary = ""
        if overall_summary_result.get("success"):
            overall_summary = overall_summary_result.get(
                "result", "Overall summary generation failed."
            )
            emit_event(
                "agent_complete",
                {
                    "agent": "OverallSummaryAgent",
                    "summary_length": len(overall_summary),
                },
            )
        else:
            emit_event(
                "agent_error",
                {
                    "agent": "OverallSummaryAgent",
                    "error": "Overall summary generation failed",
                },
            )

        print("\n" + "=" * 60)
        print("🎉 Multi-Agent Search Complete!")
        print(f"📊 Total Papers Found: {total_papers}")
        print(f"📋 Plans Executed: {len(search_plans)}")
        print(f"🔄 Iterations: {current_iteration}")

        emit_event(
            "search_final_complete",
            {
                "total_papers": total_papers,
                "total_plans": len(search_plans),
                "iterations": current_iteration,
                "avg_quality": avg_quality,
            },
        )

        # Format final results
        all_papers = []
        for plan in search_plans:
            all_papers.extend(plan["results"])

        return {
            "success": True,
            "results": {
                "papers": all_papers,
                "total_papers": total_papers,
                "plan_summaries": _create_plan_summaries(search_plans),
                "overall_summary": overall_summary,
                "search_metadata": {
                    "user_intent": user_intent,
                    "total_plans": len(search_plans),
                    "iterations": current_iteration,
                    "is_complete": is_complete,
                    "avg_quality_score": avg_quality,
                },
            },
        }

    except Exception as e:
        # Top-level boundary: report the failure to the caller instead of raising.
        return {"success": False, "error": f"Multi-agent search failed: {str(e)}"}


def _coerce_year(value):
    """Best-effort conversion of a paper's ``year`` field to a number.

    Search tools report the year as an int, as a string such as ``"2021"`` or
    ``"2021-05-03"``, or not at all. Anything unusable becomes 0 so that it
    never counts as recent.
    """
    if isinstance(value, bool):
        return 0
    if isinstance(value, (int, float)):
        return value
    if isinstance(value, str):
        try:
            return int(value.strip()[:4])
        except ValueError:
            return 0
    return 0


def _calculate_quality_score(plan):
    """Calculate quality score for a search plan.

    The score is the sum of three parts: result count (0.1 to 0.4), presence
    of at least one paper from 2020 or later (0.3), and presence of at least
    one paper with an abstract (0.3).

    Args:
        plan (dict): Search plan; only ``plan["results"]`` is read.

    Returns:
        float: Score between 0.0 and 1.0; 0.0 when the plan has no results.
    """
    results = plan["results"]
    if not results:
        return 0.0

    result_count = len(results)
    # Only dict records can be inspected for year/abstract.
    records = [paper for paper in results if isinstance(paper, dict)]
    has_recent_papers = any(
        _coerce_year(paper.get("year")) >= 2020 for paper in records
    )
    has_abstracts = any(paper.get("abstract") for paper in records)

    score = 0.0

    # Result count score (0-0.4)
    if result_count >= 10:
        score += 0.4
    elif result_count >= 5:
        score += 0.3
    elif result_count >= 2:
        score += 0.2
    else:
        score += 0.1

    # Recent papers score (0-0.3)
    if has_recent_papers:
        score += 0.3

    # Abstract availability score (0-0.3)
    if has_abstracts:
        score += 0.3

    return min(score, 1.0)


def _format_plans_for_analysis(plans):
    """Format plans for quality analysis.

    Args:
        plans (list[dict]): Search plans with ``title``, ``quality_score``,
            ``results`` and ``status`` keys.

    Returns:
        str: One text block per plan (title, score to two decimals, result
        count, status), joined by newlines.
    """
    formatted = [
        (
            f"\nPlan: {plan['title']}\n"
            f"Quality Score: {plan['quality_score']:.2f}\n"
            f"Results Count: {len(plan['results'])}\n"
            f"Status: {plan['status']}\n"
        )
        for plan in plans
    ]
    return "\n".join(formatted)


def _apply_improvements(plans, improvement_data):
    """Apply improvements to existing plans and add new plans.

    Mutates ``plans`` in place.

    Args:
        plans (list[dict]): Current search plans.
        improvement_data (dict): May contain ``improvements`` (items with a
            ``plan_id`` and ``suggestions``) and ``new_plans`` (items with
            ``title``, ``description``, ``keywords`` and ``priority``).
    """
    # Apply improvements to existing plans
    for improvement in improvement_data.get("improvements", []):
        plan_id = improvement.get("plan_id")
        suggestions = improvement.get("suggestions") or []
        if isinstance(suggestions, str):
            # A bare string would otherwise be added one character at a time.
            suggestions = [suggestions]

        for plan in plans:
            if plan["plan_id"] == plan_id:
                # Add suggestions as new keywords, skipping ones already
                # present so the same search is not repeated.
                for suggestion in suggestions:
                    if suggestion not in plan["keywords"]:
                        plan["keywords"].append(suggestion)
                plan["status"] = "pending"  # Reset status for re-search
                break

    # Add new plans
    for new_plan_data in improvement_data.get("new_plans", []):
        new_plan = {
            "plan_id": f"plan_{len(plans) + 1}",
            "title": new_plan_data.get("title", ""),
            "description": new_plan_data.get("description", ""),
            "keywords": new_plan_data.get("keywords", []),
            "priority": new_plan_data.get("priority", 1),
            "status": "pending",
            "results": [],
            "summary": "",
            "quality_score": 0.0,
        }
        plans.append(new_plan)


def _format_plan_summaries(plans):
    """Format plan summaries for overall summary generation.

    Plans without a summary are skipped.

    Returns:
        str: One text block per summarized plan, joined by newlines.
    """
    summaries = [
        (
            f"\nPlan: {plan['title']}\n"
            f"Description: {plan['description']}\n"
            f"Quality Score: {plan['quality_score']:.2f}\n"
            f"Results: {len(plan['results'])} papers\n"
            f"Summary: {plan['summary']}\n"
        )
        for plan in plans
        if plan["summary"]
    ]
    return "\n".join(summaries)


def _create_plan_summaries(plans):
    """Create structured plan summaries for return.

    Plans without a summary are skipped.

    Returns:
        list[dict]: One dict per summarized plan with ``plan_title``,
        ``plan_description``, ``quality_score``, ``results_count`` and
        ``summary`` keys.
    """
    return [
        {
            "plan_title": plan["title"],
            "plan_description": plan["description"],
            "quality_score": plan["quality_score"],
            "results_count": len(plan["results"]),
            "summary": plan["summary"],
        }
        for plan in plans
        if plan["summary"]
    ]