# Source code for tooluniverse.who_gho_tool

import re
from typing import Dict, Any, Optional, List
from urllib.parse import quote

import requests

from .base_tool import BaseTool
from .tool_registry import register_tool

# Root URL of the WHO GHO API; endpoint paths are appended to it.
WHO_GHO_BASE_URL = "https://ghoapi.azureedge.net/api"

# Three-letter country code paired with the lower-case names, abbreviations
# and adjectives that should resolve to it.
_COUNTRY_ALIASES = (
    ("USA", ("usa", "united states", "us")),
    ("GBR", ("uk", "united kingdom", "britain")),
    ("CHN", ("china", "chinese")),
    ("IND", ("india", "indian")),
    ("JPN", ("japan", "japanese")),
    ("DEU", ("germany", "german")),
    ("FRA", ("france", "french")),
    ("ITA", ("italy", "italian")),
    ("ESP", ("spain", "spanish")),
    ("CAN", ("canada", "canadian")),
    ("AUS", ("australia", "australian")),
    ("BRA", ("brazil", "brazilian")),
    ("RUS", ("russia", "russian")),
    ("KOR", ("south korea", "korea")),
    ("MEX", ("mexico", "mexican")),
)

# Common country name to ISO code mappings (alias -> code), flattened from
# the table above in the same order the aliases are listed.
COUNTRY_MAPPINGS = {
    alias: iso_code
    for iso_code, aliases in _COUNTRY_ALIASES
    for alias in aliases
}


