Source code for tooluniverse.rxnorm_extended_tool
"""
RxNorm Extended Tool
Extends the basic RxNorm drug name tool with additional endpoints from the
U.S. National Library of Medicine (NLM) RxNorm API:
- rxnorm_get_drug_info : comprehensive properties for a drug by name or RXCUI
- rxnorm_get_related_drugs: branded and generic products for an ingredient RXCUI
- rxnorm_find_rxcui : resolve a drug name to its RXCUI(s)
API base: https://rxnav.nlm.nih.gov/REST
No authentication required.
"""
import requests
from typing import Dict, Any
from .base_tool import BaseTool
from .tool_registry import register_tool
RXNORM_BASE = "https://rxnav.nlm.nih.gov/REST"
# Term-type (tty) labels used in RxNorm
TTY_LABELS = {
"IN": "Ingredient (generic)",
"PIN": "Precise Ingredient",
"BN": "Brand Name",
"SCD": "Semantic Clinical Drug (generic product)",
"SBD": "Semantic Branded Drug",
"GPCK": "Generic Pack",
"BPCK": "Branded Pack",
"SCDF": "Semantic Clinical Drug Form",
"SBDF": "Semantic Branded Drug Form",
"SCDC": "Semantic Drug Component",
"MIN": "Multiple Ingredients",
"DF": "Dose Form",
}
[docs]
@register_tool("RxNormExtendedTool")
class RxNormExtendedTool(BaseTool):
"""
Extended RxNorm tools for drug information retrieval.
Supports three operations:
- find_rxcui: Resolve a drug name to its RXCUI identifier(s)
- get_drug_info: Fetch full properties (name, TTY, synonym) by RXCUI or name
- get_related_drugs: List all branded and generic clinical drug products
that share an active ingredient
"""
[docs]
def __init__(self, tool_config: Dict[str, Any]):
super().__init__(tool_config)
self.timeout = tool_config.get("timeout", 30)
self.operation = tool_config.get("fields", {}).get("operation", "find_rxcui")
[docs]
def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
op = self.operation
if op == "find_rxcui":
return self._find_rxcui(arguments)
elif op == "get_drug_info":
return self._get_drug_info(arguments)
elif op == "get_related_drugs":
return self._get_related_drugs(arguments)
elif op == "get_ndc_status_history":
return self._get_ndc_status_history(arguments)
elif op == "get_ndc_properties":
return self._get_ndc_properties(arguments)
return {"status": "error", "error": f"Unknown operation: {op}"}
# ------------------------------------------------------------------
# helpers
# ------------------------------------------------------------------
[docs]
def _resolve_rxcui(self, arguments: Dict[str, Any]):
"""Return (rxcui_str, error_dict_or_None)."""
rxcui = arguments.get("rxcui")
drug_name = arguments.get("drug_name") or arguments.get("name")
if rxcui:
return str(rxcui).strip(), None
if drug_name:
url = f"{RXNORM_BASE}/rxcui.json"
try:
resp = requests.get(
url, params={"name": drug_name.strip()}, timeout=self.timeout
)
resp.raise_for_status()
ids = resp.json().get("idGroup", {}).get("rxnormId", [])
if not ids:
return None, {
"status": "error",
"error": f"No RXCUI found for drug name: {drug_name!r}",
}
return str(ids[0]), None
except requests.exceptions.RequestException as e:
return None, {
"status": "error",
"error": f"RxNorm API request failed: {e}",
}
return None, {
"status": "error",
"error": "Provide either 'rxcui' or 'drug_name'.",
}
# ------------------------------------------------------------------
# operations
# ------------------------------------------------------------------
[docs]
def _find_rxcui(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Resolve a drug name to one or more RXCUIs."""
drug_name = arguments.get("drug_name") or arguments.get("name")
if not drug_name or not str(drug_name).strip():
return {"status": "error", "error": "drug_name is required"}
url = f"{RXNORM_BASE}/rxcui.json"
try:
resp = requests.get(
url,
params={"name": drug_name.strip(), "search": 2},
timeout=self.timeout,
)
resp.raise_for_status()
data = resp.json()
except requests.exceptions.RequestException as e:
return {"status": "error", "error": f"RxNorm API request failed: {e}"}
except Exception as e:
return {"status": "error", "error": f"Failed to parse response: {e}"}
rxcuis = data.get("idGroup", {}).get("rxnormId", [])
if not rxcuis:
return {
"status": "success",
"data": {"drug_name": drug_name, "rxcuis": [], "found": False},
"metadata": {
"note": "No RXCUI found. Try a simpler drug name (generic, no dosage)."
},
}
return {
"status": "success",
"data": {
"drug_name": drug_name,
"rxcuis": rxcuis,
"primary_rxcui": rxcuis[0],
"found": True,
},
"metadata": {"source": "NLM RxNorm API"},
}
[docs]
def _get_drug_info(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Fetch comprehensive drug properties by RXCUI or drug name."""
rxcui, err = self._resolve_rxcui(arguments)
if err:
return err
url = f"{RXNORM_BASE}/rxcui/{rxcui}/properties.json"
try:
resp = requests.get(url, timeout=self.timeout)
resp.raise_for_status()
props = resp.json().get("properties", {})
except requests.exceptions.RequestException as e:
return {"status": "error", "error": f"RxNorm API request failed: {e}"}
except Exception as e:
return {"status": "error", "error": f"Failed to parse response: {e}"}
if not props:
return {
"status": "error",
"error": f"No properties found for RXCUI {rxcui}",
}
tty = props.get("tty", "")
tty_label = TTY_LABELS.get(tty, tty)
return {
"status": "success",
"data": {
"rxcui": props.get("rxcui", rxcui),
"name": props.get("name"),
"synonym": props.get("synonym") or None,
"term_type": tty,
"term_type_label": tty_label,
"language": props.get("language"),
},
"metadata": {"source": "NLM RxNorm API"},
}
[docs]
def _get_ndc_status_history(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Return active/obsolete status plus full RxCUI remapping timeline for an NDC."""
ndc = arguments.get("ndc")
if not ndc or not str(ndc).strip():
return {"status": "error", "error": "ndc is required"}
url = f"{RXNORM_BASE}/ndcstatus.json"
try:
resp = requests.get(
url, params={"ndc": str(ndc).strip()}, timeout=self.timeout
)
resp.raise_for_status()
data = resp.json()
except requests.exceptions.RequestException as e:
return {"status": "error", "error": f"RxNorm API request failed: {e}"}
except Exception as e:
return {"status": "error", "error": f"Failed to parse response: {e}"}
status_block = data.get("ndcStatus") or {}
if not status_block or not status_block.get("ndc11"):
return {
"status": "success",
"data": {
"ndc": str(ndc).strip(),
"found": False,
"ndc_status": None,
"ndc_history": [],
},
"metadata": {
"source": "NLM RxNorm API",
"note": "No NDC status found. Verify the NDC (try 11-digit form).",
},
}
history_raw = status_block.get("ndcHistory") or []
history = [
{
"active_rxcui": h.get("activeRxcui"),
"original_rxcui": h.get("originalRxcui"),
"start_date": h.get("startDate"),
"end_date": h.get("endDate"),
}
for h in history_raw
if isinstance(h, dict)
]
source_list = (status_block.get("sourceList") or {}).get("sourceName") or []
return {
"status": "success",
"data": {
"ndc": str(ndc).strip(),
"found": True,
"ndc11": status_block.get("ndc11"),
"status": status_block.get("status"),
"active": status_block.get("active"),
"rxcui": status_block.get("rxcui"),
"concept_name": status_block.get("conceptName"),
"concept_status": status_block.get("conceptStatus"),
"sources": source_list,
"ndc_history": history,
},
"metadata": {
"source": "NLM RxNorm API",
"total_history_periods": len(history),
},
}
[docs]
def _get_ndc_properties(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Return product/package metadata (imprint, color, labeler, SPL setid) for an NDC."""
ndc = arguments.get("ndc")
if not ndc or not str(ndc).strip():
return {"status": "error", "error": "ndc is required"}
url = f"{RXNORM_BASE}/ndcproperties.json"
try:
resp = requests.get(
url, params={"id": str(ndc).strip()}, timeout=self.timeout
)
resp.raise_for_status()
data = resp.json()
except requests.exceptions.RequestException as e:
return {"status": "error", "error": f"RxNorm API request failed: {e}"}
except Exception as e:
return {"status": "error", "error": f"Failed to parse response: {e}"}
prop_items = (data.get("ndcPropertyList") or {}).get("ndcProperty") or []
if not isinstance(prop_items, list):
prop_items = [prop_items]
if not prop_items:
return {
"status": "success",
"data": {"ndc": str(ndc).strip(), "found": False, "products": []},
"metadata": {
"source": "NLM RxNorm API",
"note": "No NDC properties found. Verify the NDC.",
},
}
products = []
for item in prop_items:
if not isinstance(item, dict):
continue
concepts = (item.get("propertyConceptList") or {}).get(
"propertyConcept"
) or []
if not isinstance(concepts, list):
concepts = [concepts]
props = {
c.get("propName"): c.get("propValue")
for c in concepts
if isinstance(c, dict) and c.get("propName")
}
packaging = (item.get("packagingList") or {}).get("packaging") or []
if not isinstance(packaging, list):
packaging = [packaging]
products.append(
{
"ndc_item": item.get("ndcItem"),
"ndc10": item.get("ndc10"),
"ndc9": item.get("ndc9"),
"rxcui": item.get("rxcui"),
"spl_set_id": item.get("splSetIdItem"),
"imprint_code": props.get("IMPRINT_CODE"),
"color": props.get("COLORTEXT"),
"shape": props.get("SHAPE"),
"labeler": props.get("LABELER"),
"marketing_category": props.get("MARKETING_CATEGORY"),
"anda": props.get("ANDA"),
"nda": props.get("NDA"),
"packaging": packaging,
"properties": props,
}
)
return {
"status": "success",
"data": {
"ndc": str(ndc).strip(),
"found": True,
"products": products,
},
"metadata": {
"source": "NLM RxNorm API",
"total_products": len(products),
},
}