Source code for tooluniverse.ctg_tool

from copy import deepcopy
from urllib.parse import urljoin
from .restful_tool import RESTfulTool, execute_RESTful_query
from .tool_registry import register_tool


@register_tool("ClinicalTrialsTool")
class ClinicalTrialsTool(RESTfulTool):
    """Common base class for tools that call the ClinicalTrials.gov (CTG) API v2.

    Holds the argument-renaming table, the list of parameters whose list
    values are sent comma-joined, and helpers for building request
    parameters and endpoint URLs. ``run`` must be provided by subclasses.
    """

    def __init__(self, tool_config):
        """
        Resolve the tool's endpoint against the CTG API v2 base URL.

        Args:
            tool_config (dict): Tool configuration; ``tool_config["tool_url"]``
                is the endpoint path relative to the API base URL.
        """
        base_url = "https://clinicaltrials.gov/api/v2"  # Base URL for CTG API v2
        relative_path = tool_config["tool_url"].lstrip("/")
        super().__init__(tool_config, urljoin(base_url + "/", relative_path))

        # Parameters whose list values are collapsed into one comma-separated string.
        self.list_params_to_join = ["filter.ids", "filter.overallStatus", "fields", "sort"]

        # Friendly argument name -> query parameter name sent to the API.
        self.param_name_mapper = dict(
            condition="query.cond",
            title="query.titles",
            intervention="query.intr",
            outcome="query.outc",
            overall_status="filter.overallStatus",
            query_term="query.term",
        )

    def _map_param_names(self, arguments):
        """
        Rename argument keys according to ``self.param_name_mapper``.

        Args:
            arguments (dict): Runtime arguments provided to the tool's run method.

        Returns:
            dict: A new dictionary; keys found in the mapper are replaced by
            their mapped name, all other keys are copied unchanged.
        """
        rename = self.param_name_mapper
        return {rename.get(name, name): value for name, value in arguments.items()}

    def _prepare_api_params(self, arguments):
        """
        Build the query-string parameters for a request.

        Args:
            arguments (dict): Candidate parameters (name -> value).

        Returns:
            dict: The parameters whose value is not ``None``. List values of
            parameters named in ``self.list_params_to_join`` are turned into
            a comma-separated string; everything else is passed through.
        """
        joinable = self.list_params_to_join
        prepared = {}
        for name, value in arguments.items():
            if value is None:
                continue
            if isinstance(value, list) and name in joinable:
                value = ",".join(str(item) for item in value)
            prepared[name] = value
        return prepared

    def _format_endpoint_url(self, arguments):
        """
        Fill path placeholders (such as ``{nctId}``) in the endpoint URL.

        Args:
            arguments (dict): Values available for substitution; only keys
                that appear as ``{key}`` in the URL template are used.

        Returns:
            str: The formatted URL, or the unformatted template when a
            placeholder has no matching argument (a warning is printed).
        """
        template = self.endpoint_url
        # Keep only the arguments that actually occur as placeholders.
        substitutions = {
            name: value for name, value in arguments.items() if f"{{{name}}}" in template
        }
        try:
            return template.format(**substitutions)
        except KeyError as e:
            # A placeholder exists in the template but no argument supplies it.
            print(
                f"Warning: Missing key {e} in arguments for URL formatting: {template}"
            )
            # Hand back the raw template rather than raising here.
            return template

    def run(self, arguments):
        """Placeholder entry point; always raises ``NotImplementedError``."""
        raise NotImplementedError("The run method should be implemented in subclasses.")
