Source code for tooluniverse.metaboanalyst_tool

"""
MetaboAnalyst-style metabolomics analysis tool for ToolUniverse.

Provides metabolite name mapping, pathway enrichment analysis,
pathway library browsing, and metabolite set (biomarker) enrichment.

Uses the KEGG REST API (https://rest.kegg.jp/) for compound resolution
and pathway-metabolite mappings, and performs statistical enrichment
(hypergeometric / Fisher's exact test) locally via scipy.

No authentication required.
"""

import re
import requests
from math import log2
from typing import Any, Dict, List, Optional, Tuple

from .base_tool import BaseTool
from .tool_registry import register_tool

try:
    from scipy.stats import hypergeom

    HAS_SCIPY = True
except ImportError:
    HAS_SCIPY = False

KEGG_BASE = "https://rest.kegg.jp"

# ------------------------------------------------------------------
# Curated metabolite-set libraries for biomarker enrichment
# Each set: name -> list of common metabolite names
# Derived from SMPDB/HMDB pathway categories
# ------------------------------------------------------------------
BIOMARKER_SETS: Dict[str, List[str]] = {
    "Glycolysis and Gluconeogenesis": [
        "glucose",
        "pyruvate",
        "lactate",
        "fructose 1,6-bisphosphate",
        "glyceraldehyde 3-phosphate",
        "dihydroxyacetone phosphate",
        "phosphoenolpyruvate",
        "2-phosphoglycerate",
        "3-phosphoglycerate",
        "glucose 6-phosphate",
        "fructose 6-phosphate",
    ],
    "TCA Cycle": [
        "citrate",
        "isocitrate",
        "alpha-ketoglutarate",
        "succinate",
        "fumarate",
        "malate",
        "oxaloacetate",
        "pyruvate",
        "acetyl-CoA",
    ],
    "Urea Cycle": [
        "ornithine",
        "citrulline",
        "argininosuccinate",
        "arginine",
        "urea",
        "fumarate",
        "aspartate",
    ],
    "Amino Acid Metabolism": [
        "alanine",
        "glycine",
        "serine",
        "threonine",
        "valine",
        "leucine",
        "isoleucine",
        "proline",
        "phenylalanine",
        "tyrosine",
        "tryptophan",
        "aspartate",
        "glutamate",
        "lysine",
        "histidine",
        "methionine",
        "cysteine",
        "arginine",
        "asparagine",
        "glutamine",
    ],
    "Fatty Acid Biosynthesis": [
        "palmitic acid",
        "stearic acid",
        "oleic acid",
        "myristic acid",
        "lauric acid",
        "malonyl-CoA",
        "acetyl-CoA",
    ],
    "Fatty Acid Beta-Oxidation": [
        "palmitic acid",
        "palmitoylcarnitine",
        "acetyl-CoA",
        "octanoylcarnitine",
        "hexanoylcarnitine",
        "butyrylcarnitine",
        "acetylcarnitine",
        "carnitine",
    ],
    "Purine Metabolism": [
        "adenine",
        "guanine",
        "hypoxanthine",
        "xanthine",
        "uric acid",
        "inosine",
        "adenosine",
        "guanosine",
        "AMP",
        "GMP",
        "IMP",
    ],
    "Pyrimidine Metabolism": [
        "uracil",
        "thymine",
        "cytosine",
        "uridine",
        "thymidine",
        "cytidine",
        "UMP",
        "CMP",
        "orotate",
    ],
    "Glutathione Metabolism": [
        "glutathione",
        "glutamate",
        "cysteine",
        "glycine",
        "gamma-glutamylcysteine",
        "pyroglutamic acid",
        "5-oxoproline",
    ],
    "Tryptophan Metabolism": [
        "tryptophan",
        "kynurenine",
        "serotonin",
        "melatonin",
        "3-hydroxykynurenine",
        "quinolinic acid",
        "nicotinamide",
        "indole",
        "indole-3-acetic acid",
    ],
    "Bile Acid Biosynthesis": [
        "cholesterol",
        "cholic acid",
        "chenodeoxycholic acid",
        "deoxycholic acid",
        "lithocholic acid",
        "glycocholic acid",
        "taurocholic acid",
        "ursodeoxycholic acid",
    ],
    "Ketone Body Metabolism": [
        "acetoacetate",
        "3-hydroxybutyrate",
        "acetone",
        "acetyl-CoA",
    ],
    "Pentose Phosphate Pathway": [
        "glucose 6-phosphate",
        "6-phosphogluconate",
        "ribulose 5-phosphate",
        "ribose 5-phosphate",
        "xylulose 5-phosphate",
        "sedoheptulose 7-phosphate",
        "erythrose 4-phosphate",
        "fructose 6-phosphate",
        "glyceraldehyde 3-phosphate",
    ],
    "Sphingolipid Metabolism": [
        "sphingosine",
        "sphinganine",
        "ceramide",
        "sphingomyelin",
        "sphingosine 1-phosphate",
        "palmitoyl-CoA",
        "serine",
    ],
    "Arachidonic Acid Metabolism": [
        "arachidonic acid",
        "prostaglandin E2",
        "prostaglandin F2alpha",
        "thromboxane A2",
        "leukotriene B4",
        "leukotriene C4",
        "5-HETE",
        "12-HETE",
        "15-HETE",
    ],
    "Branched-Chain Amino Acid Degradation": [
        "valine",
        "leucine",
        "isoleucine",
        "alpha-ketoisovalerate",
        "alpha-ketoisocaproate",
        "alpha-keto-beta-methylvalerate",
        "isobutyryl-CoA",
        "isovaleryl-CoA",
        "methylbutyryl-CoA",
    ],
    "Histidine Metabolism": [
        "histidine",
        "histamine",
        "imidazole acetaldehyde",
        "urocanate",
        "glutamate",
        "methylhistidine",
    ],
    "Methionine and Cysteine Metabolism": [
        "methionine",
        "S-adenosylmethionine",
        "S-adenosylhomocysteine",
        "homocysteine",
        "cysteine",
        "cystathionine",
        "taurine",
    ],
    "Nicotinate and Nicotinamide Metabolism": [
        "nicotinamide",
        "nicotinate",
        "NAD+",
        "NADP+",
        "nicotinamide mononucleotide",
        "nicotinic acid mononucleotide",
        "quinolinic acid",
    ],
    "Pantothenate and CoA Biosynthesis": [
        "pantothenate",
        "pantetheine",
        "coenzyme A",
        "4-phosphopantothenate",
        "beta-alanine",
        "cysteine",
    ],
}


