Source code for tooluniverse.openfda_tool
import requests
from .base_tool import BaseTool
from .tool_registry import register_tool
import copy
import re
import os
[docs]
def check_keys_present(api_capabilities_dict, keys):
    """
    Report whether every dotted key path exists in a capabilities dictionary.

    Each entry of ``keys`` is a dot-separated path such as
    ``"openfda.brand_name"``. The path is walked one level at a time; when the
    value found at a level is a dictionary holding a ``"properties"`` entry,
    the walk continues inside that entry, otherwise inside the value itself.

    :param api_capabilities_dict: Nested dictionary to look the paths up in.
    :param keys: Iterable of dot-separated key paths.
    :return: True when all paths resolve (including when ``keys`` is empty),
        False when at least one level of any path is missing.
    """
    all_present = True
    for key in keys:
        current = api_capabilities_dict
        for level in key.split("."):
            # A non-dict node cannot contain further levels; treating it as a
            # miss avoids substring checks on strings and TypeErrors on scalars.
            if not isinstance(current, dict) or level not in current:
                print(f"Key '{level}' not found in dictionary.")
                # Remember the failure but keep checking the remaining keys so
                # that every missing path is reported.
                all_present = False
                break
            child = current[level]
            if isinstance(child, dict) and "properties" in child:
                current = child["properties"]
            else:
                current = child
    return all_present
[docs]
def extract_nested_fields(records, fields, keywords=None):
    """
    Extract dot-notation fields from each record, optionally keyword-filtered.

    For every record a new dictionary is built that maps each requested field
    path to the value found by following that path, or to ``None`` when the
    path cannot be followed. Records for which no requested field yields a
    truthy value are dropped from the output.

    When ``keywords`` is non-empty, each extracted value is passed through
    ``extract_sentences_with_keywords`` unless the last path component is
    ``openfda``, ``generic_name`` or ``brand_name``.

    :param records: List of dictionaries from which to extract fields.
    :param fields: List of nested fields to extract, each specified with dot
        notation (e.g., 'openfda.brand_name').
    :param keywords: Optional list of keywords used to filter extracted text;
        ``None`` or an empty list disables filtering.
    :return: List of dictionaries containing only the specified fields.
    """
    # Leaf names whose values are returned untouched even when keywords are given.
    unfiltered_leaves = ("openfda", "generic_name", "brand_name")
    extracted_records = []
    for record in records:
        extracted_record = {}
        for field in fields:
            path = field.split(".")
            value = record
            try:
                for key in path:
                    value = value[key]
            except (KeyError, TypeError):
                # KeyError: the key is absent. TypeError: an intermediate value
                # is not a mapping (e.g. a string or None), so the path cannot
                # be followed either.
                extracted_record[field] = None
                continue
            if keywords and path[-1] not in unfiltered_leaves:
                value = extract_sentences_with_keywords(value, keywords)
            extracted_record[field] = value
        if any(extracted_record.values()):
            extracted_records.append(extracted_record)
    return extracted_records
[docs]
def map_properties_to_openfda_fields(arguments, search_fields):
    """
    Move searchable arguments under a ``"search_fields"`` entry keyed by openFDA field.

    Every argument whose name appears in ``search_fields`` is removed from
    ``arguments`` and its value is recorded under each openFDA field the
    mapping lists for it (a single field name or a list of field names).
    The collected mapping is stored in ``arguments["search_fields"]``.

    Note that ``arguments`` is modified in place and the same object is
    returned.

    :param arguments: The input arguments containing property names and values.
    :param search_fields: The mapping of property names to openFDA fields.
    :return: The ``arguments`` dictionary with mapped properties replaced by a
        ``"search_fields"`` dictionary of openFDA fields and values.
    """
    openfda_query = {}
    # Snapshot the matching names first because entries are deleted while looping.
    searchable_names = [name for name in arguments if name in search_fields]
    for name in searchable_names:
        supplied_value = arguments[name]
        targets = search_fields[name]
        if not isinstance(targets, list):
            targets = [targets]
        for target in targets:
            openfda_query[target] = supplied_value
        del arguments[name]
    arguments["search_fields"] = openfda_query
    return arguments
