Source code for tooluniverse.dailymed_tool
# dailymed_tool.py
import requests
from typing import Dict, Any, List
from .base_tool import BaseTool
from .tool_registry import register_tool
try:
from lxml import etree
LXML_AVAILABLE = True
except ImportError:
LXML_AVAILABLE = False
DAILYMED_BASE = "https://dailymed.nlm.nih.gov/dailymed/services/v2"
@register_tool("SearchSPLTool")
class SearchSPLTool(BaseTool):
"""
    Search the SPL list using one or more filter conditions
    (drug_name/ndc/rxcui/setid/published_date).
    Returns the raw DailyMed API JSON (metadata plus the data array).
"""
def __init__(self, tool_config):
super().__init__(tool_config)
self.endpoint = f"{DAILYMED_BASE}/spls.json"
def run(self, arguments):
# Extract possible filter conditions from arguments
params = {}
# Four common filter fields
if arguments.get("drug_name"):
params["drug_name"] = arguments["drug_name"]
if arguments.get("ndc"):
params["ndc"] = arguments["ndc"]
if arguments.get("rxcui"):
params["rxcui"] = arguments["rxcui"]
if arguments.get("setid"):
params["setid"] = arguments["setid"]
# Published date range filter
if arguments.get("published_date_gte"):
params["published_date[gte]"] = arguments["published_date_gte"]
if arguments.get("published_date_eq"):
params["published_date[eq]"] = arguments["published_date_eq"]
# Pagination parameters
params["pagesize"] = arguments.get("pagesize", 100)
params["page"] = arguments.get("page", 1)
        # With no filters, only pagination applies and the query returns all
        # SPLs; be mindful of the response volume.
try:
resp = requests.get(self.endpoint, params=params, timeout=10)
except Exception as e:
return {"error": f"Failed to request DailyMed search_spls: {str(e)}"}
if resp.status_code != 200:
return {
"error": f"DailyMed API access failed, HTTP {resp.status_code}",
"detail": resp.text,
}
try:
result = resp.json()
except ValueError:
return {
"error": "Unable to parse DailyMed returned JSON.",
"content": resp.text,
}
# Return original JSON, including metadata + data
return result
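# Usage sketch for SearchSPLTool (illustrative only; the tool_config contents
# and the "aspirin" query are assumptions, not part of this module):
#   tool = SearchSPLTool({"name": "search_spl"})
#   result = tool.run({"drug_name": "aspirin", "pagesize": 10})
#   # result -> {"metadata": {...}, "data": [...]} straight from DailyMed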
@register_tool("GetSPLBySetIDTool")
class GetSPLBySetIDTool(BaseTool):
"""
    Retrieve the complete SPL label for a given SPL Set ID. DailyMed serves a
    single SPL as XML only, so the content is returned in XML format.
"""
def __init__(self, tool_config):
super().__init__(tool_config)
        # The format suffix is templated, but only "xml" is accepted (see run())
self.endpoint_template = f"{DAILYMED_BASE}/spls/{{setid}}.{{fmt}}"
def run(self, arguments):
setid = arguments.get("setid")
fmt = arguments.get("format", "xml")
# DailyMed single SPL API only supports XML format
if fmt not in ("xml",):
return {
"error": "DailyMed single SPL API only supports 'xml' format, JSON is not supported."
}
url = self.endpoint_template.format(setid=setid, fmt=fmt)
try:
resp = requests.get(url, timeout=10)
except Exception as e:
return {"error": f"Failed to request DailyMed get_spl_by_setid: {str(e)}"}
if resp.status_code == 404:
return {"error": f"SPL label not found for Set ID={setid}."}
elif resp.status_code == 415:
return {
"error": f"DailyMed API does not support requested format. Set ID={setid} only supports XML format."
}
elif resp.status_code != 200:
return {
"error": f"DailyMed API access failed, HTTP {resp.status_code}",
"detail": resp.text,
}
# Return XML content
return {"xml": resp.text}
@register_tool("DailyMedSPLParserTool")
class DailyMedSPLParserTool(BaseTool):
"""
Parse DailyMed SPL XML into structured data (adverse reactions, dosing, contraindications, interactions, PK).
"""
def __init__(self, tool_config):
super().__init__(tool_config)
self.endpoint_template = f"{DAILYMED_BASE}/spls/{{setid}}.xml"
# XML namespaces used in SPL documents
self.ns = {"hl7": "urn:hl7-org:v3"}
def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Route to parser based on operation."""
if not LXML_AVAILABLE:
return {
"status": "error",
"error": "lxml not available. Install with: pip install lxml",
}
operation = arguments.get("operation")
setid = arguments.get("setid")
if not setid:
return {"status": "error", "error": "Missing required parameter: setid"}
if not operation:
return {"status": "error", "error": "Missing required parameter: operation"}
# Fetch SPL XML
xml_result = self._fetch_spl_xml(setid)
if xml_result.get("status") == "error":
return xml_result
xml_content = xml_result.get("xml")
if not xml_content:
return {"status": "error", "error": "No XML content returned"}
# Parse XML
try:
root = etree.fromstring(xml_content.encode("utf-8"))
except Exception as e:
return {"status": "error", "error": f"Failed to parse XML: {str(e)}"}
# Route to appropriate parser
if operation == "parse_adverse_reactions":
operation_result = self._parse_adverse_reactions(root)
elif operation == "parse_dosing":
operation_result = self._parse_dosing(root)
elif operation == "parse_contraindications":
operation_result = self._parse_contraindications(root)
elif operation == "parse_drug_interactions":
operation_result = self._parse_drug_interactions(root)
elif operation == "parse_clinical_pharmacology":
operation_result = self._parse_clinical_pharmacology(root)
else:
return {"status": "error", "error": f"Unknown operation: {operation}"}
return self._with_data_payload(operation_result)
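    # The five operations map onto SPL section codes handled by the parsers
    # below: 34084-4 adverse reactions, 34068-7 dosage and administration,
    # 34070-3 contraindications, 34073-7 drug interactions, and
    # 34090-1 clinical pharmacology.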
def _with_data_payload(self, result: Dict[str, Any]) -> Dict[str, Any]:
"""Ensure successful operation responses include a standardized data wrapper."""
if not isinstance(result, dict):
return {"status": "success", "data": {"value": result}, "value": result}
if result.get("status") != "success":
return result
if "data" in result:
return result
payload = {key: value for key, value in result.items() if key != "status"}
wrapped_result = {"status": "success", "data": payload}
wrapped_result.update(payload)
return wrapped_result
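    # Illustrative wrapping behavior (assumed example values):
    #   {"status": "success", "count": 2}
    #   -> {"status": "success", "data": {"count": 2}, "count": 2}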
def _fetch_spl_xml(self, setid: str) -> Dict[str, Any]:
"""Fetch SPL XML from DailyMed API."""
url = self.endpoint_template.format(setid=setid)
try:
resp = requests.get(url, timeout=30)
except Exception as e:
return {"status": "error", "error": f"Failed to fetch SPL: {str(e)}"}
if resp.status_code == 404:
return {"status": "error", "error": f"SPL not found for setid={setid}"}
elif resp.status_code != 200:
return {
"status": "error",
"error": f"HTTP {resp.status_code}: {resp.text[:200]}",
}
return {"status": "success", "xml": resp.text}
def _parse_adverse_reactions(self, root) -> Dict[str, Any]:
"""Parse adverse reactions section into structured table."""
try:
            # Find the adverse reactions section (LOINC code 34084-4)
sections = root.xpath(
"//hl7:section[hl7:code[@code='34084-4']]", namespaces=self.ns
)
if not sections:
return {
"status": "success",
"adverse_reactions": [],
"note": "No adverse reactions section found",
}
adverse_reactions = []
for section in sections:
# Extract text content
text_elements = section.xpath(".//hl7:text", namespaces=self.ns)
for text_el in text_elements:
# Look for tables
tables = text_el.xpath(".//hl7:table", namespaces=self.ns)
for table in tables:
table_data = self._extract_table_data(table)
if table_data:
adverse_reactions.extend(table_data)
# If no tables, extract paragraph text
if not tables:
paragraphs = text_el.xpath(
".//hl7:paragraph", namespaces=self.ns
)
for para in paragraphs:
text_content = "".join(para.itertext()).strip()
if text_content and len(text_content) > 10:
adverse_reactions.append(
{"type": "text", "content": text_content}
)
return {
"status": "success",
"adverse_reactions": adverse_reactions,
"count": len(adverse_reactions),
}
except Exception as e:
return {
"status": "error",
"error": f"Failed to parse adverse reactions: {str(e)}",
}
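    # Entry shapes produced above: table rows arrive as
    # {"type": "table_row", "data": ...} via _extract_table_data, and free
    # text as {"type": "text", "content": "..."}.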
def _parse_dosing(self, root) -> Dict[str, Any]:
"""Parse dosage and administration section."""
try:
            # Find the dosage and administration section (LOINC code 34068-7)
sections = root.xpath(
"//hl7:section[hl7:code[@code='34068-7']]", namespaces=self.ns
)
if not sections:
return {
"status": "success",
"dosing_info": [],
"note": "No dosing section found",
}
dosing_info = []
for section in sections:
text_elements = section.xpath(".//hl7:text", namespaces=self.ns)
for text_el in text_elements:
# Extract tables
tables = text_el.xpath(".//hl7:table", namespaces=self.ns)
for table in tables:
table_data = self._extract_table_data(table)
if table_data:
dosing_info.extend(table_data)
# Extract paragraphs
paragraphs = text_el.xpath(".//hl7:paragraph", namespaces=self.ns)
for para in paragraphs:
text_content = "".join(para.itertext()).strip()
if text_content and len(text_content) > 10:
dosing_info.append(
{"type": "dosing_text", "content": text_content}
)
return {
"status": "success",
"dosing_info": dosing_info,
"count": len(dosing_info),
}
except Exception as e:
return {"status": "error", "error": f"Failed to parse dosing: {str(e)}"}
def _parse_contraindications(self, root) -> Dict[str, Any]:
"""Parse contraindications section."""
try:
            # Find the contraindications section (LOINC code 34070-3)
sections = root.xpath(
"//hl7:section[hl7:code[@code='34070-3']]", namespaces=self.ns
)
if not sections:
return {
"status": "success",
"contraindications": [],
"note": "No contraindications section found",
}
contraindications = []
for section in sections:
text_elements = section.xpath(".//hl7:text", namespaces=self.ns)
for text_el in text_elements:
# Extract lists
list_items = text_el.xpath(".//hl7:item", namespaces=self.ns)
for item in list_items:
text_content = "".join(item.itertext()).strip()
if text_content and len(text_content) > 5:
contraindications.append(
{
"type": "contraindication",
"description": text_content,
}
)
# Extract paragraphs if no list items
if not list_items:
paragraphs = text_el.xpath(
".//hl7:paragraph", namespaces=self.ns
)
for para in paragraphs:
text_content = "".join(para.itertext()).strip()
if text_content and len(text_content) > 10:
contraindications.append(
{
"type": "contraindication",
"description": text_content,
}
)
return {
"status": "success",
"contraindications": contraindications,
"count": len(contraindications),
}
except Exception as e:
return {
"status": "error",
"error": f"Failed to parse contraindications: {str(e)}",
}
def _parse_drug_interactions(self, root) -> Dict[str, Any]:
"""Parse drug interactions section."""
try:
            # Find the drug interactions section (LOINC code 34073-7)
sections = root.xpath(
"//hl7:section[hl7:code[@code='34073-7']]", namespaces=self.ns
)
if not sections:
return {
"status": "success",
"interactions": [],
"note": "No drug interactions section found",
}
interactions = []
for section in sections:
text_elements = section.xpath(".//hl7:text", namespaces=self.ns)
for text_el in text_elements:
# Extract tables
tables = text_el.xpath(".//hl7:table", namespaces=self.ns)
for table in tables:
table_data = self._extract_table_data(table)
if table_data:
interactions.extend(table_data)
# Extract paragraphs
paragraphs = text_el.xpath(".//hl7:paragraph", namespaces=self.ns)
for para in paragraphs:
text_content = "".join(para.itertext()).strip()
if text_content and len(text_content) > 10:
interactions.append(
{"type": "interaction_text", "content": text_content}
)
return {
"status": "success",
"interactions": interactions,
"count": len(interactions),
}
except Exception as e:
return {
"status": "error",
"error": f"Failed to parse drug interactions: {str(e)}",
}
def _parse_clinical_pharmacology(self, root) -> Dict[str, Any]:
"""Parse clinical pharmacology section."""
try:
            # Find the clinical pharmacology section (LOINC code 34090-1)
sections = root.xpath(
"//hl7:section[hl7:code[@code='34090-1']]", namespaces=self.ns
)
if not sections:
return {
"status": "success",
"pharmacology": [],
"note": "No clinical pharmacology section found",
}
pharmacology = []
for section in sections:
text_elements = section.xpath(".//hl7:text", namespaces=self.ns)
for text_el in text_elements:
# Extract tables
tables = text_el.xpath(".//hl7:table", namespaces=self.ns)
for table in tables:
table_data = self._extract_table_data(table)
if table_data:
pharmacology.extend(table_data)
# Extract paragraphs
paragraphs = text_el.xpath(".//hl7:paragraph", namespaces=self.ns)
for para in paragraphs:
text_content = "".join(para.itertext()).strip()
if text_content and len(text_content) > 10:
pharmacology.append(
{"type": "pharmacology_text", "content": text_content}
)
return {
"status": "success",
"pharmacology": pharmacology,
"count": len(pharmacology),
}
except Exception as e:
return {
"status": "error",
"error": f"Failed to parse clinical pharmacology: {str(e)}",
}
def _extract_table_data(self, table_element) -> List[Dict[str, Any]]:
"""Extract structured data from table element."""
try:
rows_data = []
# Get table headers
headers = []
thead = table_element.xpath(".//hl7:thead", namespaces=self.ns)
if thead:
header_cells = thead[0].xpath(".//hl7:th", namespaces=self.ns)
headers = ["".join(cell.itertext()).strip() for cell in header_cells]
# Get table rows
tbody = table_element.xpath(".//hl7:tbody", namespaces=self.ns)
if tbody:
rows = tbody[0].xpath(".//hl7:tr", namespaces=self.ns)
for row in rows:
cells = row.xpath(".//hl7:td", namespaces=self.ns)
cell_data = ["".join(cell.itertext()).strip() for cell in cells]
if cell_data:
# Create dict if we have headers
if headers and len(headers) == len(cell_data):
row_dict = {
"type": "table_row",
"data": dict(zip(headers, cell_data)),
}
else:
row_dict = {"type": "table_row", "data": cell_data}
rows_data.append(row_dict)
return rows_data
except Exception:
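            # A malformed table should not abort the whole parse; skip it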
return []
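# Usage sketch for DailyMedSPLParserTool (illustrative only; the set ID is a
# placeholder assumption and lxml must be installed):
#   parser = DailyMedSPLParserTool({"name": "spl_parser"})
#   out = parser.run({"setid": "<spl-set-id>", "operation": "parse_dosing"})
#   if out.get("status") == "success":
#       for entry in out["data"]["dosing_info"]:
#           print(entry)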