# Source code for tooluniverse.nhanes_tool

"""
NHANES Tool

Provides information about NHANES (National Health and Nutrition Examination Survey) datasets.
Supports dataset discovery, search, and direct XPT download+parse for analysis.
"""

import io
import math
import re
from typing import Dict, Any, Optional

import pandas as pd
import requests

from .base_tool import BaseTool
from .tool_registry import register_tool


@register_tool("NHANESTool")
class NHANESTool(BaseTool):
    """Tool for NHANES dataset listings, variable search, and XPT download.

    Which operation an instance performs is decided by the ``endpoint``
    value read from its tool configuration (``dataset_info``, ``search`` or
    ``download_and_parse``).
    """
def __init__(self, tool_config):
    """Initialise the base tool and remember which endpoint to serve.

    Args:
        tool_config: Tool configuration mapping. It must provide
            ``tool_config["fields"]["endpoint"]``; a missing key raises
            ``KeyError``.
    """
    super().__init__(tool_config)
    fields = tool_config["fields"]
    self.endpoint = fields["endpoint"]
[文档] def _get_dataset_info(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Get NHANES dataset information.""" year = arguments.get("year") component = arguments.get("component") base_url = "https://wwwn.cdc.gov/Nchs/Nhanes" # Common NHANES cycles cycles = [ "2017-2018", "2015-2016", "2013-2014", "2011-2012", "2009-2010", "2007-2008", ] datasets = [] if year: cycles_to_show = [year] if year in cycles else cycles[:2] else: cycles_to_show = cycles[:2] # Show most recent for cycle in cycles_to_show: if component: datasets.append( { "name": f"NHANES {component} - {cycle}", "year": cycle, "component": component, "download_url": f"{base_url}/{cycle}/{component.lower()}_{cycle}.aspx", "description": f"NHANES {component} data for {cycle}", } ) else: # Show all components for comp in [ "Demographics", "Dietary", "Examination", "Laboratory", "Questionnaire", ]: datasets.append( { "name": f"NHANES {comp} - {cycle}", "year": cycle, "component": comp, "download_url": f"{base_url}/{cycle}/{comp.lower()}_{cycle}.aspx", "description": f"NHANES {comp} data for {cycle}", } ) return { "status": "success", "data": { "datasets": datasets[:20], # Limit results "note": "NHANES data is available as downloadable files (SAS, XPT formats) from the CDC website. Visit the download URLs to access datasets. Files may require SAS or conversion tools to read.", }, "metadata": { "source": "CDC NHANES", "endpoint": self.endpoint, "query": arguments, }, }
[文档] def _search_datasets(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Search NHANES variable list dynamically via CDC website. Queries the actual NHANES variable catalog at wwwn.cdc.gov instead of using hardcoded keyword lists. """ search_term = arguments.get("search_term", "").lower() year = arguments.get("year") limit = arguments.get("limit", 20) cycle_year = year.split("-")[0] if year else "2017" components = [ "Demographics", "Dietary", "Examination", "Laboratory", "Questionnaire", ] datasets = [] seen_files: set = set() for component in components: url = ( "https://wwwn.cdc.gov/Nchs/Nhanes/search/variablelist.aspx" f"?Component={component}&CycleBeginYear={cycle_year}" ) try: resp = requests.get(url, timeout=15) if resp.status_code != 200: continue html = resp.text # Each row has 8 <td> cells: VarName, VarDesc, FileName, # FileDesc, CycleBegin, CycleEnd, Component, Constraints rows = re.findall( r"<td>([^<]+)</td>\s*<td>([^<]+)</td>" r"<td>([^<]+)</td>\s*<td>([^<]+)</td>" r"<td>[^<]*</td>\s*<td>[^<]*</td>" r"<td>[^<]*</td>\s*<td>[^<]*</td>", html, ) for var_name, var_desc, file_name, file_desc in rows: if search_term and not any( search_term in s.lower() for s in [var_desc, var_name, file_desc] ): continue if file_name not in seen_files: seen_files.add(file_name) end_year = str(int(cycle_year) + 1) datasets.append( { "file_name": file_name, "file_description": file_desc, "component": component, "matching_variable": var_name, "variable_description": var_desc, "cycle": year or f"{cycle_year}-{end_year}", "download_url": ( f"https://wwwn.cdc.gov/Nchs/Nhanes/" f"{cycle_year}-{end_year}/" f"DataFiles/{file_name}.XPT" ), } ) if len(datasets) >= limit: break except Exception: continue if len(datasets) >= limit: break return { "status": "success", "data": { "datasets": datasets, "count": len(datasets), "search_term": search_term, "cycle": year or f"{cycle_year}-{str(int(cycle_year) + 1)}", }, "metadata": { "source": "NHANES Variable List (wwwn.cdc.gov)", 
"components_searched": components, }, }
# Cycle suffix mapping: cycle -> letter suffix for NHANES filenames _CYCLE_SUFFIX = { "2011-2012": "_G", "2013-2014": "_H", "2015-2016": "_I", "2017-2018": "_J", "2019-2020": "_K", } # Component -> default filename prefix (without suffix) _COMPONENT_PREFIX = { "Demographics": "DEMO", "Dietary": "DR1TOT", "DietaryDay2": "DR2TOT", "Examination": "BPX", # Blood pressure as default exam "BodyMeasures": "BMX", "Questionnaire": "PFQ", # Physical functioning as default }
[文档] def _resolve_filename( self, component: str, cycle: str, dataset_name: Optional[str] = None ) -> str: """Resolve component + cycle to the XPT filename (without .XPT).""" suffix = self._CYCLE_SUFFIX.get(cycle, "") if not suffix: # Try to derive suffix from cycle year start_year = int(cycle.split("-")[0]) # 2011=G(7th), each +2 years = +1 letter idx = (start_year - 2011) // 2 if 0 <= idx < 26: suffix = f"_{chr(ord('G') + idx)}" else: return "" if dataset_name: return f"{dataset_name}{suffix}" prefix = self._COMPONENT_PREFIX.get(component) if not prefix: return "" return f"{prefix}{suffix}"
[文档] def _build_xpt_url(self, cycle: str, filename: str) -> str: """Build the CDC download URL for an XPT file.""" start_year = cycle.split("-")[0] return ( f"https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/" f"{start_year}/DataFiles/{filename}.XPT" )
def _download_xpt(self, url: str) -> pd.DataFrame:
    """Download an XPT file from CDC and parse it into a DataFrame.

    Args:
        url: Full URL of the XPT file.

    Returns:
        The parsed file as a ``pandas.DataFrame``.

    Raises:
        ValueError: If the response status is not 200, the body is shorter
            than 100 bytes, or the body is an HTML page instead of XPT
            data.
    """
    resp = requests.get(url, timeout=120)
    if resp.status_code != 200:
        raise ValueError(f"HTTP {resp.status_code} downloading {url}")

    # CDC returns XPT content (possibly gzip-transported, requests handles that)
    content = resp.content
    if len(content) < 100:
        raise ValueError(f"Empty or invalid response from {url}")

    # CDC returns 200 with an HTML page for missing files. Detect it
    # regardless of letter case and of a leading BOM or whitespace.
    head = content[:256].lstrip(b"\xef\xbb\xbf \t\r\n").lower()
    if head.startswith((b"<!doc", b"<html")):
        raise ValueError(f"File not found at {url} (CDC returned HTML error page)")

    return pd.read_sas(io.BytesIO(content), format="xport")