[docs]
def extract_sentences_with_keywords(text_list, keywords):
    """
    Collect the sentences that mention any of the given keywords.

    Each text is split into sentences at runs of spaces that follow ``.``,
    ``!`` or ``?``. A sentence is kept when it contains at least one keyword,
    compared case-insensitively as a substring.

    Parameters:
    - text_list (list[str] | str): Texts to scan. A single string is treated
      as a one-element list rather than being iterated character by character.
    - keywords (list): A list of keywords to search for in the texts.

    Returns:
    - str: The matching sentences, in order, joined with ``"......"``; an
      empty string when nothing matches.
    """
    if isinstance(text_list, str):
        text_list = [text_list]
    # Build the splitter and the lowered keywords once instead of per text/sentence.
    sentence_pattern = re.compile(r"(?<=[.!?]) +")
    lowered_keywords = [keyword.lower() for keyword in keywords]
    sentences_with_keywords = []
    for text in text_list:
        for sentence in sentence_pattern.split(text):
            lowered_sentence = sentence.lower()
            if any(keyword in lowered_sentence for keyword in lowered_keywords):
                sentences_with_keywords.append(sentence)
    return "......".join(sentences_with_keywords)
[docs]
def search_openfda(
    params=None,
    endpoint_url=None,
    api_key=None,
    sort=None,
    limit=5,
    skip=None,
    count=None,
    exists=None,
    return_fields=None,
    exist_option="OR",
    search_keyword_option="AND",
    keywords_filter=True,
):
    """
    Query an openFDA endpoint and return the requested fields of each result.

    The search expression is built from ``params["search_fields"]`` (a mapping
    of openFDA field name to a whitespace-separated string of terms) and, when
    ``exists`` is given, from ``_exists_`` clauses. The request is sent with
    ``requests.get`` and the JSON body is post-processed with
    ``extract_nested_fields``.

    :param params: Query parameters; may contain ``search_fields``, ``search``,
        ``limit``, ``sort``, ``skip`` and ``count``. The caller's dictionary
        is not modified.
    :param endpoint_url: URL of the openFDA endpoint to query.
    :param api_key: Optional API key appended to the request URL.
    :param sort: Default for ``params["sort"]`` when absent.
    :param limit: Default for ``params["limit"]`` when absent.
    :param skip: Default for ``params["skip"]`` when absent.
    :param count: Default for ``params["count"]`` when absent.
    :param exists: Field name or list of field names that results must
        contain; ignored when all fields are requested.
    :param return_fields: ``"ALL"`` (or a list containing ``"ALL"``) to return
        raw results, otherwise the field name(s) to extract in addition to
        the search fields.
    :param exist_option: ``"AND"`` or ``"OR"``; how ``exists`` fields combine.
    :param search_keyword_option: ``"AND"`` or ``"OR"``; how the terms of one
        search field combine.
    :param keywords_filter: When true, terms of every search field other than
        ``openfda.brand_name`` and ``openfda.generic_name`` are also used to
        filter the extracted text down to matching sentences.
    :return: ``{"meta": ..., "results": ...}`` on success, an ``{"error": ...}``
        dictionary when no search criterion was supplied, or ``None`` when the
        API reports an error or the response body is not JSON.
    """
    # Shallow-copy so the caller's dictionary is left untouched.
    params = dict(params) if params else {}

    # Tool schemas deliver return_fields as an array, so accept "ALL" both as
    # a bare string and as a member of a list.
    wants_all = return_fields == "ALL" or (
        isinstance(return_fields, (list, tuple)) and "ALL" in return_fields
    )
    if wants_all:
        exists = None

    # Always remove the helper entry so it can never leak into the query
    # string, even when it is an empty mapping.
    search_fields = params.pop("search_fields", None) or {}
    search_query = []
    keywords_list = []
    for field, value in search_fields.items():
        if keywords_filter and field not in (
            "openfda.brand_name",
            "openfda.generic_name",
        ):
            keywords_list.extend(value.split())
        if field == "openfda.generic_name":
            value = value.upper()  # all generic names are in uppercase
        value = value.replace(" and ", " ")  # remove 'and' in the search query
        value = value.replace(" AND ", " ")  # remove 'AND' in the search query
        # Merge runs of whitespace so each gap becomes exactly one joiner.
        value = " ".join(value.split())
        if search_keyword_option == "AND":
            search_query.append(f'{field}:({value.replace(" ", "+AND+")})')
        elif search_keyword_option == "OR":
            search_query.append(f'{field}:({value.replace(" ", "+")})')
        else:
            print("Invalid search_keyword_option. Please use 'AND' or 'OR'.")
    if search_query:
        params["search"] = "(" + "+".join(search_query) + ")"

    # Validate the presence of at least one of search, count, or sort
    if not (
        params.get("search")
        or params.get("count")
        or params.get("sort")
        or search_fields
    ):
        return {
            "error": "You must provide at least one of 'search', 'count', or 'sort' parameters."
        }

    # Values already present in params win over the keyword-argument defaults.
    params["limit"] = params.get("limit", limit)
    params["sort"] = params.get("sort", sort)
    params["skip"] = params.get("skip", skip)
    params["count"] = params.get("count", count)

    if isinstance(exists, str):
        exists = [exists]
    if exists:
        if exist_option == "AND":
            joiner = "+AND+"
        elif exist_option == "OR":
            joiner = "+"
        else:
            joiner = None
            print("Invalid exist_option. Please use 'AND' or 'OR'.")
        if joiner is not None:
            exists_clause = (
                "(" + joiner.join(f"_exists_:{name}" for name in exists) + ")"
            )
            if params.get("search"):
                params["search"] += "+AND+" + exists_clause
            else:
                params["search"] = exists_clause
        # Ensure that at least one of the search fields exists. Skipped when
        # there is nothing to append to or no search field, which would
        # otherwise produce an empty "()" group.
        if search_fields and params.get("search"):
            params["search"] += (
                "+AND+("
                + "+".join(f"_exists_:{field}" for field in search_fields)
                + ")"
            )

    # Construct full query with additional parameters. The string is built by
    # hand because the search expression relies on literal '+' separators.
    query = "&".join(
        f"{key}={value}" for key, value in params.items() if value is not None
    )
    full_url = f"{endpoint_url}?{query}"
    # Log the request before the key is attached so the secret is not printed.
    print(full_url)
    if api_key:
        full_url += f"&api_key={api_key}"

    # A timeout keeps a stalled connection from blocking the caller forever.
    response = requests.get(full_url, timeout=30)
    try:
        response_data = response.json()
    except ValueError:
        print("Invalid response: body is not valid JSON.")
        return None

    if "error" in response_data:
        print("Invalid Query: ", response_data["error"])
        return None

    # Extract meta information
    meta_info = response_data.get("meta", {}).get("results", {})
    # Extract results and return only the specified return fields
    results = response_data.get("results", [])
    if wants_all:
        return {"meta": meta_info, "results": results}

    if return_fields is None:
        extra_fields = []
    elif isinstance(return_fields, str):
        extra_fields = [return_fields]
    else:
        extra_fields = list(return_fields)
    required_fields = list(search_fields) + extra_fields
    extracted_results = extract_nested_fields(results, required_fields, keywords_list)
    return {"meta": meta_info, "results": extracted_results}