@register_tool("ClinicalTrialsSearchTool")  # Searching studies (/studies)
class ClinicalTrialsSearchTool(ClinicalTrialsTool):
    """Search studies and return a compact summary of each hit."""

    # (output key, protocolSection module, key inside that module), in output order.
    _SUMMARY_FIELDS = (
        ("NCT ID", "identificationModule", "nctId"),
        ("brief_title", "identificationModule", "briefTitle"),
        ("brief_summary", "descriptionModule", "briefSummary"),
        ("overall_status", "statusModule", "overallStatus"),
        ("condition", "conditionsModule", "conditions"),
        ("phase", "designModule", "phases"),
    )

    _NO_RESULTS_MESSAGE = "No studies found for the given query parameters. Please examine your input and try different parameters."

    def __init__(self, tool_config):
        """
        Set up the search tool and the request defaults that are not exposed
        in the tool's parameter schema.

        Args:
            tool_config (dict): Tool configuration passed to the base class.
        """
        super().__init__(tool_config)
        self.default_params_not_shown = {
            "format": "json",  # Default format for the response
            "sort": "@relevance",  # Default sort order
            # NOTE: Can change this one. Other candidates that are currently
            # not requested: OfficialTitle, StartDate, PrimaryCompletionDate,
            # PrimaryOutcomeMeasure, DescriptionModule, Intervention,
            # InterventionName, InterventionArmGroupLabel,
            # InterventionOtherName, WhyStopped, HasResults.
            "fields": [
                "NCTId",
                "BriefTitle",
                "OverallStatus",
                "BriefSummary",
                "Condition",
                "Phase",
            ],
            "countTotal": True,
            # NOTE: Can change this one
            "filter.advanced": "AREA[HasResults]true AND (AREA[Phase]PHASE2 OR AREA[Phase]PHASE3 OR AREA[Phase]PHASE4)",
            # TODO: Consider adding a YEAR filter for the query to remove trials that are too early? E.g., "AREA[LastUpdatePostDate]RANGE[2000-01-01,MAX]"
        }

    # Schema entries that are currently NOT exposed in the JSON config, kept for reference:
    # "title": {
    #     "type": "string",
    #     "description": "Query for study titles using Essie expression syntax (e.g., 'lung cancer').",
    #     "required": false
    # },
    # "outcome": {
    #     "type": "string",
    #     "description": "Query for outcome measures using Essie expression syntax (e.g., 'overall survival', 'adverse events', 'progress-free survival').",
    #     "required": false
    # },
    # "query.locn": {
    #     "type": "string",
    #     "description": "Query for location terms using Essie expression syntax (e.g., 'California')."
    # },
    # "overall_status": {
    #     "type": "array",
    #     "description": "Filter by a list of overall study statuses (e.g., ['RECRUITING', 'COMPLETED']). ",
    #     "items": {
    #         "type": "string",
    #         "enum": ["ACTIVE_NOT_RECRUITING", "COMPLETED", "ENROLLING_BY_INVITATION", "NOT_YET_RECRUITING", "RECRUITING", "SUSPENDED", "TERMINATED", "WITHDRAWN", "AVAILABLE", "NO_LONGER_AVAILABLE", "TEMPORARILY_NOT_AVAILABLE", "APPROVED_FOR_MARKETING", "WITHHELD", "UNKNOWN"]
    #     },
    #     "required": false
    # },
    # "filter.ids": {
    #     "type": "array",
    #     "description": "Filter by a list of NCT IDs (e.g., ['NCT04852770', 'NCT01728545']).",
    #     "items": {
    #         "type": "string"
    #     }
    # },
    # "sort": {
    #     "type": "array",
    #     "description": "Comma- or pipe-separated list of fields to sort by for the studies, with optional direction. The returning studies are not sorted by default. Every list item contains a field/piece name and an optional sort direction (asc for ascending or desc for descending) after colon character (e.g., ['LastUpdatePostDate:desc', 'EnrollmentCount'], [@relevance]). Default sort order varies by field type. Special value '@relevance' sorts by query relevance.",
    #     "items": {
    #         "type": "string"
    #     }
    # },
    # "fields": {
    #     "type": "array",
    #     "description": "List of fields to return (e.g., ['NCTId', 'BriefTitle', 'OverallStatus', 'Phase', 'PrimaryCompletionDate', 'PrimaryOutcomeMeasure']). By default, we look at the following fields: ['NCTId', 'BriefTitle', 'OfficialTitle', 'OverallStatus', 'StartDate', 'PrimaryCompletionDate', 'PrimaryOutcomeMeasure', 'DescriptionModule', 'Condition', 'Phase', 'WhyStopped', 'HasResults'].",
    #     "items": {
    #         "type": "string"
    #     },
    #     "required": false
    # },

    def run(self, arguments):
        """
        Executes the search query for clinical trials.

        Args:
            arguments (dict): A dictionary containing parameters provided by
                the user/LLM. Friendly names are renamed through
                ``_map_param_names`` before use.

        Returns:
            dict or str: The simplified result (see ``_simplify_output``) when
            the response is a dictionary with a non-empty ``"studies"`` list;
            otherwise a fixed "No studies found ..." message.
        """
        arguments = self._map_param_names(arguments)
        query_params = deepcopy(self.query_schema)
        # NOTE: Workaround for not having an aligned schema in the JSON config
        expected_param_names = self._map_param_names(self.parameters).keys()

        # Copy over every expected argument that was actually supplied.
        for name in expected_param_names:
            if arguments.get(name) is not None:
                query_params[name] = arguments[name]

        # Add default parameters that are not shown in the schema.
        for name, value in self.default_params_not_shown.items():
            if name not in query_params:
                query_params[name] = value

        # Drop None values and join list parameters.
        api_params = self._prepare_api_params(query_params)

        # 'countTotal' is kept as a boolean locally but is sent as 'true'/'false'.
        if isinstance(api_params.get("countTotal"), bool):
            api_params["countTotal"] = str(api_params["countTotal"]).lower()

        response = execute_RESTful_query(
            endpoint_url=self.endpoint_url, variables=api_params
        )

        # Only a dictionary carrying a non-empty study list can be simplified;
        # anything else (None, raw text, empty result) is reported as "no studies"
        # instead of failing on a missing .keys() / len().
        if isinstance(response, dict) and response.get("studies"):
            return self._simplify_output(response)
        return self._NO_RESULTS_MESSAGE

    def _simplify_output(self, response):
        """
        Reduce a search response to the handful of fields listed in
        ``_SUMMARY_FIELDS``.

        Args:
            response (dict): Response dictionary with a ``"studies"`` list;
                each study is read through its ``"protocolSection"`` entry.

        Returns:
            dict: ``{"studies": [...]}`` where every study only has the
            summary keys whose value is not ``None``, plus ``"nextPageToken"``
            and ``"total_count"`` when the response carries
            ``"nextPageToken"`` / ``"totalCount"``.
        """
        studies = []
        for study in response["studies"]:
            protocol = study.get("protocolSection") or {}
            summary = {}
            for output_key, module_name, source_key in self._SUMMARY_FIELDS:
                # A missing module simply contributes nothing (no KeyError).
                value = (protocol.get(module_name) or {}).get(source_key)
                if value is not None:
                    summary[output_key] = value
            studies.append(summary)

        simplified = {"studies": studies}
        if "nextPageToken" in response:
            simplified["nextPageToken"] = response["nextPageToken"]
        if "totalCount" in response:
            simplified["total_count"] = response["totalCount"]
        return simplified