def _normalize(name: str) -> str:
    """Lowercase, strip whitespace, remove trailing acid/ate variants for matching."""
    return re.sub(r"\s+", " ", name.strip().lower())


[docs] @register_tool("MetaboAnalystTool") class MetaboAnalystTool(BaseTool): """ Metabolomics analysis tool providing metabolite name mapping, pathway enrichment, pathway library listing, and biomarker set enrichment. Uses KEGG REST API + local scipy statistics. """
[docs] def __init__(self, tool_config: Dict[str, Any]): super().__init__(tool_config) self.timeout = tool_config.get("timeout", 30) fields = tool_config.get("fields", {}) self.endpoint = fields.get("endpoint", "pathway_enrichment")
# ---------------------------------------------------------- # dispatch # ----------------------------------------------------------
[docs] def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]: try: return self._dispatch(arguments) except requests.exceptions.Timeout: return {"status": "error", "error": "KEGG API timed out"} except requests.exceptions.ConnectionError: return {"status": "error", "error": "Cannot connect to KEGG API"} except Exception as e: return {"status": "error", "error": str(e)}
[docs] def _dispatch(self, arguments: Dict[str, Any]) -> Dict[str, Any]: if self.endpoint == "pathway_enrichment": return self._pathway_enrichment(arguments) elif self.endpoint == "name_to_id": return self._name_to_id(arguments) elif self.endpoint == "get_pathway_library": return self._get_pathway_library(arguments) elif self.endpoint == "biomarker_enrichment": return self._biomarker_enrichment(arguments) return {"status": "error", "error": f"Unknown endpoint: {self.endpoint}"}
# ---------------------------------------------------------- # helpers – KEGG API wrappers # ----------------------------------------------------------
[docs] def _kegg_find_compound(self, name: str) -> Optional[Tuple[str, str]]: """Resolve a metabolite name to (KEGG compound ID, matched name). Returns the first exact-ish hit or None.""" url = f"{KEGG_BASE}/find/compound/{requests.utils.quote(name)}" resp = requests.get(url, timeout=self.timeout) if resp.status_code != 200 or not resp.text.strip(): return None norm = _normalize(name) # Try exact match first for line in resp.text.strip().split("\n"): parts = line.split("\t", 1) if len(parts) < 2: continue cid = parts[0].replace("cpd:", "") synonyms = [_normalize(s) for s in parts[1].split(";")] for syn in synonyms: if syn == norm: return (cid, parts[1].split(";")[0].strip()) # Feature-71A-003: No blind fallback to first result — only return exact match or None return None
[docs] def _kegg_get_compound_info(self, compound_id: str) -> Dict[str, Any]: """Get compound detail from KEGG (name, formula, exact mass, etc.).""" url = f"{KEGG_BASE}/get/{compound_id}" resp = requests.get(url, timeout=self.timeout) if resp.status_code != 200: return {} info: Dict[str, Any] = {"kegg_id": compound_id} for line in resp.text.split("\n"): if line.startswith("NAME"): names_part = line[12:].strip() info["name"] = names_part.rstrip(";").split(";")[0].strip() elif line.startswith("FORMULA"): info["formula"] = line[12:].strip() elif line.startswith("EXACT_MASS"): try: info["exact_mass"] = float(line[12:].strip()) except ValueError: pass elif line.startswith("MOL_WEIGHT"): try: info["mol_weight"] = float(line[12:].strip()) except ValueError: pass elif line.startswith("DBLINKS"): info["dblinks"] = self._parse_dblinks(line, resp.text) return info
[docs] def _kegg_list_pathways(self, organism: str) -> List[Dict[str, str]]: """List all KEGG pathways for an organism.""" url = f"{KEGG_BASE}/list/pathway/{organism}" resp = requests.get(url, timeout=self.timeout) if resp.status_code != 200: return [] pathways = [] for line in resp.text.strip().split("\n"): if not line.strip(): continue parts = line.split("\t", 1) if len(parts) == 2: pid = parts[0].strip() pname = parts[1].strip() # Remove organism suffix like " - Homo sapiens (human)" pname = re.sub(r"\s+-\s+[A-Z][a-z].*$", "", pname) pathways.append({"pathway_id": pid, "pathway_name": pname}) return pathways
[docs] def _kegg_get_pathway_compounds(self, pathway_map_id: str) -> List[str]: """Get KEGG compound IDs in a pathway (using map prefix).""" url = f"{KEGG_BASE}/link/cpd/{pathway_map_id}" resp = requests.get(url, timeout=self.timeout) if resp.status_code != 200 or not resp.text.strip(): return [] compounds = [] for line in resp.text.strip().split("\n"): parts = line.split("\t") if len(parts) >= 2: cid = parts[1].replace("cpd:", "") compounds.append(cid) return compounds
# ---------------------------------------------------------- # 1. Pathway Enrichment # ----------------------------------------------------------
[docs] def _pathway_enrichment(self, arguments: Dict[str, Any]) -> Dict[str, Any]: if not HAS_SCIPY: return { "status": "error", "error": "scipy is required for enrichment analysis. Install: pip install scipy", } metabolites = arguments.get("metabolites", []) if not metabolites or not isinstance(metabolites, list): return { "status": "error", "error": "metabolites must be a non-empty list of metabolite names", } organism = arguments.get("organism", "hsa") # Step 1: resolve metabolite names to KEGG compound IDs resolved = {} unresolved = [] for name in metabolites: result = self._kegg_find_compound(name) if result: resolved[name] = result[0] # compound ID else: unresolved.append(name) if not resolved: return { "status": "error", "error": f"Could not resolve any metabolite names to KEGG IDs. Tried: {metabolites}", } query_cids = set(resolved.values()) # Step 2: get metabolic pathway list (filter to metabolic pathways) all_pathways = self._kegg_list_pathways(organism) # Focus on metabolic pathways (IDs 00xxx and 01xxx) metabolic_pathways = [] for pw in all_pathways: pid = pw["pathway_id"] num_part = re.sub(r"^[a-z]+", "", pid) if num_part.isdigit(): num = int(num_part) # Metabolic pathways: 00010-01999 if num < 2000: metabolic_pathways.append(pw) if not metabolic_pathways: return { "status": "error", "error": f"No metabolic pathways found for organism '{organism}'", } # Step 3: build pathway-compound map (use map prefix for compound links) pathway_compounds: Dict[str, List[str]] = {} for pw in metabolic_pathways: pid = pw["pathway_id"] map_id = "map" + re.sub(r"^[a-z]+", "", pid) compounds = self._kegg_get_pathway_compounds(map_id) if compounds: pathway_compounds[pid] = compounds # Step 4: collect background (all unique compounds across all pathways) all_compounds = set() for cids in pathway_compounds.values(): all_compounds.update(cids) N = len(all_compounds) # total background size if N == 0: return { "status": "error", "error": "No compound data found for pathways", } k = len(query_cids) # query set size # Step 5: hypergeometric enrichment results = [] for pw in metabolic_pathways: pid = pw["pathway_id"] if pid not in pathway_compounds: continue pw_cids = set(pathway_compounds[pid]) m = len(pw_cids) # pathway size overlap = query_cids & pw_cids x = len(overlap) # overlap size if x == 0: continue # P(X >= x) using hypergeometric survival function p_value = hypergeom.sf(x - 1, N, m, k) fold_enrichment = (x / k) / (m / N) if m > 0 and k > 0 and N > 0 else 0 results.append( { "pathway_id": pid, "pathway_name": pw["pathway_name"], "p_value": round(p_value, 6), "fold_enrichment": round(fold_enrichment, 2), "hit_count": x, "pathway_size": m, "hit_metabolites": sorted(overlap), } ) # Sort by p-value results.sort(key=lambda r: r["p_value"]) # Apply simple Benjamini-Hochberg FDR correction n_tests = len(results) for i, r in enumerate(results): rank = i + 1 fdr = r["p_value"] * n_tests / rank r["fdr"] = round(min(fdr, 1.0), 6) return { "status": "success", "data": results, "metadata": { "organism": organism, "query_count": len(metabolites), "resolved_count": len(resolved), "unresolved": unresolved if unresolved else None, "background_size": N, "pathways_tested": n_tests, "method": "hypergeometric (over-representation analysis)", "source": "KEGG", }, }
# ---------------------------------------------------------- # 2. Name to ID mapping # ----------------------------------------------------------
[docs] def _name_to_id(self, arguments: Dict[str, Any]) -> Dict[str, Any]: metabolites = arguments.get("metabolites", []) if not metabolites or not isinstance(metabolites, list): return { "status": "error", "error": "metabolites must be a non-empty list of metabolite names", } results = [] for name in metabolites: entry: Dict[str, Any] = {"query": name, "match": None} hit = self._kegg_find_compound(name) if hit: cid, matched_name = hit entry["match"] = matched_name entry["kegg_id"] = cid # Get additional DB cross-references info = self._kegg_get_compound_info(cid) dblinks = info.get("dblinks", {}) entry["hmdb_id"] = dblinks.get("HMDB", None) entry["pubchem_sid"] = dblinks.get("PubChem", None) entry["chebi_id"] = dblinks.get("ChEBI", None) entry["formula"] = info.get("formula", None) entry["exact_mass"] = info.get("exact_mass", None) else: entry["kegg_id"] = None entry["hmdb_id"] = None entry["pubchem_sid"] = None entry["chebi_id"] = None entry["formula"] = None entry["exact_mass"] = None results.append(entry) return { "status": "success", "data": results, }
# ---------------------------------------------------------- # 3. Pathway library listing # ----------------------------------------------------------
[docs] def _get_pathway_library(self, arguments: Dict[str, Any]) -> Dict[str, Any]: organism = arguments.get("organism", "hsa") all_pathways = self._kegg_list_pathways(organism) if not all_pathways: return { "status": "error", "error": f"No pathways found for organism '{organism}'. Use KEGG organism code (hsa=human, mmu=mouse, rno=rat, dme=fly, sce=yeast).", } # Enrich with compound counts for metabolic pathways results = [] for pw in all_pathways: pid = pw["pathway_id"] num_part = re.sub(r"^[a-z]+", "", pid) is_metabolic = False if num_part.isdigit(): num = int(num_part) is_metabolic = num < 2000 entry = { "pathway_id": pid, "pathway_name": pw["pathway_name"], "category": "metabolic" if is_metabolic else "non-metabolic", } if is_metabolic: map_id = "map" + num_part compounds = self._kegg_get_pathway_compounds(map_id) entry["compound_count"] = len(compounds) else: entry["compound_count"] = 0 results.append(entry) # Sort: metabolic first, then by compound count results.sort( key=lambda r: ( -1 if r["category"] == "metabolic" else 0, -r["compound_count"], ) ) metabolic_count = sum(1 for r in results if r["category"] == "metabolic") return { "status": "success", "data": results, "metadata": { "organism": organism, "total_pathways": len(results), "metabolic_pathways": metabolic_count, "source": "KEGG", }, }
# ---------------------------------------------------------- # 4. Biomarker / metabolite-set enrichment # ----------------------------------------------------------
[docs] def _biomarker_enrichment(self, arguments: Dict[str, Any]) -> Dict[str, Any]: if not HAS_SCIPY: return { "status": "error", "error": "scipy is required for enrichment analysis. Install: pip install scipy", } metabolites = arguments.get("metabolites", []) if not metabolites or not isinstance(metabolites, list): return { "status": "error", "error": "metabolites must be a non-empty list of metabolite names", } query_set = {_normalize(m) for m in metabolites} # Build background: union of all metabolites across all sets all_metabolites_set = set() for mset in BIOMARKER_SETS.values(): for m in mset: all_metabolites_set.add(_normalize(m)) # Also add query metabolites to background all_metabolites_set.update(query_set) N = len(all_metabolites_set) # background size k = len(query_set) # query size results = [] for set_name, set_members in BIOMARKER_SETS.items(): normalized_members = {_normalize(m) for m in set_members} m = len(normalized_members) # set size overlap = query_set & normalized_members x = len(overlap) if x == 0: continue p_value = hypergeom.sf(x - 1, N, m, k) fold_enrichment = (x / k) / (m / N) if m > 0 and k > 0 and N > 0 else 0 results.append( { "metabolite_set": set_name, "p_value": round(p_value, 6), "fold_enrichment": round(fold_enrichment, 2), "hit_count": x, "set_size": m, "hit_metabolites": sorted(overlap), } ) results.sort(key=lambda r: r["p_value"]) # BH FDR correction n_tests = len(results) for i, r in enumerate(results): rank = i + 1 fdr = r["p_value"] * n_tests / rank r["fdr"] = round(min(fdr, 1.0), 6) return { "status": "success", "data": results, "metadata": { "query_count": len(metabolites), "background_size": N, "sets_tested": len(BIOMARKER_SETS), "sets_hit": len(results), "library": "MetaboAnalyst-style SMPDB/HMDB metabolite sets", "method": "hypergeometric (over-representation analysis)", }, }