[docs]
@register_tool("FDATool")
class FDATool(BaseTool):
    """
    Tool that runs an openFDA search described by a tool configuration.

    The configuration's ``"fields"`` entry supplies the mapping from tool
    arguments to openFDA search fields, the fields to return, and optionally
    the fields that must exist in each result.
    """

    def __init__(self, tool_config, endpoint_url, api_key=None):
        """
        Store the search configuration, endpoint and API key.

        :param tool_config: Tool configuration; must contain a ``"fields"``
            dictionary with optional ``search_fields``, ``return_fields`` and
            ``exists`` entries.
        :param endpoint_url: URL of the openFDA endpoint to query.
        :param api_key: Optional API key; when falsy, the ``FDA_API_KEY``
            environment variable is used instead.
        """
        super().__init__(tool_config)
        field_config = tool_config["fields"]
        self.search_fields = field_config.get("search_fields", {})
        self.return_fields = field_config.get("return_fields", [])
        # Without an explicit "exists" list, require the returned fields themselves.
        configured_exists = field_config.get("exists", None)
        self.exists = (
            self.return_fields if configured_exists is None else configured_exists
        )
        self.endpoint_url = endpoint_url
        self.api_key = api_key or os.getenv("FDA_API_KEY")

    def run(self, arguments):
        """
        Execute the configured search for the given arguments.

        :param arguments: Tool arguments; a deep copy is used so the caller's
            dictionary is not modified.
        :return: Whatever ``search_openfda`` returns for the mapped arguments.
        """
        query_arguments = map_properties_to_openfda_fields(
            copy.deepcopy(arguments), self.search_fields
        )
        return search_openfda(
            query_arguments,
            endpoint_url=self.endpoint_url,
            api_key=self.api_key,
            exists=self.exists,
            return_fields=self.return_fields,
            exist_option="OR",
        )