@register_tool("ClinicalTrialsDetailsTool")
class ClinicalTrialsDetailsTool(ClinicalTrialsTool):
    """Fetch one kind of detail for each study in a list of NCT IDs.

    Which detail is fetched depends on which selector parameter the tool's
    schema declares (see ``_QUERY_PRIORITY``).
    """

    # Selector parameters, checked in this order; the first one present in the
    # tool's expected parameters decides which fields are requested.
    _QUERY_PRIORITY = (
        "description_type",
        "status_and_date",
        "condition_and_intervention",
        "eligibility_criteria",
        "location",
        "outcome_measures",
        "references",
        "baseline_characteristics",
        "outcome_measure",
        "adverse_event_type",
    )

    # Selectors whose value is ignored: their presence alone picks the fields.
    _FLAG_QUERY_FIELDS = {
        "status_and_date": (
            "NCTId",
            "OverallStatus",
            "LastKnownStatus",
            "WhyStopped",
            "StartDate",
            "PrimaryCompletionDate",
            "CompletionDate",
        ),
        "condition_and_intervention": (
            "NCTId",
            "Condition",
            "ArmGroupLabel",
            "ArmGroupType",
            "ArmGroupDescription",
            "ArmGroupInterventionName",
            "InterventionType",
            "InterventionName",
            "InterventionOtherName",
            "InterventionDescription",
        ),
        "eligibility_criteria": (
            "NCTId",
            "HealthyVolunteers",
            "Sex",
            "GenderBased",
            "GenderDescription",
            "MinimumAge",
            "MaximumAge",
            "StudyPopulation",
            "EligibilityCriteria",
        ),
        "location": (
            "NCTId",
            "LocationFacility",
            "LocationStatus",
            "LocationCity",
            "LocationState",
            "LocationCountry",
        ),
        "references": ("NCTId", "Reference", "SeeAlsoLink"),
        # NOTE(review): _simplify_output reads nothing from the results
        # section, so this query currently yields only the NCT ID — confirm
        # whether an extractor is still to be written.
        "baseline_characteristics": ("NCTId", "BaselineCharacteristicsModule"),
    }

    # (protocolSection module, key inside the module, output key), in output order.
    _PROTOCOL_FIELDS = (
        ("identificationModule", "briefTitle", "brief_title"),
        ("identificationModule", "officialTitle", "official_title"),
        ("statusModule", "overallStatus", "overall_status"),
        ("statusModule", "lastKnownStatus", "last_known_status"),
        ("statusModule", "whyStopped", "why_stopped"),
        ("statusModule", "startDateStruct", "start_date"),
        ("statusModule", "primaryCompletionDateStruct", "primary_completion_date"),
        ("statusModule", "completionDateStruct", "completion_date"),
        ("descriptionModule", "briefSummary", "brief_summary"),
        ("descriptionModule", "detailedDescription", "detailed_description"),
        ("conditionsModule", "conditions", "condition"),
        ("designModule", "phases", "phase"),
        ("designModule", "patientRegistry", "patient_registry"),
        ("designModule", "enrollmentInfo", "enrollment_info"),
        ("armsInterventionsModule", "armGroups", "arm_groups"),
        ("armsInterventionsModule", "interventions", "interventions"),
        ("outcomesModule", "primaryOutcomes", "primary_outcomes"),
        ("outcomesModule", "secondaryOutcomes", "secondary_outcomes"),
        ("eligibilityModule", "eligibilityCriteria", "eligibility_criteria"),
        ("eligibilityModule", "healthyVolunteers", "healthy_volunteers"),
        ("eligibilityModule", "sex", "sex"),
        ("eligibilityModule", "genderBased", "gender_based"),
        ("eligibilityModule", "genderDescription", "gender_description"),
        ("eligibilityModule", "minimumAge", "minimum_age"),
        ("eligibilityModule", "maximumAge", "maximum_age"),
        ("eligibilityModule", "studyPopulation", "study_population"),
        ("contactsLocationsModule", "locations", "locations"),
        ("referencesModule", "references", "references"),
        ("referencesModule", "seeAlsoLinks", "see_also_links"),
    )

    # Entries of _PROTOCOL_FIELDS whose value is a struct; only its "date" is kept.
    _DATE_STRUCT_KEYS = frozenset(
        {"startDateStruct", "primaryCompletionDateStruct", "completionDateStruct"}
    )

    # Substitutions used to widen a free-text outcome search term.
    # TODO: Add more rules here
    _TERM_SWAPS = (
        ("-", " "),
        (" ", "-"),
        ("progression", "progress"),
        ("progress ", "progression "),
        ("progress-", "progression-"),
        ("patient", "participant"),
        ("participant", "patient"),
        ("_", " "),
        ("percentage", "percent"),
        ("percent ", "percentage "),
        ("percent-", "percentage-"),
        ("proportion", "percentage"),
        ("percentage", "proportion"),
        ("proportion", "percent"),
        ("percent", "proportion"),
        ("time to event", "time-to-event"),
        ("time-to-event", "time to event"),
    )

    def __init__(self, tool_config):
        """
        Set up the details tool and its hidden request defaults.

        Args:
            tool_config (dict): Tool configuration passed to the base class.
        """
        super().__init__(tool_config)
        self.default_params_not_shown = {
            "format": "json",
        }

    def run(self, arguments):
        """
        Fetch and post-process the requested detail for every NCT ID.

        Args:
            arguments (dict): Runtime arguments; must contain ``"nct_ids"``
                as a non-empty list. Other recognised arguments are copied
                into the request parameters.

        Returns:
            dict, str or list: An ``{"error": ...}`` dictionary when
            ``nct_ids`` is missing or invalid; a fixed "No relevant
            information ..." message when no processed study has more than
            one entry; otherwise the list of processed studies (an element
            may be a warning string produced by the outcome extractor).
        """
        arguments = self._map_param_names(arguments)
        expected_param_names = self._map_param_names(self.parameters).keys()
        query_params = deepcopy(self.query_schema)

        nct_ids_list = arguments.get("nct_ids")
        if not isinstance(nct_ids_list, list) or not nct_ids_list:
            return {
                "error": "Missing or invalid required parameter: nct_ids (must be a non-empty list)"
            }
        # 'nct_ids' goes into the URL path, never into the query string.
        del arguments["nct_ids"]
        query_params.pop("nct_ids", None)

        # Copy over every expected argument that was actually supplied.
        for name in expected_param_names:
            if arguments.get(name) is not None:
                query_params[name] = arguments[name]

        # Add default parameters that are not shown in the schema.
        for name, value in self.default_params_not_shown.items():
            if name not in query_params:
                query_params[name] = value

        query_type, extras = self._configure_query(query_params, expected_param_names)
        api_params = self._prepare_api_params(query_params)

        responses = []
        for nct_id in nct_ids_list:
            response = execute_RESTful_query(
                endpoint_url=self._format_endpoint_url({"nctId": nct_id}),
                variables=api_params,
            )
            # Skip empty results and anything that is not a study dictionary.
            if response and isinstance(response, dict):
                responses.append(response)

        if query_type == "outcome":
            results = [
                self._extract_outcomes_from_output(study, extras["outcome_measure"])
                for study in responses
            ]
        elif query_type == "safety":
            results = [
                self._extract_safety_from_output(
                    study, extras["organ_systems"], extras["adverse_event_type"]
                )
                for study in responses
            ]
        else:
            results = [self._simplify_output(study, query_type) for study in responses]

        # A result holding only its NCT ID (or nothing) carries no information.
        if not any(len(result) > 1 for result in results):
            return "No relevant information found for the given NCT IDs."
        return results

    def _configure_query(self, query_params, expected_param_names):
        """
        Decide what to request and strip selector parameters from the query.

        The first name of ``_QUERY_PRIORITY`` found in ``expected_param_names``
        is the selector. It is removed from ``query_params`` (it is not sent
        to the API) and ``query_params["fields"]`` is set accordingly.

        Args:
            query_params (dict): Request parameters; modified in place.
            expected_param_names: Container of the tool's parameter names.

        Returns:
            tuple: ``(query_type, extras)``. ``query_type`` is ``None`` when
            no selector is declared (``"fields"`` is then left untouched).
            ``extras`` carries ``"outcome_measure"`` for the ``"outcome"``
            type and ``"adverse_event_type"`` / ``"organ_systems"`` for the
            ``"safety"`` type.
        """
        extras = {}
        selector = next(
            (name for name in self._QUERY_PRIORITY if name in expected_param_names),
            None,
        )
        if selector is None:
            return None, extras

        # pop() with a default: the selector may be declared but absent/None.
        raw_value = query_params.pop(selector, None)

        if selector in self._FLAG_QUERY_FIELDS:
            query_params["fields"] = list(self._FLAG_QUERY_FIELDS[selector])
            return selector, extras

        choice = raw_value.lower() if isinstance(raw_value, str) else ""

        if selector == "description_type":
            query_type = "description"
            if choice == "full":
                fields = [
                    "NCTId",
                    "BriefTitle",
                    "OfficialTitle",
                    "BriefSummary",
                    "DetailedDescription",
                    "Phase",
                ]
            else:
                fields = ["NCTId", "BriefTitle", "BriefSummary", "Phase"]
        elif selector == "outcome_measures":
            query_type = "outcome_measures"
            if choice == "primary":
                fields = ["NCTId", "PrimaryOutcome"]
            elif choice == "secondary":
                fields = ["NCTId", "SecondaryOutcome"]
            else:
                fields = ["NCTId", "PrimaryOutcome", "SecondaryOutcome"]
        elif selector == "outcome_measure":
            # TODO: Add this to the schema
            query_type = "outcome"
            extras["outcome_measure"] = raw_value
            fields = ["NCTId", "OutcomeMeasure"]
        else:  # "adverse_event_type"
            query_type = "safety"
            extras["adverse_event_type"] = raw_value or "serious"
            extras["organ_systems"] = query_params.pop("organ_systems", None) or []
            fields = ["NCTId", "AdverseEventsModule"]

        query_params["fields"] = fields
        return query_type, extras

    def _simplify_output(self, study, query_type):
        """
        Manually extract generally most useful information.

        Args:
            study (dict): One study response; only ``"protocolSection"`` is read.
            query_type: Accepted for interface compatibility; not used.

        Returns:
            dict: ``"NCT ID"`` plus every entry of ``_PROTOCOL_FIELDS`` present
            in the study, with ``None`` and ``[]`` values removed.
        """
        protocol = study.get("protocolSection") or {}
        new_study = {
            "NCT ID": (protocol.get("identificationModule") or {}).get("nctId"),
        }
        for module_name, source_key, output_key in self._PROTOCOL_FIELDS:
            module = protocol.get(module_name) or {}
            if source_key not in module:
                continue
            value = module[source_key]
            if source_key in self._DATE_STRUCT_KEYS:
                value = (value or {}).get("date")
            new_study[output_key] = value
        return self._remove_empty_values(new_study)

    def _outcome_term_variants(self, term):
        """Return ``term`` together with every single-rule rewrite from ``_TERM_SWAPS``."""
        return {term} | {term.replace(old, new) for old, new in self._TERM_SWAPS}

    def _summarize_outcome(self, outcome, nct_id):
        """
        Flatten one reported outcome measure.

        Args:
            outcome (dict): One entry of the study's ``outcomeMeasures`` list.
            nct_id: Study identifier, used in warning messages only.

        Returns:
            dict or str: The flattened outcome, or a ``"Warning: ..."`` string
            when the outcome has a shape this extractor does not handle
            (several denoms/classes/categories/analyses, or missing statistics).
        """
        title = outcome.get("title")

        def warn(problem):
            # TODO: Investigate such trials
            return f"Warning: {problem} for outcome {title} in study {nct_id}."

        new_outcome = {
            "title": title,
            "description": outcome.get("description"),
            "population": outcome.get("populationDescription"),
            "time_frame": outcome.get("timeFrame"),
            "unit_analyzed": outcome.get("typeUnitsAnalyzed"),
        }

        # paramType is an upper-case code such as MEAN, MEDIAN, NUMBER,
        # COUNT_OF_PARTICIPANTS, GEOMETRIC_MEAN, LEAST_SQUARES_MEAN, LOG_MEAN.
        measurement_type = outcome.get("paramType")
        if measurement_type:
            measurement_type = measurement_type.lower()
        unit = outcome.get("unitOfMeasure")
        new_outcome["groups"] = outcome.get("groups")

        denoms = outcome.get("denoms")
        if denoms:
            if len(denoms) > 1:
                return warn("Multiple denoms found")
            new_outcome["denominators"] = denoms[0]["counts"]

        classes = outcome.get("classes")
        if classes:
            if len(classes) > 1:
                return warn("Multiple classes found")
            only_class = classes[0]
            if (
                "title" in only_class
                or "denoms" in only_class
                or "categories" not in only_class
            ):
                return warn("Unexpected structure in classes")
            categories = only_class["categories"]
            if len(categories) > 1:
                return warn("Multiple classes-categories found")
            if (
                not categories
                or "title" in categories[0]
                or "measurements" not in categories[0]
            ):
                return warn("Unexpected structure in classes-categories")
            if not (measurement_type and unit):
                return warn("Missing paramType or unitOfMeasure")
            new_outcome[measurement_type + " (" + unit + ")"] = categories[0][
                "measurements"
            ]

        analyses = outcome.get("analyses")
        if analyses:
            if len(analyses) > 1:
                return warn("Multiple analyses found")
            analysis = analyses[0]
            pvalue = analysis.get("pValue")
            statistic_test = analysis.get("statisticalMethod")
            statistic_name = analysis.get("paramType")
            statistic = analysis.get("paramValue")
            if not (statistic_name and statistic_test and statistic and pvalue):
                return warn(
                    "Missing paramType, paramValue, statisticalMethod or pvalue"
                )
            new_outcome["p-value (" + statistic_test + ")"] = pvalue
            new_outcome[statistic_name] = statistic

            statistic_comment = analysis.get("statisticalComment")
            if statistic_comment:
                new_outcome["statistic_comment"] = statistic_comment
            pvalue_comment = analysis.get("pValueComment")
            if pvalue_comment:
                new_outcome["pvalue_comment"] = pvalue_comment

            test_type = analysis.get("nonInferiorityType")
            test_type_comment = analysis.get("nonInferiorityTypeComment")
            if test_type and test_type_comment:
                new_outcome["statistic_test_type"] = test_type
                new_outcome["statistic_test_type_comment"] = test_type_comment

        return new_outcome

    def _extract_outcomes_from_output(self, study, outcome_measure):
        """
        Collect the reported outcome results of one study.

        Args:
            study (dict): One study response.
            outcome_measure (str or None): ``"primary"``, ``"secondary"`` or
                ``"all"`` to select by outcome type; any other text is matched
                (with spelling variants) against each outcome's title and
                description. ``None`` is treated as ``"all"``.

        Returns:
            dict or str: ``{"NCT ID": ..., "outcomes": [...]}`` with empty
            values removed, or the first warning string produced by
            ``_summarize_outcome``.
        """
        outcome_measure = "all" if outcome_measure is None else outcome_measure.lower()
        protocol = study.get("protocolSection") or {}
        nct_id = (protocol.get("identificationModule") or {}).get("nctId")
        new_study = {"NCT ID": nct_id}

        results = study.get("resultsSection") or {}
        raw_outcomes = (results.get("outcomeMeasuresModule") or {}).get(
            "outcomeMeasures"
        )
        if raw_outcomes is not None:
            wanted_types = {
                "primary": ("PRIMARY",),
                "secondary": ("SECONDARY",),
                "all": ("PRIMARY", "SECONDARY"),
            }.get(outcome_measure)
            # The variants only depend on the search term: build them once.
            variants = (
                None
                if wanted_types
                else self._outcome_term_variants(outcome_measure)
            )

            outcomes = []
            for outcome in raw_outcomes:
                if wanted_types is not None:
                    if outcome.get("type") not in wanted_types:
                        continue
                else:
                    haystacks = (
                        (outcome.get("title") or "").lower(),
                        (outcome.get("description") or "").lower(),
                    )
                    if not any(v in text for v in variants for text in haystacks):
                        continue

                summary = self._summarize_outcome(outcome, nct_id)
                if isinstance(summary, str):
                    # An unsupported outcome aborts the whole study with a warning.
                    return summary
                outcomes.append(summary)
            new_study["outcomes"] = outcomes

        return self._remove_empty_values(new_study)

    def _event_percentage(self, group_stats):
        """
        Return the share of affected participants as a ``"<n>%"`` string.

        Uses ``numAffected / numAtRisk``; when ``numAffected`` is absent it
        falls back to ``numEvents / numAtRisk``. Returns ``None`` when no
        ratio can be computed (missing counts or ``numAtRisk`` not positive).
        """
        at_risk = group_stats.get("numAtRisk")
        if at_risk is None or not at_risk > 0:
            return None
        count = group_stats.get("numAffected")
        if count is None:
            count = group_stats.get("numEvents")
        if count is None:
            return None
        return str(round(count / at_risk * 100, 2)) + "%"

    def _filter_adverse_events(self, events, category, adverse_event_type, organs):
        """
        Filter and trim one list of adverse events (modifies the events in place).

        Args:
            events (list): Event dictionaries from the adverse-events module.
            category (str): ``"serious"`` or ``"other"`` — the list being processed.
            adverse_event_type (str): Lower-cased request; unless it equals
                ``category`` or ``"all"`` it is used as a substring filter on
                each event's term.
            organs (list): Lower-cased organ systems to keep; empty keeps all.

        Returns:
            list: The retained events, each with a ``"percentage"`` added to
            every stats entry and the ``sourceVocabulary``, ``assessmentType``
            and per-group ``numEvents`` entries removed.
        """
        match_by_term = adverse_event_type not in {category, "all"}
        kept = []
        for event in events:
            if match_by_term and adverse_event_type not in (
                event.get("term") or ""
            ).lower():
                continue
            if organs and (event.get("organSystem") or "").lower() not in organs:
                continue
            event.pop("sourceVocabulary", None)
            event.pop("assessmentType", None)
            for group_stats in event.get("stats") or []:
                group_stats["percentage"] = self._event_percentage(group_stats)
                group_stats.pop("numEvents", None)
            kept.append(event)
        return kept

    def _extract_safety_from_output(self, study, organs, adverse_event_type):
        """
        Collect the adverse-event data of one study.

        Args:
            study (dict): One study response.
            organs (list or None): Organ systems to keep (case-insensitive);
                empty or ``None`` keeps all.
            adverse_event_type (str or None): ``"serious"``, ``"other"``,
                ``"all"`` or a term to search for in both event lists.
                ``None`` is treated as ``"serious"``.

        Returns:
            dict: ``"NCT ID"`` and, when the study has an adverse-events
            module, ``"freq_threshold"``, ``"groups"``,
            ``"serious_adverse_events"`` and/or ``"other_adverse_events"``,
            with empty values removed.
        """
        adverse_event_type = (adverse_event_type or "serious").lower()
        organs = [org.lower() for org in (organs or [])]

        protocol = study.get("protocolSection") or {}
        new_study = {
            "NCT ID": (protocol.get("identificationModule") or {}).get("nctId"),
        }

        ae_data = (study.get("resultsSection") or {}).get("adverseEventsModule")
        if ae_data:
            if "frequencyThreshold" in ae_data:
                new_study["freq_threshold"] = f"{ae_data['frequencyThreshold']}%"

            groups = ae_data.get("eventGroups") or []
            for group in groups:
                # Death counts are dropped; serious/other counts are kept.
                group.pop("deathsNumAffected", None)
                group.pop("deathsNumAtRisk", None)
            new_study["groups"] = groups

            if "seriousEvents" in ae_data and adverse_event_type != "other":
                new_study["serious_adverse_events"] = self._filter_adverse_events(
                    ae_data["seriousEvents"] or [],
                    "serious",
                    adverse_event_type,
                    organs,
                )
            if "otherEvents" in ae_data and adverse_event_type != "serious":
                new_study["other_adverse_events"] = self._filter_adverse_events(
                    ae_data["otherEvents"] or [],
                    "other",
                    adverse_event_type,
                    organs,
                )

        return self._remove_empty_values(new_study)

    def _remove_empty_values(self, obj):
        """
        Recursively drop ``None`` and ``[]`` entries from dicts and lists.

        Entries are tested before recursion, so a container that only becomes
        empty after cleaning is kept. Other values (including ``0`` and
        ``False``) are returned unchanged.
        """
        empty_markers = ([], None)
        if isinstance(obj, dict):
            return {
                key: self._remove_empty_values(value)
                for key, value in obj.items()
                if value not in empty_markers
            }
        if isinstance(obj, list):
            return [
                self._remove_empty_values(value)
                for value in obj
                if value not in empty_markers
            ]
        return obj