Source code for tooluniverse.openfda_tool
import requests
from .base_tool import BaseTool
from .tool_registry import register_tool
import copy
import re
import os
# Cache for GraphQL query to avoid repeated string operations
_OPENTARGETS_DRUG_NAMES_QUERY = None
_OPENTARGETS_ENDPOINT = "https://api.platform.opentargets.org/api/v4/graphql"
[docs]
def _get_drug_names_query():
"""Get the GraphQL query for drug names (cached)"""
global _OPENTARGETS_DRUG_NAMES_QUERY
if _OPENTARGETS_DRUG_NAMES_QUERY is None:
_OPENTARGETS_DRUG_NAMES_QUERY = (
"\n query drugNames($chemblId: String!) {\n "
"drug(chemblId: $chemblId) {\n id\n name\n " # noqa: E501
"tradeNames\n synonyms\n }\n }\n "
)
return _OPENTARGETS_DRUG_NAMES_QUERY
[docs]
def _execute_opentargets_query(chembl_id):
"""Directly execute OpenTargets GraphQL query (most efficient)"""
try:
from tooluniverse.graphql_tool import execute_query
query = _get_drug_names_query()
variables = {"chemblId": chembl_id}
return execute_query(
endpoint_url=_OPENTARGETS_ENDPOINT, query=query, variables=variables
)
except ImportError:
# Fallback if graphql_tool not available
import requests
query = _get_drug_names_query()
variables = {"chemblId": chembl_id}
response = requests.post(
_OPENTARGETS_ENDPOINT, json={"query": query, "variables": variables}
)
try:
result = response.json()
if "errors" in result:
return None
return result
except Exception:
return None
[docs]
def check_keys_present(api_capabilities_dict, keys):
for key in keys:
levels = key.split(".")
current_dict = api_capabilities_dict
key_present = True
for level in levels:
if level not in current_dict:
print(f"Key '{level}' not found in dictionary.")
key_present = False
break
if "properties" in current_dict[level]:
current_dict = current_dict[level]["properties"]
else:
current_dict = current_dict[level]
return key_present
[docs]
def extract_nested_fields(records, fields, keywords=None):
"""
Recursively extracts nested fields from a list of dictionaries.
:param records: List of dictionaries from which to extract fields
:param fields: List of nested fields to extract, each specified with dot notation (e.g., 'openfda.brand_name')
:return: List of dictionaries containing only the specified fields
"""
extracted_records = []
for record in records:
extracted_record = {}
for field in fields:
keys = field.split(".")
# print("keys", keys)
value = record
try:
for key in keys:
value = value[key]
if key != "openfda" and key != "generic_name" and key != "brand_name":
if len(keywords) > 0:
# print("key words:", keywords)
# print(value)
# print(type(value))
value = extract_sentences_with_keywords(value, keywords)
extracted_record[field] = value
except KeyError:
extracted_record[field] = None
if any(extracted_record.values()):
extracted_records.append(extracted_record)
return extracted_records
[docs]
def map_properties_to_openfda_fields(arguments, search_fields):
"""
Maps the provided arguments to the corresponding openFDA fields based on the search_fields mapping.
:param arguments: The input arguments containing property names and values.
:param search_fields: The mapping of property names to openFDA fields.
:return: A dictionary with openFDA fields and corresponding values.
"""
mapped_arguments = {}
for key, value in list(arguments.items()):
if key in search_fields:
# print("key in search_fields:", key)
openfda_fields = search_fields[key]
if isinstance(openfda_fields, list):
for field in openfda_fields:
mapped_arguments[field] = value
else:
mapped_arguments[openfda_fields] = value
del arguments[key]
arguments["search_fields"] = mapped_arguments
return arguments
[docs]
def extract_sentences_with_keywords(text_list, keywords):
"""
Extracts sentences containing any of the specified keywords from the text.
Parameters
- text (str): The input text from which to extract sentences.
- keywords (list): A list of keywords to search for in the text.
Returns
- list: A list of sentences containing any of the keywords.
"""
sentences_with_keywords = []
for text in text_list:
# Compile a regular expression pattern for sentence splitting
sentence_pattern = re.compile(r"(?<=[.!?]) +")
# Split the text into sentences
sentences = sentence_pattern.split(text)
# Initialize a list to hold sentences with keywords
# Iterate through each sentence
for sentence in sentences:
# Check if any of the keywords are present in the sentence
if any(keyword.lower() in sentence.lower() for keyword in keywords):
# If a keyword is found, add the sentence to the list
sentences_with_keywords.append(sentence)
return "......".join(sentences_with_keywords)
[docs]
def search_openfda(
params=None,
endpoint_url=None,
api_key=None,
sort=None,
limit=5,
skip=None,
count=None,
exists=None,
return_fields=None,
exist_option="OR",
search_keyword_option="AND",
keywords_filter=True,
):
# Initialize params if not provided
if params is None:
params = {}
if return_fields == "ALL":
exists = None
# Initialize search fields and construct search query
search_fields = params.get("search_fields", {})
search_query = []
keywords_list = []
if search_fields:
for field, value in search_fields.items():
# Merge multiple continuous black spaces into one and use one '+'
if (
keywords_filter
and field != "openfda.brand_name"
and field != "openfda.generic_name"
):
keywords_list.extend(value.split())
if field == "openfda.generic_name":
value = value.upper() # all generic names are in uppercase
value = value.replace(" and ", " ") # remove 'and' in the search query
value = value.replace(" AND ", " ") # remove 'AND' in the search query
# Remove quotes to avoid query errors
value = value.replace('"', "")
value = value.replace("'", "")
value = " ".join(value.split())
if search_keyword_option == "AND":
search_query.append(f"{field}:({value.replace(' ', '+AND+')})")
elif search_keyword_option == "OR":
search_query.append(f"{field}:({value.replace(' ', '+')})")
else:
print("Invalid search_keyword_option. Please use 'AND' or 'OR'.")
del params["search_fields"]
if search_query:
params["search"] = "+".join(search_query)
params["search"] = "(" + params["search"] + ")"
# Validate the presence of at least one of search, count, or sort
if not (
params.get("search")
or params.get("count")
or params.get("sort")
or search_fields
):
return {
"error": "You must provide at least one of 'search', 'count', or 'sort' parameters."
}
# Set additional query parameters
params["limit"] = params.get("limit", limit)
params["sort"] = params.get("sort", sort)
params["skip"] = params.get("skip", skip)
params["count"] = params.get("count", count)
if exists is not None:
if isinstance(exists, str):
exists = [exists]
if "search" in params:
if exist_option == "AND":
params["search"] += (
"+AND+("
+ "+AND+".join([f"_exists_:{keyword}" for keyword in exists])
+ ")"
)
elif exist_option == "OR":
params["search"] += (
"+AND+("
+ "+".join([f"_exists_:{keyword}" for keyword in exists])
+ ")"
)
else:
if exist_option == "AND":
params["search"] = "+AND+".join(
[f"_exists_:{keyword}" for keyword in exists]
)
elif exist_option == "OR":
params["search"] = "+".join(
[f"_exists_:{keyword}" for keyword in exists]
)
# Ensure that at least one of the search fields exists
params["search"] += (
"+AND+("
+ "+".join([f"_exists_:{field}" for field in search_fields.keys()])
+ ")"
)
# params['search']+="+AND+_exists_:openfda"
# Construct full query with additional parameters
query = "&".join(
[f"{key}={value}" for key, value in params.items() if value is not None]
)
full_url = f"{endpoint_url}?{query}"
if api_key:
full_url += f"&api_key={api_key}"
print(full_url)
response = requests.get(full_url)
# Get the JSON response
response_data = response.json()
if "error" in response_data:
print("Invalid Query: ", response_data["error"])
return None
# Extract meta information
meta_info = response_data.get("meta", {})
meta_info = meta_info.get("results", {})
# Extract results and return only the specified return fields
results = response_data.get("results", [])
if return_fields == "ALL":
return {"meta": meta_info, "results": results}
# If count parameter is used, return results directly (count API format)
if params.get("count") or count:
return {"meta": meta_info, "results": results}
required_fields = list(search_fields.keys()) + return_fields
extracted_results = extract_nested_fields(results, required_fields, keywords_list)
return {"meta": meta_info, "results": extracted_results}
[docs]
@register_tool("FDATool")
class FDATool(BaseTool):
[docs]
def __init__(self, tool_config, endpoint_url, api_key=None):
super().__init__(tool_config)
fields = tool_config["fields"]
self.search_fields = fields.get("search_fields", {})
self.return_fields = fields.get("return_fields", [])
self.exists = fields.get("exists", None)
if self.exists is None:
self.exists = self.return_fields
self.endpoint_url = endpoint_url
self.api_key = api_key or os.getenv("FDA_API_KEY")
[docs]
def run(self, arguments):
arguments = copy.deepcopy(arguments)
# Set default limit to 100 if not provided
if "limit" not in arguments or arguments["limit"] is None:
arguments["limit"] = 100
mapped_arguments = map_properties_to_openfda_fields(
arguments, self.search_fields
)
return search_openfda(
mapped_arguments,
endpoint_url=self.endpoint_url,
api_key=self.api_key,
exists=self.exists,
return_fields=self.return_fields,
exist_option="OR",
)
[docs]
@register_tool("FDADrugLabel")
class FDADrugLabelTool(FDATool):
[docs]
def __init__(self, tool_config, api_key=None):
endpoint_url = "https://api.fda.gov/drug/label.json"
super().__init__(tool_config, endpoint_url, api_key)
[docs]
def _is_chembl_id(self, value):
"""Check if the value looks like a ChEMBL ID"""
if not isinstance(value, str):
return False
# Normalize to uppercase for consistent handling
return value.upper().startswith("CHEMBL")
[docs]
def _convert_id_to_drug_name(self, chembl_id):
"""Convert ChEMBL ID to drug name using OpenTargets API"""
try:
# Directly call GraphQL API (most efficient, no tool overhead)
result = _execute_opentargets_query(chembl_id)
if result and isinstance(result, dict):
# Extract drug name from result
drug = None
if "drug" in result:
drug = result["drug"]
elif "data" in result and "drug" in result["data"]:
drug = result["data"]["drug"]
if drug:
# Prefer generic name, fallback to name, then trade names
name = drug.get("name")
if name:
msg = f"Converted ChEMBL ID {chembl_id} to drug name: {name}"
print(msg)
return name
# Try trade names as fallback
trade_names = drug.get("tradeNames", [])
if trade_names:
msg = (
f"Converted ChEMBL ID {chembl_id} "
f"to trade name: {trade_names[0]}"
)
print(msg)
return trade_names[0]
# No drug name found - the compound may not be approved as a drug
msg = (
f"Warning: Could not convert ChEMBL ID {chembl_id} "
f"to drug name. This compound may not be approved as a drug "
f"or may not be available in the OpenTargets database."
)
print(msg)
return None
except Exception as e:
msg = f"Error converting ChEMBL ID {chembl_id} to drug name: {e}"
print(msg)
return None
[docs]
def run(self, arguments):
"""Override run to support ChEMBL ID conversion"""
arguments = copy.deepcopy(arguments)
# Check if drug_name parameter is a ChEMBL ID
drug_name = arguments.get("drug_name")
# Only process if drug_name is a non-empty string
if drug_name and isinstance(drug_name, str) and drug_name.strip():
# Strip whitespace before checking
drug_name = drug_name.strip()
if self._is_chembl_id(drug_name):
# Normalize ChEMBL ID to uppercase (OpenTargets API expects uppercase)
chembl_id = drug_name.upper()
# Convert ChEMBL ID to drug name
converted_name = self._convert_id_to_drug_name(chembl_id)
if converted_name:
arguments["drug_name"] = converted_name
else:
# If conversion fails, provide helpful error message
error_msg = (
f"Could not convert ChEMBL ID {drug_name} to drug name. "
f"This compound (ChEMBL ID: {drug_name}) may not be "
f"approved as a drug yet, or it may not be available "
f"in the OpenTargets database. Please provide a drug "
f"name directly if you know it, or check if this "
f"compound is actually approved as a pharmaceutical "
f"drug."
)
return {"error": error_msg}
else:
# Not a ChEMBL ID, use original value (strip whitespace)
arguments["drug_name"] = drug_name
# Call parent run method
return super().run(arguments)
[docs]
@register_tool("FDADrugLabelSearchTool")
class FDADrugLabelSearchTool(FDATool):
[docs]
def __init__(self, tool_config=None, api_key=None):
self.tool_config = {
"name": "FDADrugLabelSearch",
"description": "Retrieve information of a specific drug.",
"label": ["search", "drug"],
"type": "FDADrugLabelSearch",
"parameter": {
"type": "object",
"properties": {
"drug_name": {
"type": "string",
"description": "The name of the drug.",
"required": True,
},
"return_fields": {
"type": "array",
"items": {
"type": "string",
"enum": [
"ALL",
"abuse",
"accessories",
"active_ingredient",
"adverse_reactions",
"alarms",
"animal_pharmacology_and_or_toxicology",
"ask_doctor",
"ask_doctor_or_pharmacist",
"assembly_or_installation_instructions",
"boxed_warning",
"calibration_instructions",
"carcinogenesis_and_mutagenesis_and_impairment_of_fertility",
"cleaning",
"clinical_pharmacology",
"clinical_studies",
"compatible_accessories",
"components",
"contraindications",
"controlled_substance",
"dependence",
"description",
"diagram_of_device",
"disposal_and_waste_handling",
"do_not_use",
"dosage_and_administration",
"dosage_forms_and_strengths",
"drug_abuse_and_dependence",
"drug_and_or_laboratory_test_interactions",
"drug_interactions",
"effective_time",
"environmental_warning",
"food_safety_warning",
"general_precautions",
"geriatric_use",
"guaranteed_analysis_of_feed",
"health_care_provider_letter",
"health_claim",
"how_supplied",
"id",
"inactive_ingredient",
"indications_and_usage",
"information_for_owners_or_caregivers",
"information_for_patients",
"instructions_for_use",
"intended_use_of_the_device",
"keep_out_of_reach_of_children",
"labor_and_delivery",
"laboratory_tests",
"mechanism_of_action",
"microbiology",
"nonclinical_toxicology",
"nonteratogenic_effects",
"nursing_mothers",
"openfda",
"other_safety_information",
"overdosage",
"package_label_principal_display_panel",
"patient_medication_information",
"pediatric_use",
"pharmacodynamics",
"pharmacogenomics",
"pharmacokinetics",
"precautions",
"pregnancy",
"pregnancy_or_breast_feeding",
"purpose",
"questions",
"recent_major_changes",
"references",
"residue_warning",
"risks",
"route",
"safe_handling_warning",
"set_id",
"spl_indexing_data_elements",
"spl_medguide",
"spl_patient_package_insert",
"spl_product_data_elements",
"spl_unclassified_section",
"statement_of_identity",
"stop_use",
"storage_and_handling",
"summary_of_safety_and_effectiveness",
"teratogenic_effects",
"troubleshooting",
"use_in_specific_populations",
"user_safety_warnings",
"version",
"warnings",
"warnings_and_cautions",
"when_using",
"meta",
],
"description": "Searchable field.",
},
"description": "Fields to search within drug labels.",
"required": True,
},
"limit": {
"type": "integer",
"description": "The number of records to return.",
"required": False,
},
"skip": {
"type": "integer",
"description": "The number of records to skip.",
"required": False,
},
},
},
"fields": {
"search_fields": {
"drug_name": ["openfda.brand_name", "openfda.generic_name"]
},
},
}
endpoint_url = "https://api.fda.gov/drug/label.json"
super().__init__(self.tool_config, endpoint_url, api_key)
[docs]
def run(self, arguments):
arguments = copy.deepcopy(arguments)
mapped_arguments = map_properties_to_openfda_fields(
arguments, self.search_fields
)
return_fields = arguments["return_fields"]
del arguments["return_fields"]
return search_openfda(
mapped_arguments,
endpoint_url=self.endpoint_url,
api_key=self.api_key,
return_fields=return_fields,
exists=return_fields,
exist_option="OR",
)
[docs]
@register_tool("FDADrugLabelSearchIDTool")
class FDADrugLabelSearchIDTool(FDATool):
[docs]
def __init__(self, tool_config=None, api_key=None):
self.tool_config = {
"name": "FDADrugLabelSearchALLTool",
"description": "Retrieve any related information to the query.",
"label": ["search", "drug"],
"type": "FDADrugLabelSearch",
"parameter": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "key words need to be searched.",
"required": True,
},
"return_fields": {
"type": "array",
"items": {
"type": "string",
"enum": [
"ALL",
"abuse",
"accessories",
"active_ingredient",
"adverse_reactions",
"alarms",
"animal_pharmacology_and_or_toxicology",
"ask_doctor",
"ask_doctor_or_pharmacist",
"assembly_or_installation_instructions",
"boxed_warning",
"calibration_instructions",
"carcinogenesis_and_mutagenesis_and_impairment_of_fertility",
"cleaning",
"clinical_pharmacology",
"clinical_studies",
"compatible_accessories",
"components",
"contraindications",
"controlled_substance",
"dependence",
"description",
"diagram_of_device",
"disposal_and_waste_handling",
"do_not_use",
"dosage_and_administration",
"dosage_forms_and_strengths",
"drug_abuse_and_dependence",
"drug_and_or_laboratory_test_interactions",
"drug_interactions",
"effective_time",
"environmental_warning",
"food_safety_warning",
"general_precautions",
"geriatric_use",
"guaranteed_analysis_of_feed",
"health_care_provider_letter",
"health_claim",
"how_supplied",
"id",
"inactive_ingredient",
"indications_and_usage",
"information_for_owners_or_caregivers",
"information_for_patients",
"instructions_for_use",
"intended_use_of_the_device",
"keep_out_of_reach_of_children",
"labor_and_delivery",
"laboratory_tests",
"mechanism_of_action",
"microbiology",
"nonclinical_toxicology",
"nonteratogenic_effects",
"nursing_mothers",
"openfda",
"other_safety_information",
"overdosage",
"package_label_principal_display_panel",
"patient_medication_information",
"pediatric_use",
"pharmacodynamics",
"pharmacogenomics",
"pharmacokinetics",
"precautions",
"pregnancy",
"pregnancy_or_breast_feeding",
"purpose",
"questions",
"recent_major_changes",
"references",
"residue_warning",
"risks",
"route",
"safe_handling_warning",
"set_id",
"spl_indexing_data_elements",
"spl_medguide",
"spl_patient_package_insert",
"spl_product_data_elements",
"spl_unclassified_section",
"statement_of_identity",
"stop_use",
"storage_and_handling",
"summary_of_safety_and_effectiveness",
"teratogenic_effects",
"troubleshooting",
"use_in_specific_populations",
"user_safety_warnings",
"version",
"warnings",
"warnings_and_cautions",
"when_using",
"meta",
],
"description": "Searchable field.",
},
"description": "Fields to search within drug labels.",
"required": True,
},
"limit": {
"type": "integer",
"description": "The number of records to return.",
"required": False,
},
"skip": {
"type": "integer",
"description": "The number of records to skip.",
"required": False,
},
},
},
"fields": {
"search_fields": {"query": ["id"]},
},
}
endpoint_url = "https://api.fda.gov/drug/label.json"
super().__init__(self.tool_config, endpoint_url, api_key)
[docs]
def run(self, arguments):
arguments = copy.deepcopy(arguments)
mapped_arguments = map_properties_to_openfda_fields(
arguments, self.search_fields
)
return_fields = arguments["return_fields"]
del arguments["return_fields"]
return search_openfda(
mapped_arguments,
endpoint_url=self.endpoint_url,
api_key=self.api_key,
return_fields=return_fields,
exists=return_fields,
exist_option="OR",
)
[docs]
@register_tool("FDADrugLabelGetDrugGenericNameTool")
class FDADrugLabelGetDrugGenericNameTool(FDADrugLabelTool):
[docs]
def __init__(self, tool_config=None, api_key=None):
if tool_config is None:
tool_config = {
"name": "get_drug_generic_name",
"description": "Get the drug’s generic name based on the drug's generic or brand name.",
"parameter": {
"type": "object",
"properties": {
"drug_name": {
"type": "string",
"description": "The generic or brand name of the drug.",
"required": True,
}
},
},
"fields": {
"search_fields": {
"drug_name": ["openfda.brand_name", "openfda.generic_name"]
},
"return_fields": ["openfda.generic_name"],
},
"type": "FDADrugLabelGetDrugGenericNameTool",
"label": ["FDADrugLabel", "purpose", "FDA"],
}
from .data.fda_drugs_with_brand_generic_names_for_tool import drug_list
self.brand_to_generic = {
drug["brand_name"]: drug["generic_name"] for drug in drug_list
}
self.generic_to_brand = {
drug["generic_name"]: drug["brand_name"] for drug in drug_list
}
super().__init__(tool_config, api_key)
[docs]
def run(self, arguments):
drug_info = {}
drug_name = arguments.get("drug_name")
if "-" in drug_name:
drug_name = drug_name.split("-")[
0
] # to handle some drug names such as tarlatamab-dlle
if drug_name in self.brand_to_generic:
drug_info["openfda.generic_name"] = self.brand_to_generic[drug_name]
drug_info["openfda.brand_name"] = drug_name
elif drug_name in self.generic_to_brand:
drug_info["openfda.brand_name"] = self.generic_to_brand[drug_name]
drug_info["openfda.generic_name"] = drug_name
else:
results = super().run(arguments)
if results is not None:
drug_info["openfda.generic_name"] = results["results"][0][
"openfda.generic_name"
][0]
drug_info["openfda.brand_name"] = results["results"][0][
"openfda.brand_name"
][0]
print("drug_info", drug_info)
else:
drug_info = None
return drug_info
[docs]
@register_tool("FDADrugLabelAggregated")
class FDADrugLabelGetDrugNamesByIndicationAggregated(FDADrugLabelTool):
"""
Enhanced version of FDA_get_drug_names_by_indication that:
- Iterates through all results in batches of 100 (no limit)
- Aggregates results by generic name
- Returns one entry per generic name with indication and all brand names
"""
[docs]
def run(self, arguments):
"""
Run the aggregated drug names search by indication.
Iterates through all results in batches of 100, aggregates by
generic name, and returns a list where each entry contains:
- generic_name: The generic drug name
- indication: The indication (from input)
- brand_names: List of all brand names for this generic name
"""
arguments = copy.deepcopy(arguments)
indication = arguments.get("indication")
if not indication:
return {"error": "indication parameter is required"}
# Dictionary to aggregate results by generic name
# Key: generic_name (normalized), Value: set of brand names
aggregated_results = {}
# Iterate through results in batches of 1000
step = 1000
skip = 0
total_fetched = 0
max_iterations = 1000 # Safety limit to prevent infinite loops
iteration = 0
while iteration < max_iterations:
iteration += 1
# Prepare arguments for this batch
batch_arguments = {"indication": indication, "limit": step, "skip": skip}
# Call parent run method to get results
batch_result = super().run(batch_arguments)
# Check for errors
if batch_result is None or "error" in batch_result:
# If we've already fetched some results, return what we have
if total_fetched > 0:
break
# Otherwise return the error
error_msg = "No results returned"
return batch_result if batch_result else {"error": error_msg}
# Extract results
results = batch_result.get("results", [])
meta = batch_result.get("meta", {})
# Process each result
for result in results:
generic_names = result.get("openfda.generic_name", [])
brand_names = result.get("openfda.brand_name", [])
# Handle both list and single value cases
if not isinstance(generic_names, list):
generic_names = [generic_names] if generic_names else []
if not isinstance(brand_names, list):
brand_names = [brand_names] if brand_names else []
# Normalize and process generic names
for generic_name in generic_names:
if not generic_name:
continue
# Normalize generic name (uppercase, strip whitespace)
normalized_generic = str(generic_name).upper().strip()
if normalized_generic:
# Initialize if not exists
if normalized_generic not in aggregated_results:
aggregated_results[normalized_generic] = set()
# Add all brand names for this generic name
for brand_name in brand_names:
if brand_name:
normalized_brand = str(brand_name).strip()
if normalized_brand:
aggregated_results[normalized_generic].add(
normalized_brand
)
total_fetched += len(results)
# Check if we've reached the end
# If we got fewer results than requested, we've reached the end
if len(results) < step:
# No more results to fetch
break
# Also check meta for total if available
total_available = meta.get("total", None)
if total_available is not None:
if skip + len(results) >= total_available:
# Reached the total available
break
# Move to next batch
skip += step
# Convert aggregated results to list format
result_list = []
for generic_name, brand_names_set in sorted(aggregated_results.items()):
result_list.append(
{
"generic_name": generic_name,
"indication": indication,
"brand_names": sorted(list(brand_names_set)),
}
)
return {
"meta": {
"total_generic_names": len(result_list),
"total_records_processed": total_fetched,
"indication": indication,
},
"results": result_list,
}
[docs]
@register_tool("FDADrugLabelStats")
class FDADrugLabelGetDrugNamesByIndicationStats(FDADrugLabelTool):
"""
Enhanced version using FDA count API to efficiently aggregate drug names
by indication. Uses count mechanism to get brand_name and generic_name
distributions without fetching full records.
"""
[docs]
def run(self, arguments):
"""
Run the aggregated drug names search using count API.
Uses count API to:
1. Get all unique generic names for the indication
2. For each generic name, get corresponding brand names
3. Return aggregated results
"""
arguments = copy.deepcopy(arguments)
indication = arguments.get("indication")
if not indication:
return {"error": "indication parameter is required"}
# Step 1: Get all unique generic names using count API
# Build search query for indication
# Use the same logic as parent class for building search query
indication_processed = indication.replace(" and ", " ")
indication_processed = indication_processed.replace(" AND ", " ")
indication_processed = " ".join(indication_processed.split())
# Remove or escape quotes to avoid query errors
indication_processed = indication_processed.replace('"', "")
indication_processed = indication_processed.replace("'", "")
indication_query = indication_processed.replace(" ", "+")
search_query = f'indications_and_usage:"{indication_query}"'
# Get all unique generic names using count API (use large limit)
generic_count_params = {
"search": search_query,
"count": "openfda.generic_name.exact",
"limit": 1000, # Large limit to get all results
}
generic_count_result = search_openfda(
generic_count_params,
endpoint_url=self.endpoint_url,
api_key=self.api_key,
return_fields=[],
exist_option="OR",
)
# Handle no matches found as empty result, not error
if generic_count_result is None:
all_generic_names_data = []
elif "error" in generic_count_result:
# Check if it's a "No matches found" error
error_msg = str(generic_count_result.get("error", {}))
if "No matches found" in error_msg or "NOT_FOUND" in error_msg:
all_generic_names_data = []
else:
return generic_count_result
else:
all_generic_names_data = generic_count_result.get("results", [])
if not all_generic_names_data:
return {
"meta": {
"total_generic_names": 0,
"total_brand_names": 0,
"indication": indication,
},
"results": {"generic_names": [], "brand_names": []},
}
# Step 2: Get all brand names using count API (only 2 API calls total)
brand_count_params = {
"search": search_query,
"count": "openfda.brand_name.exact",
"limit": 1000, # Large limit to get all results
}
brand_count_result = search_openfda(
brand_count_params,
endpoint_url=self.endpoint_url,
api_key=self.api_key,
return_fields=[],
exist_option="OR",
)
# Handle no matches found as empty result, not error
if brand_count_result is None:
brand_names_data = []
elif "error" in brand_count_result:
# Check if it's a "No matches found" error
error_msg = str(brand_count_result.get("error", {}))
if "No matches found" in error_msg or "NOT_FOUND" in error_msg:
brand_names_data = []
else:
# For other errors, still return generic names if available
brand_names_data = []
else:
brand_names_data = brand_count_result.get("results", [])
# Format generic names
generic_names_list = [
{"term": item.get("term", "").strip(), "count": item.get("count", 0)}
for item in all_generic_names_data
if item.get("term", "").strip()
]
generic_names_list = sorted(generic_names_list, key=lambda x: x["term"])
# Format brand names
brand_names_list = [
{"term": item.get("term", "").strip(), "count": item.get("count", 0)}
for item in brand_names_data
if item.get("term", "").strip()
]
brand_names_list = sorted(brand_names_list, key=lambda x: x["term"])
return {
"meta": {
"total_generic_names": len(generic_names_list),
"total_brand_names": len(brand_names_list),
"indication": indication,
},
"results": {
"generic_names": generic_names_list,
"brand_names": brand_names_list,
},
}