[docs]
@register_tool("FDADrugLabel")
class FDADrugLabelTool(FDATool):
    """FDATool bound to the openFDA drug label endpoint."""

    def __init__(self, tool_config, api_key=None):
        """
        Initialise the tool against ``https://api.fda.gov/drug/label.json``.

        :param tool_config: Tool configuration forwarded to ``FDATool``.
        :param api_key: Optional API key forwarded to ``FDATool``.
        """
        super().__init__(tool_config, "https://api.fda.gov/drug/label.json", api_key)
[docs]
@register_tool("FDADrugLabelSearchTool")
class FDADrugLabelSearchTool(FDATool):
    """
    Drug label search by drug name with caller-selected return fields.

    The tool carries its own built-in configuration; the ``drug_name``
    argument is searched in both ``openfda.brand_name`` and
    ``openfda.generic_name``.
    """

    def __init__(self, tool_config=None, api_key=None):
        """
        Build the built-in configuration and initialise the base tool.

        :param tool_config: Accepted for signature compatibility; the value is
            not used because the built-in configuration always replaces it.
        :param api_key: Optional API key forwarded to ``FDATool``.
        """
        # Values accepted for each entry of the "return_fields" argument.
        selectable_fields = [
            "ALL",
            "abuse",
            "accessories",
            "active_ingredient",
            "adverse_reactions",
            "alarms",
            "animal_pharmacology_and_or_toxicology",
            "ask_doctor",
            "ask_doctor_or_pharmacist",
            "assembly_or_installation_instructions",
            "boxed_warning",
            "calibration_instructions",
            "carcinogenesis_and_mutagenesis_and_impairment_of_fertility",
            "cleaning",
            "clinical_pharmacology",
            "clinical_studies",
            "compatible_accessories",
            "components",
            "contraindications",
            "controlled_substance",
            "dependence",
            "description",
            "diagram_of_device",
            "disposal_and_waste_handling",
            "do_not_use",
            "dosage_and_administration",
            "dosage_forms_and_strengths",
            "drug_abuse_and_dependence",
            "drug_and_or_laboratory_test_interactions",
            "drug_interactions",
            "effective_time",
            "environmental_warning",
            "food_safety_warning",
            "general_precautions",
            "geriatric_use",
            "guaranteed_analysis_of_feed",
            "health_care_provider_letter",
            "health_claim",
            "how_supplied",
            "id",
            "inactive_ingredient",
            "indications_and_usage",
            "information_for_owners_or_caregivers",
            "information_for_patients",
            "instructions_for_use",
            "intended_use_of_the_device",
            "keep_out_of_reach_of_children",
            "labor_and_delivery",
            "laboratory_tests",
            "mechanism_of_action",
            "microbiology",
            "nonclinical_toxicology",
            "nonteratogenic_effects",
            "nursing_mothers",
            "openfda",
            "other_safety_information",
            "overdosage",
            "package_label_principal_display_panel",
            "patient_medication_information",
            "pediatric_use",
            "pharmacodynamics",
            "pharmacogenomics",
            "pharmacokinetics",
            "precautions",
            "pregnancy",
            "pregnancy_or_breast_feeding",
            "purpose",
            "questions",
            "recent_major_changes",
            "references",
            "residue_warning",
            "risks",
            "route",
            "safe_handling_warning",
            "set_id",
            "spl_indexing_data_elements",
            "spl_medguide",
            "spl_patient_package_insert",
            "spl_product_data_elements",
            "spl_unclassified_section",
            "statement_of_identity",
            "stop_use",
            "storage_and_handling",
            "summary_of_safety_and_effectiveness",
            "teratogenic_effects",
            "troubleshooting",
            "use_in_specific_populations",
            "user_safety_warnings",
            "version",
            "warnings",
            "warnings_and_cautions",
            "when_using",
            "meta",
        ]
        argument_schema = {
            "drug_name": {
                "type": "string",
                "description": "The name of the drug.",
                "required": True,
            },
            "return_fields": {
                "type": "array",
                "items": {
                    "type": "string",
                    "enum": selectable_fields,
                    "description": "Searchable field.",
                },
                "description": "Fields to search within drug labels.",
                "required": True,
            },
            "limit": {
                "type": "integer",
                "description": "The number of records to return.",
                "required": False,
            },
            "skip": {
                "type": "integer",
                "description": "The number of records to skip.",
                "required": False,
            },
        }
        self.tool_config = {
            "name": "FDADrugLabelSearch",
            "description": "Retrieve information of a specific drug.",
            "label": ["search", "drug"],
            "type": "FDADrugLabelSearch",
            "parameter": {
                "type": "object",
                "properties": argument_schema,
            },
            "fields": {
                "search_fields": {
                    "drug_name": ["openfda.brand_name", "openfda.generic_name"]
                },
            },
        }
        super().__init__(
            self.tool_config, "https://api.fda.gov/drug/label.json", api_key
        )

    def run(self, arguments):
        """
        Search drug labels and return the fields named in the arguments.

        :param arguments: Tool arguments; must contain ``return_fields`` (a
            ``KeyError`` is raised otherwise). A deep copy is used so the
            caller's dictionary is not modified.
        :return: Whatever ``search_openfda`` returns for the mapped arguments.
        """
        query_arguments = map_properties_to_openfda_fields(
            copy.deepcopy(arguments), self.search_fields
        )
        # The requested fields drive both the output and the _exists_ filter,
        # so they must not remain among the query parameters.
        requested_fields = query_arguments.pop("return_fields")
        return search_openfda(
            query_arguments,
            endpoint_url=self.endpoint_url,
            api_key=self.api_key,
            return_fields=requested_fields,
            exists=requested_fields,
            exist_option="OR",
        )
