Source code for tooluniverse.openfda_tool

import requests
from .base_tool import BaseTool
from .tool_registry import register_tool
import copy
import re
import os


[docs] def check_keys_present(api_capabilities_dict, keys): for key in keys: levels = key.split(".") current_dict = api_capabilities_dict key_present = True for level in levels: if level not in current_dict: print(f"Key '{level}' not found in dictionary.") key_present = False break if "properties" in current_dict[level]: current_dict = current_dict[level]["properties"] else: current_dict = current_dict[level] return key_present
[docs] def extract_nested_fields(records, fields, keywords=None): """ Recursively extracts nested fields from a list of dictionaries. :param records: List of dictionaries from which to extract fields :param fields: List of nested fields to extract, each specified with dot notation (e.g., 'openfda.brand_name') :return: List of dictionaries containing only the specified fields """ extracted_records = [] for record in records: extracted_record = {} for field in fields: keys = field.split(".") # print("keys", keys) value = record try: for key in keys: value = value[key] if key != "openfda" and key != "generic_name" and key != "brand_name": if len(keywords) > 0: # print("key words:", keywords) # print(value) # print(type(value)) value = extract_sentences_with_keywords(value, keywords) extracted_record[field] = value except KeyError: extracted_record[field] = None if any(extracted_record.values()): extracted_records.append(extracted_record) return extracted_records
[docs] def map_properties_to_openfda_fields(arguments, search_fields): """ Maps the provided arguments to the corresponding openFDA fields based on the search_fields mapping. :param arguments: The input arguments containing property names and values. :param search_fields: The mapping of property names to openFDA fields. :return: A dictionary with openFDA fields and corresponding values. """ mapped_arguments = {} for key, value in list(arguments.items()): if key in search_fields: # print("key in search_fields:", key) openfda_fields = search_fields[key] if isinstance(openfda_fields, list): for field in openfda_fields: mapped_arguments[field] = value else: mapped_arguments[openfda_fields] = value del arguments[key] arguments["search_fields"] = mapped_arguments return arguments
[docs] def extract_sentences_with_keywords(text_list, keywords): """ Extracts sentences containing any of the specified keywords from the text. Parameters: - text (str): The input text from which to extract sentences. - keywords (list): A list of keywords to search for in the text. Returns: - list: A list of sentences containing any of the keywords. """ sentences_with_keywords = [] for text in text_list: # Compile a regular expression pattern for sentence splitting sentence_pattern = re.compile(r"(?<=[.!?]) +") # Split the text into sentences sentences = sentence_pattern.split(text) # Initialize a list to hold sentences with keywords # Iterate through each sentence for sentence in sentences: # Check if any of the keywords are present in the sentence if any(keyword.lower() in sentence.lower() for keyword in keywords): # If a keyword is found, add the sentence to the list sentences_with_keywords.append(sentence) return "......".join(sentences_with_keywords)
[docs] def search_openfda( params=None, endpoint_url=None, api_key=None, sort=None, limit=5, skip=None, count=None, exists=None, return_fields=None, exist_option="OR", search_keyword_option="AND", keywords_filter=True, ): # Initialize params if not provided if params is None: params = {} if return_fields == "ALL": exists = None # Initialize search fields and construct search query search_fields = params.get("search_fields", {}) search_query = [] keywords_list = [] if search_fields: for field, value in search_fields.items(): # Merge multiple continuous black spaces into one and use one '+' if ( keywords_filter and field != "openfda.brand_name" and field != "openfda.generic_name" ): keywords_list.extend(value.split()) if field == "openfda.generic_name": value = value.upper() # all generic names are in uppercase value = value.replace(" and ", " ") # remove 'and' in the search query value = value.replace(" AND ", " ") # remove 'AND' in the search query value = " ".join(value.split()) if search_keyword_option == "AND": search_query.append(f'{field}:({value.replace(" ", "+AND+")})') elif search_keyword_option == "OR": search_query.append(f'{field}:({value.replace(" ", "+")})') else: print("Invalid search_keyword_option. Please use 'AND' or 'OR'.") del params["search_fields"] if search_query: params["search"] = "+".join(search_query) params["search"] = "(" + params["search"] + ")" # Validate the presence of at least one of search, count, or sort if not ( params.get("search") or params.get("count") or params.get("sort") or search_fields ): return { "error": "You must provide at least one of 'search', 'count', or 'sort' parameters." } # Set additional query parameters params["limit"] = params.get("limit", limit) params["sort"] = params.get("sort", sort) params["skip"] = params.get("skip", skip) params["count"] = params.get("count", count) if exists is not None: if isinstance(exists, str): exists = [exists] if "search" in params: if exist_option == "AND": params["search"] += ( "+AND+(" + "+AND+".join([f"_exists_:{keyword}" for keyword in exists]) + ")" ) elif exist_option == "OR": params["search"] += ( "+AND+(" + "+".join([f"_exists_:{keyword}" for keyword in exists]) + ")" ) else: if exist_option == "AND": params["search"] = "+AND+".join( [f"_exists_:{keyword}" for keyword in exists] ) elif exist_option == "OR": params["search"] = "+".join( [f"_exists_:{keyword}" for keyword in exists] ) # Ensure that at least one of the search fields exists params["search"] += ( "+AND+(" + "+".join([f"_exists_:{field}" for field in search_fields.keys()]) + ")" ) # params['search']+="+AND+_exists_:openfda" # Construct full query with additional parameters query = "&".join( [f"{key}={value}" for key, value in params.items() if value is not None] ) full_url = f"{endpoint_url}?{query}" if api_key: full_url += f"&api_key={api_key}" print(full_url) response = requests.get(full_url) # Get the JSON response response_data = response.json() if "error" in response_data: print("Invalid Query: ", response_data["error"]) return None # Extract meta information meta_info = response_data.get("meta", {}) meta_info = meta_info.get("results", {}) # Extract results and return only the specified return fields results = response_data.get("results", []) if return_fields == "ALL": return {"meta": meta_info, "results": results} required_fields = list(search_fields.keys()) + return_fields extracted_results = extract_nested_fields(results, required_fields, keywords_list) return {"meta": meta_info, "results": extracted_results}
[docs] @register_tool("FDATool") class FDATool(BaseTool):
[docs] def __init__(self, tool_config, endpoint_url, api_key=None): super().__init__(tool_config) fields = tool_config["fields"] self.search_fields = fields.get("search_fields", {}) self.return_fields = fields.get("return_fields", []) self.exists = fields.get("exists", None) if self.exists is None: self.exists = self.return_fields self.endpoint_url = endpoint_url self.api_key = api_key or os.getenv("FDA_API_KEY")
[docs] def run(self, arguments): arguments = copy.deepcopy(arguments) mapped_arguments = map_properties_to_openfda_fields( arguments, self.search_fields ) return search_openfda( mapped_arguments, endpoint_url=self.endpoint_url, api_key=self.api_key, exists=self.exists, return_fields=self.return_fields, exist_option="OR", )
[docs] @register_tool("FDADrugLabel") class FDADrugLabelTool(FDATool):
[docs] def __init__(self, tool_config, api_key=None): endpoint_url = "https://api.fda.gov/drug/label.json" super().__init__(tool_config, endpoint_url, api_key)
[docs] @register_tool("FDADrugLabelSearchTool") class FDADrugLabelSearchTool(FDATool):
[docs] def __init__(self, tool_config=None, api_key=None): self.tool_config = { "name": "FDADrugLabelSearch", "description": "Retrieve information of a specific drug.", "label": ["search", "drug"], "type": "FDADrugLabelSearch", "parameter": { "type": "object", "properties": { "drug_name": { "type": "string", "description": "The name of the drug.", "required": True, }, "return_fields": { "type": "array", "items": { "type": "string", "enum": [ "ALL", "abuse", "accessories", "active_ingredient", "adverse_reactions", "alarms", "animal_pharmacology_and_or_toxicology", "ask_doctor", "ask_doctor_or_pharmacist", "assembly_or_installation_instructions", "boxed_warning", "calibration_instructions", "carcinogenesis_and_mutagenesis_and_impairment_of_fertility", "cleaning", "clinical_pharmacology", "clinical_studies", "compatible_accessories", "components", "contraindications", "controlled_substance", "dependence", "description", "diagram_of_device", "disposal_and_waste_handling", "do_not_use", "dosage_and_administration", "dosage_forms_and_strengths", "drug_abuse_and_dependence", "drug_and_or_laboratory_test_interactions", "drug_interactions", "effective_time", "environmental_warning", "food_safety_warning", "general_precautions", "geriatric_use", "guaranteed_analysis_of_feed", "health_care_provider_letter", "health_claim", "how_supplied", "id", "inactive_ingredient", "indications_and_usage", "information_for_owners_or_caregivers", "information_for_patients", "instructions_for_use", "intended_use_of_the_device", "keep_out_of_reach_of_children", "labor_and_delivery", "laboratory_tests", "mechanism_of_action", "microbiology", "nonclinical_toxicology", "nonteratogenic_effects", "nursing_mothers", "openfda", "other_safety_information", "overdosage", "package_label_principal_display_panel", "patient_medication_information", "pediatric_use", "pharmacodynamics", "pharmacogenomics", "pharmacokinetics", "precautions", "pregnancy", "pregnancy_or_breast_feeding", "purpose", "questions", "recent_major_changes", "references", "residue_warning", "risks", "route", "safe_handling_warning", "set_id", "spl_indexing_data_elements", "spl_medguide", "spl_patient_package_insert", "spl_product_data_elements", "spl_unclassified_section", "statement_of_identity", "stop_use", "storage_and_handling", "summary_of_safety_and_effectiveness", "teratogenic_effects", "troubleshooting", "use_in_specific_populations", "user_safety_warnings", "version", "warnings", "warnings_and_cautions", "when_using", "meta", ], "description": "Searchable field.", }, "description": "Fields to search within drug labels.", "required": True, }, "limit": { "type": "integer", "description": "The number of records to return.", "required": False, }, "skip": { "type": "integer", "description": "The number of records to skip.", "required": False, }, }, }, "fields": { "search_fields": { "drug_name": ["openfda.brand_name", "openfda.generic_name"] }, }, } endpoint_url = "https://api.fda.gov/drug/label.json" super().__init__(self.tool_config, endpoint_url, api_key)
[docs] def run(self, arguments): arguments = copy.deepcopy(arguments) mapped_arguments = map_properties_to_openfda_fields( arguments, self.search_fields ) return_fields = arguments["return_fields"] del arguments["return_fields"] return search_openfda( mapped_arguments, endpoint_url=self.endpoint_url, api_key=self.api_key, return_fields=return_fields, exists=return_fields, exist_option="OR", )
[docs] @register_tool("FDADrugLabelSearchIDTool") class FDADrugLabelSearchIDTool(FDATool):
[docs] def __init__(self, tool_config=None, api_key=None): self.tool_config = { "name": "FDADrugLabelSearchALLTool", "description": "Retrieve any related information to the query.", "label": ["search", "drug"], "type": "FDADrugLabelSearch", "parameter": { "type": "object", "properties": { "query": { "type": "string", "description": "key words need to be searched.", "required": True, }, "return_fields": { "type": "array", "items": { "type": "string", "enum": [ "ALL", "abuse", "accessories", "active_ingredient", "adverse_reactions", "alarms", "animal_pharmacology_and_or_toxicology", "ask_doctor", "ask_doctor_or_pharmacist", "assembly_or_installation_instructions", "boxed_warning", "calibration_instructions", "carcinogenesis_and_mutagenesis_and_impairment_of_fertility", "cleaning", "clinical_pharmacology", "clinical_studies", "compatible_accessories", "components", "contraindications", "controlled_substance", "dependence", "description", "diagram_of_device", "disposal_and_waste_handling", "do_not_use", "dosage_and_administration", "dosage_forms_and_strengths", "drug_abuse_and_dependence", "drug_and_or_laboratory_test_interactions", "drug_interactions", "effective_time", "environmental_warning", "food_safety_warning", "general_precautions", "geriatric_use", "guaranteed_analysis_of_feed", "health_care_provider_letter", "health_claim", "how_supplied", "id", "inactive_ingredient", "indications_and_usage", "information_for_owners_or_caregivers", "information_for_patients", "instructions_for_use", "intended_use_of_the_device", "keep_out_of_reach_of_children", "labor_and_delivery", "laboratory_tests", "mechanism_of_action", "microbiology", "nonclinical_toxicology", "nonteratogenic_effects", "nursing_mothers", "openfda", "other_safety_information", "overdosage", "package_label_principal_display_panel", "patient_medication_information", "pediatric_use", "pharmacodynamics", "pharmacogenomics", "pharmacokinetics", "precautions", "pregnancy", "pregnancy_or_breast_feeding", "purpose", "questions", "recent_major_changes", "references", "residue_warning", "risks", "route", "safe_handling_warning", "set_id", "spl_indexing_data_elements", "spl_medguide", "spl_patient_package_insert", "spl_product_data_elements", "spl_unclassified_section", "statement_of_identity", "stop_use", "storage_and_handling", "summary_of_safety_and_effectiveness", "teratogenic_effects", "troubleshooting", "use_in_specific_populations", "user_safety_warnings", "version", "warnings", "warnings_and_cautions", "when_using", "meta", ], "description": "Searchable field.", }, "description": "Fields to search within drug labels.", "required": True, }, "limit": { "type": "integer", "description": "The number of records to return.", "required": False, }, "skip": { "type": "integer", "description": "The number of records to skip.", "required": False, }, }, }, "fields": { "search_fields": {"query": ["id"]}, }, } endpoint_url = "https://api.fda.gov/drug/label.json" super().__init__(self.tool_config, endpoint_url, api_key)
[docs] def run(self, arguments): arguments = copy.deepcopy(arguments) mapped_arguments = map_properties_to_openfda_fields( arguments, self.search_fields ) return_fields = arguments["return_fields"] del arguments["return_fields"] return search_openfda( mapped_arguments, endpoint_url=self.endpoint_url, api_key=self.api_key, return_fields=return_fields, exists=return_fields, exist_option="OR", )
[docs] @register_tool("FDADrugLabelGetDrugGenericNameTool") class FDADrugLabelGetDrugGenericNameTool(FDADrugLabelTool):
[docs] def __init__(self, tool_config=None, api_key=None): if tool_config is None: tool_config = { "name": "get_drug_generic_name", "description": "Get the drug’s generic name based on the drug's generic or brand name.", "parameter": { "type": "object", "properties": { "drug_name": { "type": "string", "description": "The generic or brand name of the drug.", "required": True, } }, }, "fields": { "search_fields": { "drug_name": ["openfda.brand_name", "openfda.generic_name"] }, "return_fields": ["openfda.generic_name"], }, "type": "FDADrugLabelGetDrugGenericNameTool", "label": ["FDADrugLabel", "purpose", "FDA"], } from .data.fda_drugs_with_brand_generic_names_for_tool import drug_list self.brand_to_generic = { drug["brand_name"]: drug["generic_name"] for drug in drug_list } self.generic_to_brand = { drug["generic_name"]: drug["brand_name"] for drug in drug_list } super().__init__(tool_config, api_key)
[docs] def run(self, arguments): drug_info = {} drug_name = arguments.get("drug_name") if "-" in drug_name: drug_name = drug_name.split("-")[ 0 ] # to handle some drug names such as tarlatamab-dlle if drug_name in self.brand_to_generic: drug_info["openfda.generic_name"] = self.brand_to_generic[drug_name] drug_info["openfda.brand_name"] = drug_name elif drug_name in self.generic_to_brand: drug_info["openfda.brand_name"] = self.generic_to_brand[drug_name] drug_info["openfda.generic_name"] = drug_name else: results = super().run(arguments) if results is not None: drug_info["openfda.generic_name"] = results["results"][0][ "openfda.generic_name" ][0] drug_info["openfda.brand_name"] = results["results"][0][ "openfda.brand_name" ][0] print("drug_info", drug_info) else: drug_info = None return drug_info