Source code for tooluniverse.medlineplus_tool
# medlineplus_tool.py
import requests
import xmltodict
from typing import Optional, Dict, Any
import re
import json
from .base_tool import BaseTool
from .tool_registry import register_tool
[docs]
@register_tool("MedlinePlusRESTTool")
class MedlinePlusRESTTool(BaseTool):
"""
MedlinePlus REST API tool class.
Supports health topic search, code lookup, genetics information retrieval, etc.
"""
[docs]
def __init__(self, tool_config):
super().__init__(tool_config)
self.timeout = 10
self.endpoint_template = tool_config["fields"]["endpoint"]
self.param_schema = tool_config["parameter"]["properties"]
[docs]
def _build_url(self, arguments: dict) -> str:
"""Build complete URL"""
url_path = self.endpoint_template
placeholders = re.findall(r"\{([^{}]+)\}", url_path)
for ph in placeholders:
if ph not in arguments:
return {"error": f"Missing required parameter '{ph}'"}
url_path = url_path.replace(f"{{{ph}}}", str(arguments[ph]))
return url_path
[docs]
def _extract_text_content(self, text_item: dict) -> str:
"""Extract content from text item"""
if not isinstance(text_item, dict):
return ""
text = text_item.get("text", {})
if not isinstance(text, dict):
return ""
html = text.get("html", "")
if isinstance(html, dict) and "html:p" in html:
paragraphs = html["html:p"]
if isinstance(paragraphs, list):
return "\n".join(
[
p.get("#text", "")
for p in paragraphs
if isinstance(p, dict) and "#text" in p
]
)
return html.replace("<p>", "").replace("</p>", "\n")
[docs]
def _format_response(self, response: Any, tool_name: str) -> Dict[str, Any]:
"""Format response content"""
if not isinstance(response, dict):
return {"raw_response": response}
# Extract text content
def get_text_content(data, role):
text_list = data.get("text-list", [])
if isinstance(text_list, dict):
text_list = [text_list]
for item in text_list:
if isinstance(item, dict) and "text" in item:
text = item["text"]
if text.get("text-role") == role:
return self._extract_text_content(item)
return ""
# Extract list items
def get_list_items(
data, list_key, item_key, name_key="name", url_key="ghr-page"
):
items = []
list_data = data.get(list_key, {})
if isinstance(list_data, dict):
items = list_data.get(item_key, [])
if not isinstance(items, list):
items = [items]
for item in items:
if isinstance(item, dict):
name = item.get(name_key, "")
url = item.get(url_key, "")
items.append(f"{name} ({url})" if url else name)
return items
# Format response based on tool type
if tool_name == "MedlinePlus_search_topics_by_keyword":
# First print raw response for debugging
print("\n🔍 Raw response structure:")
print(
json.dumps(response, indent=2, ensure_ascii=False)[:2000] + "..."
if len(json.dumps(response, indent=2, ensure_ascii=False)) > 2000
else json.dumps(response, indent=2, ensure_ascii=False)
)
# Extract topic information from XML structure
nlm_result = response.get("nlmSearchResult", {})
if not nlm_result:
return {"error": "nlmSearchResult node not found"}
# Get document list
document_list = nlm_result.get("list", {}).get("document", [])
if not document_list:
return {"error": "document list not found"}
# Ensure document_list is a list
if isinstance(document_list, dict):
document_list = [document_list]
formatted_topics = []
for doc in document_list:
# Get document basic info
doc_url = doc.get("@url", "")
doc_rank = doc.get("@rank", "")
# Get content node
content = doc.get("content", {})
if isinstance(content, dict):
health_topic = content.get("health-topic", {})
if health_topic:
# Extract health topic information
title = health_topic.get("@title", "")
meta_desc = health_topic.get("@meta-desc", "")
topic_url = health_topic.get("@url", doc_url)
language = health_topic.get("@language", "")
# Extract aliases
also_called = health_topic.get("also-called", [])
if isinstance(also_called, str):
also_called = [also_called]
elif isinstance(also_called, dict):
also_called = [also_called.get("#text", str(also_called))]
elif not isinstance(also_called, list):
also_called = []
# Extract summary
full_summary = health_topic.get("full-summary", "")
if isinstance(full_summary, dict):
full_summary = str(full_summary)
# Extract group information
groups = health_topic.get("group", [])
if isinstance(groups, str):
groups = [groups]
elif isinstance(groups, dict):
groups = [groups.get("#text", str(groups))]
elif not isinstance(groups, list):
groups = []
formatted_topics.append(
{
"title": title,
"meta_desc": meta_desc,
"url": topic_url,
"language": language,
"rank": doc_rank,
"also_called": also_called,
"summary": (
full_summary[:500] + "..."
if len(str(full_summary)) > 500
else full_summary
),
"groups": groups,
}
)
return (
{"topics": formatted_topics}
if formatted_topics
else {"error": "Failed to parse health topic information"}
)
elif tool_name == "MedlinePlus_get_genetics_condition_by_name":
return {
"name": response.get("name", ""),
"description": get_text_content(response, "description"),
"genes": get_list_items(
response, "related-gene-list", "related-gene", "gene-symbol"
),
"synonyms": [
s.get("synonym", "") for s in response.get("synonym-list", [])
],
"ghr_page": response.get("ghr_page", ""),
}
elif tool_name == "MedlinePlus_get_genetics_gene_by_name":
gene_summary = response.get("gene-summary", {})
return {
"name": gene_summary.get("name", ""),
"function": get_text_content(gene_summary, "function"),
"health_conditions": get_list_items(
gene_summary,
"related-health-condition-list",
"related-health-condition",
),
"synonyms": gene_summary.get("synonym-list", {}).get("synonym", []),
"ghr_page": gene_summary.get("ghr-page", ""),
}
elif tool_name == "MedlinePlus_connect_lookup_by_code":
responses = response.get("knowledgeResponse", [])
return (
{
"responses": [
{
"title": r.get("title", ""),
"summary": r.get("summary", ""),
"url": r.get("url", ""),
}
for r in responses
]
}
if responses
else {"error": "No matching code information found"}
)
elif tool_name == "MedlinePlus_get_genetics_index":
topics = response.get("genetics_home_reference_topic_list", {}).get(
"topic", []
)
return (
{
"topics": [
{"name": t.get("name", ""), "url": t.get("url", "")}
for t in topics
]
}
if topics
else {"error": "No genetics topics found"}
)
return {"raw_response": response}
[docs]
def run(self, arguments: dict):
"""Execute tool call"""
# Validate required parameters
for key, prop in self.param_schema.items():
if prop.get("required", False) and key not in arguments:
return {"error": f"Parameter '{key}' is required."}
# Build URL
url = self._build_url(arguments)
if isinstance(url, dict) and "error" in url:
return url
# Print complete URL
print(f"\n🔗 Request URL: {url}")
# Make request
try:
resp = requests.get(url, timeout=self.timeout)
if resp.status_code != 200:
return {
"error": f"MedlinePlus returned non-200 status code: {resp.status_code}",
"detail": resp.text,
}
print(f"\n📊 Response status: {resp.status_code}")
print(f"📏 Response length: {len(resp.text)} characters")
print(f"🔤 First 500 characters of response: {resp.text[:500]}...")
# Improved parsing logic
tool_name = self.tool_config["name"]
response_text = resp.text.strip()
# Decide parsing method based on tool type and content format
if url.endswith(".json") or (arguments.get("format") == "json"):
# JSON format
response = resp.json()
print("📋 Parsed as: JSON")
elif (
url.endswith(".xml")
or response_text.startswith("<?xml")
or (arguments.get("format") == "xml")
):
# XML format
response = xmltodict.parse(resp.text)
print("📋 Parsed as: XML -> Dictionary")
elif tool_name == "MedlinePlus_search_topics_by_keyword":
# Search tool defaults to XML
response = xmltodict.parse(resp.text)
print("📋 Parsed as: XML -> Dictionary (Search tool)")
elif tool_name == "MedlinePlus_get_genetics_index":
# Genetics index defaults to XML
response = xmltodict.parse(resp.text)
print("📋 Parsed as: XML -> Dictionary (Genetics index)")
else:
# Other cases keep original text
response = resp.text
print("📋 Parsed as: Plain text")
print(f"🔍 Parsed data type: {type(response)}")
if isinstance(response, dict):
print(f"🗝️ Top-level dictionary keys: {list(response.keys())}")
return self._format_response(response, tool_name)
except requests.RequestException as e:
return {"error": f"Failed to request MedlinePlus: {str(e)}"}
# Tool methods
[docs]
def search_topics_by_keyword(
self, term: str, db: str, rettype: str = "brief"
) -> Dict[str, Any]:
return self.run({"term": term, "db": db, "rettype": rettype})
[docs]
def connect_lookup_by_code(
self,
cs: str,
c: str,
dn: Optional[str] = None,
language: str = "en",
format: str = "json",
) -> Any:
args = {"cs": cs, "c": c, "language": language, "format": format}
if dn:
args["dn"] = dn
return self.run(args)
[docs]
def get_genetics_condition_by_name(
self, condition: str, format: str = "json"
) -> Any:
return self.run({"condition": condition, "format": format})
[docs]
def get_genetics_gene_by_name(self, gene: str, format: str = "json") -> Any:
return self.run({"gene": gene, "format": "json"})