[docs]
@register_tool("FDADrugLabelSearchIDTool")
class FDADrugLabelSearchIDTool(FDATool):
    """
    Drug label search on the ``id`` field with caller-selected return fields.

    The tool carries its own built-in configuration; the ``query`` argument
    is searched in the openFDA ``id`` field.
    """

    def __init__(self, tool_config=None, api_key=None):
        """
        Build the built-in configuration and initialise the base tool.

        :param tool_config: Accepted for signature compatibility; the value is
            not used because the built-in configuration always replaces it.
        :param api_key: Optional API key forwarded to ``FDATool``.
        """
        # Values accepted for each entry of the "return_fields" argument.
        selectable_fields = [
            "ALL",
            "abuse",
            "accessories",
            "active_ingredient",
            "adverse_reactions",
            "alarms",
            "animal_pharmacology_and_or_toxicology",
            "ask_doctor",
            "ask_doctor_or_pharmacist",
            "assembly_or_installation_instructions",
            "boxed_warning",
            "calibration_instructions",
            "carcinogenesis_and_mutagenesis_and_impairment_of_fertility",
            "cleaning",
            "clinical_pharmacology",
            "clinical_studies",
            "compatible_accessories",
            "components",
            "contraindications",
            "controlled_substance",
            "dependence",
            "description",
            "diagram_of_device",
            "disposal_and_waste_handling",
            "do_not_use",
            "dosage_and_administration",
            "dosage_forms_and_strengths",
            "drug_abuse_and_dependence",
            "drug_and_or_laboratory_test_interactions",
            "drug_interactions",
            "effective_time",
            "environmental_warning",
            "food_safety_warning",
            "general_precautions",
            "geriatric_use",
            "guaranteed_analysis_of_feed",
            "health_care_provider_letter",
            "health_claim",
            "how_supplied",
            "id",
            "inactive_ingredient",
            "indications_and_usage",
            "information_for_owners_or_caregivers",
            "information_for_patients",
            "instructions_for_use",
            "intended_use_of_the_device",
            "keep_out_of_reach_of_children",
            "labor_and_delivery",
            "laboratory_tests",
            "mechanism_of_action",
            "microbiology",
            "nonclinical_toxicology",
            "nonteratogenic_effects",
            "nursing_mothers",
            "openfda",
            "other_safety_information",
            "overdosage",
            "package_label_principal_display_panel",
            "patient_medication_information",
            "pediatric_use",
            "pharmacodynamics",
            "pharmacogenomics",
            "pharmacokinetics",
            "precautions",
            "pregnancy",
            "pregnancy_or_breast_feeding",
            "purpose",
            "questions",
            "recent_major_changes",
            "references",
            "residue_warning",
            "risks",
            "route",
            "safe_handling_warning",
            "set_id",
            "spl_indexing_data_elements",
            "spl_medguide",
            "spl_patient_package_insert",
            "spl_product_data_elements",
            "spl_unclassified_section",
            "statement_of_identity",
            "stop_use",
            "storage_and_handling",
            "summary_of_safety_and_effectiveness",
            "teratogenic_effects",
            "troubleshooting",
            "use_in_specific_populations",
            "user_safety_warnings",
            "version",
            "warnings",
            "warnings_and_cautions",
            "when_using",
            "meta",
        ]
        argument_schema = {
            "query": {
                "type": "string",
                "description": "key words need to be searched.",
                "required": True,
            },
            "return_fields": {
                "type": "array",
                "items": {
                    "type": "string",
                    "enum": selectable_fields,
                    "description": "Searchable field.",
                },
                "description": "Fields to search within drug labels.",
                "required": True,
            },
            "limit": {
                "type": "integer",
                "description": "The number of records to return.",
                "required": False,
            },
            "skip": {
                "type": "integer",
                "description": "The number of records to skip.",
                "required": False,
            },
        }
        self.tool_config = {
            "name": "FDADrugLabelSearchALLTool",
            "description": "Retrieve any related information to the query.",
            "label": ["search", "drug"],
            "type": "FDADrugLabelSearch",
            "parameter": {
                "type": "object",
                "properties": argument_schema,
            },
            "fields": {
                "search_fields": {"query": ["id"]},
            },
        }
        super().__init__(
            self.tool_config, "https://api.fda.gov/drug/label.json", api_key
        )

    def run(self, arguments):
        """
        Search drug labels by id and return the fields named in the arguments.

        :param arguments: Tool arguments; must contain ``return_fields`` (a
            ``KeyError`` is raised otherwise). A deep copy is used so the
            caller's dictionary is not modified.
        :return: Whatever ``search_openfda`` returns for the mapped arguments.
        """
        query_arguments = map_properties_to_openfda_fields(
            copy.deepcopy(arguments), self.search_fields
        )
        # The requested fields drive both the output and the _exists_ filter,
        # so they must not remain among the query parameters.
        requested_fields = query_arguments.pop("return_fields")
        return search_openfda(
            query_arguments,
            endpoint_url=self.endpoint_url,
            api_key=self.api_key,
            return_fields=requested_fields,
            exists=requested_fields,
            exist_option="OR",
        )