@register_tool("WHOGHORESTTool")
class WHOGHORESTTool(BaseTool):
    """Base class for WHO Global Health Observatory (GHO) REST API tools.

    Builds OData query parameters for the endpoint configured in
    ``tool_config["fields"]["endpoint"]`` and exposes static helpers for
    parsing free-text queries, ranking indicators and formatting answers.
    """

    def __init__(self, tool_config):
        """Read ``endpoint`` (required) and ``filter_by_code`` from the config.

        Raises
        ------
        KeyError
            If ``tool_config`` has no ``"fields"`` entry or the entry has no
            ``"endpoint"``.
        """
        super().__init__(tool_config)
        fields = tool_config["fields"]
        self.endpoint = fields["endpoint"]
        self.filter_by_code = fields.get("filter_by_code", False)

    @staticmethod
    def _odata_string(value: Any) -> str:
        """Return *value* as an OData string literal.

        Embedded single quotes are doubled so that a value such as
        ``women's`` cannot terminate the literal early.
        """
        return "'" + str(value).replace("'", "''") + "'"

    @staticmethod
    def _indicator_url(indicator_code: Any) -> str:
        """Return the direct indicator URL ``/api/{IndicatorCode}``.

        The code is percent-encoded so characters like ``/`` or ``?`` stay
        inside the single path segment.
        """
        return f"{WHO_GHO_BASE_URL}/{quote(str(indicator_code), safe='')}"

    @classmethod
    def _dimension_filters(cls, params: Dict[str, Any]) -> List[str]:
        """Build the country (SpatialDim) and year (TimeDim) filter clauses."""
        clauses = []
        if params.get("country_code"):
            country = cls._odata_string(params["country_code"])
            clauses.append(f"SpatialDim eq {country}")
        year = params.get("year")
        # Numbers are not quoted in OData; an empty string would yield a
        # dangling "TimeDim eq " clause, so it is skipped like None.
        if year is not None and year != "":
            clauses.append(f"TimeDim eq {year}")
        return clauses

    @staticmethod
    def _fetch(url: str, odata_params: Dict[str, Any]) -> Dict[str, Any]:
        """GET *url* and return ``{"data": json}`` or ``{"error": message}``."""
        try:
            resp = requests.get(url, params=odata_params, timeout=30)
            resp.raise_for_status()
            return {"data": resp.json()}
        except requests.exceptions.RequestException as e:
            return {"error": f"Request failed: {str(e)}"}
        except ValueError as e:
            return {"error": f"Failed to parse JSON: {str(e)}"}

    def _make_request(
        self, params: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Make request to WHO GHO API.

        Parameters
        ----------
        params : dict, optional
            Recognised keys: ``indicator_code``, ``search_term``,
            ``country_code``, ``year``, ``dimension_code``, ``top``, ``skip``.

        Returns
        -------
        dict
            ``{"data": ..., "metadata": {...}}`` on success, or
            ``{"error": message}`` when the request or JSON decoding fails.
        """
        params = params or {}
        has_indicator = "indicator_code" in params

        # WHO GHO API uses /api/{IndicatorCode} instead of /api/Data, so a
        # "/Data" tool with an indicator code targets the indicator directly.
        use_direct_indicator_endpoint = (
            has_indicator and self.endpoint == "/Data"
        )
        if use_direct_indicator_endpoint:
            url = self._indicator_url(params["indicator_code"])
        else:
            url = f"{WHO_GHO_BASE_URL}{self.endpoint}"

        filter_parts = []

        # The code is only filtered on when it is not already in the URL path.
        if (
            self.filter_by_code
            and has_indicator
            and not use_direct_indicator_endpoint
        ):
            code = self._odata_string(params["indicator_code"])
            filter_parts.append(f"IndicatorCode eq {code}")

        if params.get("search_term"):
            term = self._odata_string(params["search_term"])
            filter_parts.append(f"contains(IndicatorName, {term})")

        # Country/year filters apply to "/Data" tools whether or not an
        # indicator code was supplied.
        if self.endpoint == "/Data":
            filter_parts.extend(self._dimension_filters(params))

        if self.endpoint == "/Dimension" and params.get("dimension_code"):
            code = self._odata_string(params["dimension_code"])
            filter_parts.append(f"Code eq {code}")

        odata_params = {}
        if filter_parts:
            odata_params["$filter"] = " and ".join(filter_parts)

        # Pagination
        if params.get("top") is not None:
            odata_params["$top"] = min(params["top"], 1000)  # Cap at 1000
        if params.get("skip") is not None:
            odata_params["$skip"] = params["skip"]

        result = self._fetch(url, odata_params)
        if "error" not in result:
            result["metadata"] = {
                "source": "WHO Global Health Observatory",
                "endpoint": url,
                "query": odata_params,
            }
        return result

    def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the tool with given arguments."""
        return self._make_request(arguments)

    @staticmethod
    def parse_query(query: str) -> Dict[str, Any]:
        """
        Parse natural language query to extract health topic, country, year.

        Country names are matched as whole words, longest alias first, so
        that "russia" or "virus" is not mistaken for the alias "us".

        Parameters
        ----------
        query : str
            Natural language query (e.g., "smoking rate in USA 2020")

        Returns
        -------
        dict
            Dictionary with extracted: topic, country_code, year
        """
        query_lower = query.lower()
        result = {"topic": None, "country_code": None, "year": None}

        # Extract year (4-digit number starting with 19 or 20)
        year_pattern = r'\b(19|20)\d{2}\b'
        year_match = re.search(year_pattern, query)
        if year_match:
            result["year"] = int(year_match.group())

        # Longest aliases first so "south korea" wins over "korea" and
        # "usa" over "us" when both could match at the same position.
        aliases = sorted(COUNTRY_MAPPINGS, key=len, reverse=True)
        country_pattern = re.compile(
            r'\b(?:' + '|'.join(re.escape(a) for a in aliases) + r')\b'
        )
        country_match = country_pattern.search(query_lower)
        if country_match:
            result["country_code"] = COUNTRY_MAPPINGS[country_match.group()]

        # Extract health topic (remove country and year, keep rest)
        topic_query = query_lower
        if result["year"]:
            topic_query = re.sub(year_pattern, '', topic_query)
        if result["country_code"]:
            topic_query = country_pattern.sub('', topic_query)

        # Remove common words
        common_words_pattern = (
            r'\b(in|for|of|the|a|an|rate|prevalence|percentage)\b'
        )
        topic_query = re.sub(common_words_pattern, '', topic_query)
        topic_query = re.sub(r'\s+', ' ', topic_query).strip()

        # Fall back to the whole query when nothing is left after stripping.
        result["topic"] = topic_query if topic_query else query_lower
        return result

    @staticmethod
    def rank_indicators(
        indicators: List[Dict[str, Any]], query: str
    ) -> List[Dict[str, Any]]:
        """
        Rank indicators by relevance to query.

        Scoring: +10 if the whole query occurs in the indicator name, +2 per
        word shared with the name, +1 if any query word occurs in the code.
        Ties keep their input order.

        Parameters
        ----------
        indicators : list
            List of indicator dictionaries
        query : str
            Search query

        Returns
        -------
        list
            Ranked list of indicators
        """
        query_lower = query.lower()
        query_words = set(re.findall(r'\b\w+\b', query_lower))

        def score_indicator(indicator: Dict[str, Any]) -> float:
            # "or ''" covers keys that are present but hold None.
            name = (indicator.get('IndicatorName') or '').lower()
            code = (indicator.get('IndicatorCode') or '').lower()
            name_words = set(re.findall(r'\b\w+\b', name))

            score = 0.0
            # Exact phrase match
            if query_lower in name:
                score += 10.0
            # Word overlap
            score += len(query_words & name_words) * 2.0
            # Code relevance (if query matches code pattern)
            if any(word in code for word in query_words):
                score += 1.0
            return score

        return sorted(indicators, key=score_indicator, reverse=True)

    @staticmethod
    def format_health_answer(
        value: Any,
        indicator_name: str,
        country_code: Optional[str] = None,
        year: Optional[int] = None
    ) -> Dict[str, Any]:
        """
        Format health data into a human-readable answer.

        Numeric values get a trailing "%" when the indicator name contains
        "%" or "prevalence"; ``None`` becomes "No data available".

        Parameters
        ----------
        value : Any
            Health statistic value
        indicator_name : str
            Name of the indicator
        country_code : str, optional
            Country code
        year : int, optional
            Year

        Returns
        -------
        dict
            Formatted answer dictionary
        """
        name_lower = (indicator_name or "").lower()
        # bool is a subclass of int but is not a statistic.
        is_number = (
            isinstance(value, (int, float)) and not isinstance(value, bool)
        )
        is_percentage = "%" in name_lower or "prevalence" in name_lower

        if value is None:
            answer_text = "No data available"
        elif is_number and is_percentage:
            answer_text = f"{value}%"
        else:
            answer_text = str(value)

        formatted = {
            "answer": answer_text,
            "value": value,
            "indicator": indicator_name,
        }
        if country_code:
            formatted["country"] = country_code
        if year:
            formatted["year"] = str(year)
        formatted["source"] = "WHO Global Health Observatory"
        return formatted

    @staticmethod
    def is_value_available(value_obj: Dict[str, Any]) -> bool:
        """
        Check if a WHO data value is available.

        Returns False when the API returns placeholders such as
        "Data not available" or when the numeric value is missing.
        """
        if not value_obj:
            return False
        if value_obj.get("Value") == "Data not available":
            return False
        return value_obj.get("NumericValue") is not None

    def _make_request_for_data(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Make request to data using direct indicator endpoint format.

        Parameters
        ----------
        params : dict
            Must contain ``indicator_code``; may contain ``country_code``,
            ``year`` and ``top`` (capped at 1000, as in ``_make_request``).

        Returns
        -------
        dict
            ``{"data": ...}`` on success, otherwise ``{"error": message}``.
        """
        if not params or "indicator_code" not in params:
            return {"error": "indicator_code parameter is required"}

        # Use direct indicator endpoint: /api/{IndicatorCode}
        url = self._indicator_url(params["indicator_code"])

        odata_params = {}
        filter_parts = self._dimension_filters(params)
        if filter_parts:
            odata_params["$filter"] = " and ".join(filter_parts)
        if params.get("top") is not None:
            odata_params["$top"] = min(params["top"], 1000)

        return self._fetch(url, odata_params)
@register_tool("WHOGHOQueryTool")
class WHOGHOQueryTool(WHOGHORESTTool):
    """Tool for answering generic health questions using natural language."""

    def __init__(self, tool_config):
        """Initialise the base tool, then force the indicator search endpoint."""
        super().__init__(tool_config)
        # Override endpoint for query tool
        self.endpoint = "/Indicator"

    @staticmethod
    def _to_year(raw_year: Any) -> Optional[int]:
        """Convert a TimeDim/year value to ``int``; ``None`` if not possible."""
        if not raw_year:
            return None
        if isinstance(raw_year, int):
            return raw_year
        if isinstance(raw_year, str):
            try:
                return int(raw_year)
            except ValueError:
                return None
        return None

    @staticmethod
    def _pick_value(value_obj: Dict[str, Any]) -> Any:
        """Return ``NumericValue`` when present, else the text ``Value``.

        Compared against ``None`` rather than by truthiness so that a
        numeric 0 is not replaced by the text value.
        """
        numeric = value_obj.get("NumericValue")
        return numeric if numeric is not None else value_obj.get("Value")

    def _answer_for_indicator(
        self,
        indicator: Dict[str, Any],
        country_code: Optional[str],
        year: Any,
    ) -> Optional[Dict[str, Any]]:
        """Fetch one data point for *indicator* and format it.

        Returns ``None`` when the indicator has no code, the request fails,
        or no available value comes back.
        """
        indicator_code = indicator.get("IndicatorCode")
        if not indicator_code:
            # Without a code the request would target ".../api/None".
            return None

        data_params = {"indicator_code": indicator_code, "top": 1}
        if country_code:
            data_params["country_code"] = country_code
        if year:
            data_params["year"] = year

        data_result = self._make_request_for_data(data_params)
        if "error" in data_result:
            return None

        records = [
            rec
            for rec in data_result.get("data", {}).get("value", [])
            if self.is_value_available(rec)
        ]
        if not records:
            return None

        record = records[0]
        formatted = self.format_health_answer(
            value=self._pick_value(record),
            indicator_name=indicator.get("IndicatorName"),
            country_code=record.get("SpatialDim") or country_code,
            # Prefer the year reported with the record; fall back to the
            # requested one.
            year=self._to_year(record.get("TimeDim") or year),
        )
        formatted["indicator_code"] = indicator_code
        return formatted

    def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Execute query tool with natural language processing.

        Parameters
        ----------
        arguments : dict
            ``query`` (required); optional ``country_code`` and ``year``
            override what is parsed from the query; ``top`` (default 5) is
            the number of ranked indicators to try.

        Returns
        -------
        dict
            ``{"data": ..., "metadata": {...}}`` where ``data`` is a single
            answer dict when exactly one result was found and a list
            otherwise; or a dict with an ``"error"`` key.
        """
        query = arguments.get("query", "")
        if not query:
            return {"error": "Query parameter is required"}

        # Parse query to extract topic, country, year
        parsed = self.parse_query(query)

        # Use provided country/year or extracted ones
        country_code = (
            arguments.get("country_code") or parsed.get("country_code")
        )
        year = arguments.get("year") or parsed.get("year")
        topic = parsed.get("topic") or query

        top = arguments.get("top")
        if top is None:
            top = 5

        # Step 1: Search for relevant indicators
        search_result = self._make_request({
            "search_term": topic,
            "top": min(top * 3, 50)  # Get more candidates for ranking
        })
        if "error" in search_result:
            return search_result

        indicators = search_result.get("data", {}).get("value", [])
        if not indicators:
            return {
                "error": f"No indicators found for query: '{query}'",
                "suggestion": "Try different keywords or check spelling"
            }

        # Step 2: Rank indicators by relevance
        ranked_indicators = self.rank_indicators(indicators, topic)

        # Step 3: Try to get data for top indicators
        results = []
        for indicator in ranked_indicators[:top]:
            answer = self._answer_for_indicator(indicator, country_code, year)
            if answer is not None:
                results.append(answer)

        if not results:
            return {
                "error": f"No data found for query: '{query}'",
                "matched_indicators": [
                    {
                        "code": ind.get("IndicatorCode"),
                        "name": ind.get("IndicatorName")
                    }
                    for ind in ranked_indicators[:3]
                ],
                "suggestion": (
                    "Try a different country, year, or check if data is "
                    "available"
                )
            }

        # A single result is returned bare; several are returned as a list.
        return {
            "data": results[0] if len(results) == 1 else results,
            "metadata": {
                "source": "WHO Global Health Observatory",
                "query": query,
                "indicators_searched": len(ranked_indicators),
                "results_found": len(results)
            }
        }
@register_tool("WHOGHOTopicTool")
class WHOGHOTopicTool(WHOGHORESTTool):
    """Tool for finding indicators by topic."""

    def __init__(self, tool_config):
        """Initialise the base tool and point it at the indicator endpoint."""
        super().__init__(tool_config)
        self.endpoint = "/Indicator"

    def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Find indicators by topic.

        Parameters
        ----------
        arguments : dict
            ``topic`` (required) and optional ``top`` (default 10), the
            number of indicators to return.

        Returns
        -------
        dict
            ``{"data": {"indicators", "topic", "total_found"}, "metadata"}``
            on success, otherwise a dict with an ``"error"`` key.
        """
        topic = arguments.get("topic", "")
        if not topic:
            return {"error": "Topic parameter is required"}

        top = arguments.get("top", 10)

        # Ask for twice as many candidates as requested (at most 100) so the
        # ranking has something to choose from.
        request = {"search_term": topic, "top": min(top * 2, 100)}
        response = self._make_request(request)
        if "error" in response:
            return response

        found = response.get("data", {}).get("value", [])
        if not found:
            return {
                "error": f"No indicators found for topic: '{topic}'",
                "suggestion": "Try different keywords or check spelling"
            }

        ordered = self.rank_indicators(found, topic)
        total = len(ordered)

        # The reported score is purely positional: 10 for the first ranked
        # indicator, decreasing linearly with rank.
        result_indicators = [
            {
                "IndicatorCode": entry.get("IndicatorCode"),
                "IndicatorName": entry.get("IndicatorName"),
                "relevance_score": round((total - position) / total * 10, 2)
            }
            for position, entry in enumerate(ordered[:top])
        ]

        payload = {
            "indicators": result_indicators,
            "topic": topic,
            "total_found": len(found)
        }
        metadata = {
            "source": "WHO Global Health Observatory",
            "topic": topic
        }
        return {"data": payload, "metadata": metadata}
@register_tool("WHOGHOStatisticTool")
class WHOGHOStatisticTool(WHOGHORESTTool):
    """Tool for getting health statistics by indicator name."""

    def __init__(self, tool_config):
        """Initialise the base tool and point it at the indicator endpoint."""
        super().__init__(tool_config)
        self.endpoint = "/Indicator"

    @staticmethod
    def _to_year(raw_year: Any) -> Optional[int]:
        """Convert a TimeDim/year value to ``int``; ``None`` if not possible."""
        if not raw_year:
            return None
        if isinstance(raw_year, int):
            return raw_year
        if isinstance(raw_year, str):
            try:
                return int(raw_year)
            except ValueError:
                return None
        return None

    @staticmethod
    def _pick_value(value_obj: Dict[str, Any]) -> Any:
        """Return ``NumericValue`` when present, else the text ``Value``.

        Compared against ``None`` rather than by truthiness so that a
        numeric 0 is not replaced by the text value.
        """
        numeric = value_obj.get("NumericValue")
        return numeric if numeric is not None else value_obj.get("Value")

    def _available_values(
        self, data_result: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """Return the records of *data_result* that carry a usable value."""
        return [
            rec
            for rec in data_result.get("data", {}).get("value", [])
            if self.is_value_available(rec)
        ]

    def _most_recent_without_year(
        self, indicator_code: Any, country_code: str
    ) -> List[Dict[str, Any]]:
        """Retry without a year filter and keep the most recent record.

        Returns a list with at most one record; empty when the request
        fails or nothing usable comes back.
        """
        # NOTE(review): only the first 10 records returned are inspected, so
        # the pick is the most recent of those, not necessarily of all data.
        retry = self._make_request_for_data({
            "indicator_code": indicator_code,
            "country_code": country_code,
            "top": 10
        })
        if "error" in retry:
            return []
        candidates = self._available_values(retry)
        if not candidates:
            return []
        # Records with a missing or unparseable TimeDim sort last instead of
        # raising on a mixed-type comparison.
        newest = max(
            candidates,
            key=lambda rec: self._to_year(rec.get("TimeDim")) or -1,
        )
        return [newest]

    def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Get health statistic by indicator name.

        Parameters
        ----------
        arguments : dict
            ``indicator_name`` and ``country_code`` are required; ``year``
            is optional. If no data exists for the given year, the most
            recent available record is used instead.

        Returns
        -------
        dict
            ``{"data": formatted_answer, "metadata": {...}}`` on success,
            otherwise a dict with an ``"error"`` key.
        """
        indicator_name = arguments.get("indicator_name", "")
        country_code = arguments.get("country_code", "")
        year = arguments.get("year")

        if not indicator_name:
            return {"error": "indicator_name parameter is required"}
        if not country_code:
            return {"error": "country_code parameter is required"}

        # Step 1: Search for matching indicator
        search_result = self._make_request({
            "search_term": indicator_name,
            "top": 20
        })
        if "error" in search_result:
            return search_result

        indicators = search_result.get("data", {}).get("value", [])
        if not indicators:
            return {
                "error": f"No indicators found matching: '{indicator_name}'",
                "suggestion": "Try different keywords or check spelling"
            }

        # Step 2: Rank and get best match
        best_indicator = self.rank_indicators(indicators, indicator_name)[0]
        indicator_code = best_indicator.get("IndicatorCode")
        full_indicator_name = best_indicator.get("IndicatorName")
        indicator_found = {
            "code": indicator_code,
            "name": full_indicator_name
        }

        # Step 3: Get data
        data_params = {
            "indicator_code": indicator_code,
            "country_code": country_code,
            "top": 1
        }
        if year:
            data_params["year"] = year

        data_result = self._make_request_for_data(data_params)
        if "error" in data_result:
            return {
                "error": data_result["error"],
                "indicator_found": indicator_found,
                "suggestion": (
                    "Data may not be available for this country/year "
                    "combination"
                )
            }

        values = self._available_values(data_result)
        if not values and year:
            # Try without year filter to get most recent
            values = self._most_recent_without_year(
                indicator_code, country_code
            )

        if not values:
            return {
                "error": (
                    f"No data available for '{indicator_name}' in "
                    f"{country_code}"
                ),
                "indicator_found": indicator_found,
                "suggestion": (
                    "Try a different country or check data availability"
                )
            }

        value_obj = values[0]
        formatted = self.format_health_answer(
            value=self._pick_value(value_obj),
            indicator_name=full_indicator_name,
            country_code=value_obj.get("SpatialDim") or country_code,
            # Prefer the year reported with the record; fall back to the
            # requested one.
            year=self._to_year(value_obj.get("TimeDim") or year),
        )

        return {
            "data": formatted,
            "metadata": {
                "source": "WHO Global Health Observatory",
                "indicator_code": indicator_code
            }
        }