[文档] @staticmethod def _format_age_bounds(age_min, age_max) -> str: """Format age bounds into a human-readable string like '>= 60 and <= 80'.""" parts = [] if age_min is not None: parts.append(f">={age_min}") if age_max is not None: parts.append(f"<={age_max}") return " and ".join(parts)
[文档] @staticmethod def _filter_by_age(df: pd.DataFrame, age_min, age_max) -> pd.DataFrame: """Filter DataFrame by RIDAGEYR bounds.""" if age_min is not None: df = df[df["RIDAGEYR"] >= age_min] if age_max is not None: df = df[df["RIDAGEYR"] <= age_max] return df
[文档] def _compute_summary_stats(self, df: pd.DataFrame) -> Dict[str, Any]: """Compute summary statistics for numeric columns.""" stats = {} for col in df.select_dtypes(include=["number"]).columns: series = df[col].dropna() n = len(series) if n == 0: stats[col] = { "count": 0, "mean": None, "std": None, "min": None, "max": None, } continue stats[col] = { "count": n, "mean": round(float(series.mean()), 4), "std": round(float(series.std()), 4), "min": round(float(series.min()), 4), "max": round(float(series.max()), 4), } return stats
[文档] def _download_and_parse(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Download an NHANES XPT file, parse it, and return structured data.""" component = arguments.get("component", "") cycle = arguments.get("cycle", "") dataset_name = arguments.get("dataset_name") variables = arguments.get("variables") age_min = arguments.get("age_min") age_max = arguments.get("age_max") max_rows = arguments.get("max_rows", 5000) if not component or not cycle: return { "status": "error", "error": "Both 'component' and 'cycle' are required.", } if component == "Laboratory" and not dataset_name: return { "status": "error", "error": ( "Laboratory component requires 'dataset_name' " "(e.g., 'CBC', 'BIOPRO', 'GHB', 'GLU', 'TRIGLY', 'HDL', 'TCHOL'). " "Use nhanes_search_datasets to discover available dataset names." ), } filename = self._resolve_filename(component, cycle, dataset_name) if not filename: return { "status": "error", "error": ( f"Cannot resolve filename for component='{component}', " f"cycle='{cycle}'. 
Supported cycles: " f"{', '.join(sorted(self._CYCLE_SUFFIX.keys()))}" ), } url = self._build_xpt_url(cycle, filename) try: df = self._download_xpt(url) except ValueError as exc: return {"status": "error", "error": str(exc)} except Exception as exc: return { "status": "error", "error": f"Failed to download/parse {url}: {exc}", } # Age filtering: merge with DEMO if needed age_filter_desc = None warnings = [] if age_min is not None or age_max is not None: bounds = self._format_age_bounds(age_min, age_max) if component == "Demographics": if "RIDAGEYR" in df.columns: df = self._filter_by_age(df, age_min, age_max) age_filter_desc = f"RIDAGEYR {bounds}" else: warnings.append("RIDAGEYR not found in Demographics") elif "SEQN" in df.columns: demo_filename = self._resolve_filename("Demographics", cycle) if demo_filename: demo_url = self._build_xpt_url(cycle, demo_filename) try: demo_df = self._download_xpt(demo_url) demo_subset = self._filter_by_age( demo_df[["SEQN", "RIDAGEYR"]], age_min, age_max ) valid_seqns = set(demo_subset["SEQN"].dropna()) df = df[df["SEQN"].isin(valid_seqns)] age_filter_desc = ( f"RIDAGEYR {bounds} (merged with {demo_filename})" ) except Exception as exc: warnings.append( f"Age filter failed (could not load DEMO): {exc}" ) # Variable selection if variables: cols_to_keep = list(dict.fromkeys(["SEQN"] + variables)) available = [c for c in cols_to_keep if c in df.columns] missing = [c for c in cols_to_keep if c not in df.columns] df = df[available] if missing: warnings.append(f"Missing variables: {missing}") total_rows = len(df) summary = self._compute_summary_stats(df) # Convert to JSON-safe records (replace NaN/inf with None) records = df.head(max_rows).to_dict(orient="records") for row in records: for key, val in row.items(): if isinstance(val, float) and (math.isnan(val) or math.isinf(val)): row[key] = None metadata: Dict[str, Any] = { "source": "CDC NHANES", "download_url": url, "cycle": cycle, "component": component, "dataset_name": filename, } if 
age_filter_desc: metadata["age_filter"] = age_filter_desc if variables: metadata["variables_requested"] = variables if warnings: metadata["warnings"] = warnings return { "status": "success", "data": { "columns": list(df.columns), "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()}, "total_rows": total_rows, "returned_rows": len(records), "records": records, "summary_statistics": summary, }, "metadata": metadata, }
def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Dispatch ``arguments`` to the handler for the configured endpoint.

    Returns the handler's result, or a ``status: "error"`` dict when
    ``self.endpoint`` is not one of the supported endpoints.
    """
    # Endpoint name -> name of the method that serves it.
    routes = (
        ("dataset_info", "_get_dataset_info"),
        ("search", "_search_datasets"),
        ("download_and_parse", "_download_and_parse"),
    )
    for endpoint_name, handler_name in routes:
        if self.endpoint == endpoint_name:
            handler = getattr(self, handler_name)
            return handler(arguments)
    return {"status": "error", "error": f"Unknown endpoint: {self.endpoint}"}