[docs]
@register_tool("FDADrugLabelGetDrugGenericNameTool")
class FDADrugLabelGetDrugGenericNameTool(FDADrugLabelTool):
    """
    Resolve a drug's generic and brand name from either of the two.

    Names are first looked up in the bundled ``drug_list`` table; only when
    the name is in neither direction of that table is the openFDA drug label
    endpoint queried.
    """

    def __init__(self, tool_config=None, api_key=None):
        """
        Load the bundled name table and initialise the drug label tool.

        :param tool_config: Tool configuration; a built-in default is used
            when ``None``.
        :param api_key: Optional API key forwarded to ``FDADrugLabelTool``.
        """
        if tool_config is None:
            tool_config = {
                "name": "get_drug_generic_name",
                "description": "Get the drug’s generic name based on the drug's generic or brand name.",
                "parameter": {
                    "type": "object",
                    "properties": {
                        "drug_name": {
                            "type": "string",
                            "description": "The generic or brand name of the drug.",
                            "required": True,
                        }
                    },
                },
                "fields": {
                    "search_fields": {
                        "drug_name": ["openfda.brand_name", "openfda.generic_name"]
                    },
                    "return_fields": ["openfda.generic_name"],
                },
                "type": "FDADrugLabelGetDrugGenericNameTool",
                "label": ["FDADrugLabel", "purpose", "FDA"],
            }
        from .data.fda_drugs_with_brand_generic_names_for_tool import drug_list

        self.brand_to_generic = {
            drug["brand_name"]: drug["generic_name"] for drug in drug_list
        }
        self.generic_to_brand = {
            drug["generic_name"]: drug["brand_name"] for drug in drug_list
        }
        super().__init__(tool_config, api_key)

    @staticmethod
    def _first_entry(value):
        """Return the first element of a non-empty list/tuple, else ``None``."""
        if isinstance(value, (list, tuple)) and value:
            return value[0]
        return None

    def run(self, arguments):
        """
        Look up the generic and brand name for ``arguments["drug_name"]``.

        :param arguments: Tool arguments containing ``drug_name``.
        :return: Dictionary with ``openfda.generic_name`` and
            ``openfda.brand_name``, or ``None`` when no name was supplied or
            no match could be found.
        """
        drug_name = arguments.get("drug_name")
        if not drug_name:
            # Nothing to look up; the membership tests below would fail on None.
            return None
        # to handle some drug names such as tarlatamab-dlle
        drug_name = drug_name.split("-")[0]

        if drug_name in self.brand_to_generic:
            return {
                "openfda.generic_name": self.brand_to_generic[drug_name],
                "openfda.brand_name": drug_name,
            }
        if drug_name in self.generic_to_brand:
            return {
                "openfda.brand_name": self.generic_to_brand[drug_name],
                "openfda.generic_name": drug_name,
            }

        # Not in the bundled table: fall back to the openFDA API.
        results = super().run(arguments)
        # The search may yield None, an {"error": ...} dictionary, or an empty
        # result list; none of these carries a usable record.
        hits = results.get("results") if isinstance(results, dict) else None
        if not hits:
            return None
        first_hit = hits[0]
        generic_name = self._first_entry(first_hit.get("openfda.generic_name"))
        brand_name = self._first_entry(first_hit.get("openfda.brand_name"))
        if generic_name is None and brand_name is None:
            return None
        drug_info = {
            "openfda.generic_name": generic_name,
            "openfda.brand_name": brand_name,
        }
        print("drug_info", drug_info)
        return drug_info