# Source code for tooluniverse.timer_tool

"""
TIMER (Tumor Immune Estimation Resource) — cBioPortal Backend

TIMER2.0/3.0 (timer.cistrome.org) no longer provides a public REST API;
the server redirects to a Shiny web application (compbio.cn/timer3/).

This tool replicates TIMER functionality using the cBioPortal public REST API
(www.cbioportal.org/api) to query TCGA gene expression and survival data.

Operations:
  - immune_estimation    : Proxy immune-cell infiltration from marker gene expression
  - gene_correlation     : Spearman correlation between two genes across TCGA samples
  - survival_association : Overall-survival log-rank test for high/low gene expression
"""

import time
import requests
from typing import Dict, Any, List, Optional, Tuple
from .base_tool import BaseTool
from .tool_registry import register_tool

# Root of the public cBioPortal REST API; endpoint paths are appended to it.
CBIOPORTAL_BASE = "https://www.cbioportal.org/api"

# TCGA cohort abbreviations known to this module, in lookup-table order.
_TCGA_COHORTS: Tuple[str, ...] = (
    "BRCA", "LUAD", "LUSC", "COAD", "READ", "SKCM", "GBM",
    "UCEC", "KIRC", "PRAD", "HNSC", "STAD", "BLCA", "THCA",
    "LIHC", "CESC", "OV", "PCPG", "SARC", "ACC", "MESO",
    "UVM", "TGCT", "KICH", "KIRP", "DLBC", "LAML", "LGG",
)

# Cohorts whose study ID is not simply "<abbreviation>_tcga": COAD and READ
# both resolve to the combined colorectal study.
_STUDY_ID_OVERRIDES: Dict[str, str] = {
    "COAD": "coadread_tcga",
    "READ": "coadread_tcga",
}

# Map TCGA abbreviations → cBioPortal study IDs (Firehose Legacy datasets)
CANCER_STUDY_MAP: Dict[str, str] = {
    cohort: _STUDY_ID_OVERRIDES.get(cohort, f"{cohort.lower()}_tcga")
    for cohort in _TCGA_COHORTS
}

# One marker gene per immune cell type (described by the original author as
# the canonical markers from the TIMER paper). Keys are the cell-type labels
# used in results; values are gene symbols.
IMMUNE_MARKERS: Dict[str, str] = {
    "B_cell": "CD19",
    "CD4_T_cell": "CD4",
    "CD8_T_cell": "CD8A",
    "Neutrophil": "FCGR3B",
    "Macrophage": "CD68",
    "Dendritic_cell": "ITGAX",
}


@register_tool("TIMERTool")
class TIMERTool(BaseTool):
    """
    Replicates TIMER2.0 tumor immune estimation using cBioPortal TCGA data.

    Since TIMER3.0 no longer has a public REST API, this tool queries
    cBioPortal (www.cbioportal.org) for TCGA expression and survival data
    and computes equivalent statistics locally.

    Every operation returns a dict with ``"status"`` set to ``"success"``
    (payload under ``"data"``) or ``"error"`` (message under ``"error"``).
    """

    def __init__(self, tool_config: Dict[str, Any]):
        """Store the tool configuration and cache its parameter schema."""
        super().__init__(tool_config)
        # ``or {}`` tolerates an explicit ``"parameter": None`` in the config.
        self.parameter = tool_config.get("parameter") or {}
        self.required = self.parameter.get("required", [])

    def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """
        Dispatch ``arguments["operation"]`` to the matching handler.

        Network failures and any other exception raised by a handler are
        converted into an error dict rather than propagated to the caller.
        """
        operation = arguments.get("operation")
        if not operation:
            return {"status": "error", "error": "Missing required parameter: operation"}

        operation_handlers = {
            "immune_estimation": self._immune_estimation,
            "gene_correlation": self._gene_correlation,
            "survival_association": self._survival_association,
        }
        handler = operation_handlers.get(operation)
        if not handler:
            return {
                "status": "error",
                "error": f"Unknown operation: {operation}",
                "available_operations": list(operation_handlers.keys()),
            }

        try:
            return handler(arguments)
        except requests.exceptions.Timeout:
            return {"status": "error", "error": "cBioPortal API request timed out"}
        except requests.exceptions.ConnectionError:
            return {"status": "error", "error": "Failed to connect to cBioPortal API"}
        except Exception as e:
            # Top-level boundary: report the failure instead of crashing the caller.
            return {"status": "error", "error": f"Operation failed: {str(e)}"}

    # ── helpers ──────────────────────────────────────────────────────────────

    def _get_with_retry(
        self,
        url: str,
        params: Dict[str, Any],
        timeout: int = 20,
        attempts: int = 3,
    ) -> Optional[Any]:
        """
        GET ``url`` up to ``attempts`` times with exponential back-off.

        Returns the first response with HTTP status 200, or ``None`` when every
        attempt failed (network error or non-200 status). Sleeps 1 s, 2 s, ...
        between attempts, but not after the last one.
        """
        for attempt in range(attempts):
            try:
                response = requests.get(url, params=params, timeout=timeout)
                if response.status_code == 200:
                    return response
            except requests.exceptions.RequestException:
                pass  # treated like a non-200 answer: retry below
            if attempt < attempts - 1:
                time.sleep(2**attempt)
        return None

    def _resolve_study(self, cancer: str) -> Tuple[Optional[str], Optional[str]]:
        """
        Translate a TCGA abbreviation into a cBioPortal study ID.

        Returns ``(study_id, error)``; ``error`` is currently always ``None``
        because unknown abbreviations fall back to ``"<abbrev>_tcga"``.
        """
        key = str(cancer).strip()
        study_id = CANCER_STUDY_MAP.get(key.upper())
        if not study_id:
            # Lower-case fallback for cohorts missing from the map.
            study_id = f"{key.lower()}_tcga"
        return study_id, None

    def _get_mrna_profile(self, study_id: str) -> Optional[str]:
        """
        Return the ID of the preferred mRNA expression profile of a study.

        Preference order: RNA-seq v2, RNA-seq, then any profile ending in
        ``_mrna``. Returns ``None`` when the request fails or nothing matches.
        """
        response = self._get_with_retry(
            f"{CBIOPORTAL_BASE}/studies/{study_id}/molecular-profiles",
            {"projection": "SUMMARY"},
        )
        if response is None:
            return None

        expression_profile_ids = [
            p.get("molecularProfileId", "")
            for p in response.json()
            if p.get("molecularAlterationType") == "MRNA_EXPRESSION"
        ]
        # An ID ending in one of these suffixes cannot also end in "_Zscores",
        # so z-score profiles are excluded implicitly.
        preferred_suffixes = ["_rna_seq_v2_mrna", "_rna_seq_mrna", "_mrna"]
        for suffix in preferred_suffixes:
            for pid in expression_profile_ids:
                if pid.endswith(suffix):
                    return pid
        return None

    def _get_samples(self, study_id: str, n: int = 200) -> List[str]:
        """Return up to ``n`` sample IDs of a study, or ``[]`` on failure."""
        response = self._get_with_retry(
            f"{CBIOPORTAL_BASE}/studies/{study_id}/samples",
            {"projection": "ID", "pageSize": n},
        )
        if response is None:
            return []
        try:
            payload = response.json()
        except ValueError:
            # Body was not valid JSON; behave like a failed request.
            return []
        return [s["sampleId"] for s in payload if s.get("sampleId")]

    def _get_gene_id(self, symbol: str) -> Optional[int]:
        """Look up the Entrez gene ID for a gene symbol; ``None`` if not found."""
        r = requests.get(
            f"{CBIOPORTAL_BASE}/genes/{symbol.strip().upper()}",
            params={"projection": "SUMMARY"},
            timeout=10,
        )
        if r.status_code == 200:
            return r.json().get("entrezGeneId")
        return None

    def _get_expression(
        self,
        profile_id: str,
        entrez_ids: List[int],
        sample_ids: List[str],
    ) -> List[Dict]:
        """
        Fetch expression records for the given genes and samples.

        Returns the decoded JSON list of the molecular-data fetch endpoint, or
        ``[]`` when the request is not answered with HTTP 200 or when there is
        nothing to ask for.
        """
        if not entrez_ids or not sample_ids:
            return []  # avoid posting a request that cannot return data
        r = requests.post(
            f"{CBIOPORTAL_BASE}/molecular-data/fetch",
            params={"projection": "SUMMARY"},
            json={
                "entrezGeneIds": entrez_ids,
                "sampleMolecularIdentifiers": [
                    {"molecularProfileId": profile_id, "sampleId": sid}
                    for sid in sample_ids
                ],
            },
            timeout=60,
        )
        return r.json() if r.status_code == 200 else []

    def _get_os_data(self, study_id: str) -> Tuple[Dict[str, float], Dict[str, str]]:
        """
        Fetch overall-survival data for every patient of a study.

        Returns ``(os_months, os_status)``, both keyed by patient ID. Patients
        whose ``OS_MONTHS`` value is not numeric are left out of ``os_months``.
        Both dicts are empty when the request fails.
        """
        r = requests.get(
            f"{CBIOPORTAL_BASE}/studies/{study_id}/clinical-data",
            params={
                "clinicalDataType": "PATIENT",
                "projection": "SUMMARY",
                "pageSize": 100000,
            },
            timeout=60,
        )
        if r.status_code != 200:
            return {}, {}

        os_months: Dict[str, float] = {}
        os_status: Dict[str, str] = {}
        for rec in r.json():
            attr = rec.get("clinicalAttributeId")
            pid = rec.get("patientId", "")
            val = rec.get("value", "")
            if attr == "OS_MONTHS":
                try:
                    os_months[pid] = float(val)
                except (TypeError, ValueError):
                    pass  # missing or non-numeric survival time: skip patient
            elif attr == "OS_STATUS":
                os_status[pid] = val
        return os_months, os_status

    @staticmethod
    def _median(values: List[float]) -> float:
        """Return the median of a non-empty list (mean of the middle pair if even)."""
        ordered = sorted(values)
        n = len(ordered)
        mid = n // 2
        return ordered[mid] if n % 2 else (ordered[mid - 1] + ordered[mid]) / 2

    @staticmethod
    def _average_ranks(values: List[float]) -> List[float]:
        """Return 1-based ranks of ``values``; tied values share their average rank."""
        order = sorted(range(len(values)), key=values.__getitem__)
        ranks = [0.0] * len(values)
        start = 0
        while start < len(order):
            stop = start
            while (
                stop + 1 < len(order)
                and values[order[stop + 1]] == values[order[start]]
            ):
                stop += 1
            shared_rank = (start + stop) / 2 + 1
            for position in range(start, stop + 1):
                ranks[order[position]] = shared_rank
            start = stop + 1
        return ranks

    @classmethod
    def _spearman(
        cls, x: List[float], y: List[float]
    ) -> Tuple[float, Optional[float]]:
        """
        Spearman rank correlation of two equally long, paired sequences.

        Uses ``scipy.stats.spearmanr`` when SciPy is installed. Otherwise falls
        back to the Pearson correlation of average ranks, in which case the
        p-value is ``None``.
        """
        try:
            from scipy.stats import spearmanr
        except ImportError:
            rx, ry = cls._average_ranks(x), cls._average_ranks(y)
            n = len(rx)
            if not n:
                return 0.0, None
            mx, my = sum(rx) / n, sum(ry) / n
            num = sum((a - mx) * (b - my) for a, b in zip(rx, ry))
            den = (
                sum((a - mx) ** 2 for a in rx) * sum((b - my) ** 2 for b in ry)
            ) ** 0.5
            return (num / den if den else 0.0), None
        corr, pval = spearmanr(x, y)
        return float(corr), float(pval)

    @staticmethod
    def _parse_event(status_str: Any) -> int:
        """Map an OS_STATUS value to 1 (event, deceased) or 0 (censored, living)."""
        s = str(status_str).upper()
        if "DECEASED" in s or s == "1" or s.startswith("1:"):
            return 1
        return 0

    @staticmethod
    def _km_median(times: List[float], events: List[int]) -> Optional[float]:
        """
        Kaplan-Meier median survival time.

        Returns the earliest time at which the estimated survival probability
        drops to 0.5 or below, or ``None`` if it never does. Subjects sharing a
        time are handled together: deaths are applied before the censored
        subjects leave the risk set.
        """
        tally: Dict[float, List[int]] = {}  # time -> [deaths, subjects leaving]
        for t, e in zip(times, events):
            slot = tally.setdefault(t, [0, 0])
            if e:
                slot[0] += 1
            slot[1] += 1

        at_risk = sum(slot[1] for slot in tally.values())
        surv = 1.0
        for t in sorted(tally):
            deaths, leaving = tally[t]
            if deaths:
                surv *= (at_risk - deaths) / at_risk
                if surv <= 0.5:
                    return t
            at_risk -= leaving
        return None

    # ── operations ───────────────────────────────────────────────────────────

    def _immune_estimation(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """
        Summarize marker-gene expression per immune cell type for a cohort.

        Arguments read: ``cancer`` (required) and ``gene`` (optional). When
        ``gene`` is given and resolvable, each cell type also gets the Spearman
        correlation between its marker and that gene, computed over samples
        that have a value for both.
        """
        cancer = arguments.get("cancer")
        correlate_gene = arguments.get("gene")
        if not cancer:
            return {
                "status": "error",
                "error": "cancer is required (e.g., 'BRCA', 'LUAD')",
            }

        study_id, err = self._resolve_study(cancer)
        if err:
            return {"status": "error", "error": err}

        profile_id = self._get_mrna_profile(study_id)
        if not profile_id:
            return {
                "status": "error",
                "error": f"No mRNA expression profile found for {cancer} ({study_id})",
            }

        # Collect genes: immune markers + optional correlate gene
        all_symbols = list(IMMUNE_MARKERS.values())
        if correlate_gene:
            all_symbols.append(correlate_gene)

        # Resolve Entrez IDs; unresolved symbols are simply left out.
        symbol_to_entrez: Dict[str, int] = {}
        for sym in all_symbols:
            eid = self._get_gene_id(sym)
            if eid:
                symbol_to_entrez[sym] = eid
        # De-duplicate (the correlate gene may itself be a marker), keep order.
        entrez_ids = list(dict.fromkeys(symbol_to_entrez.values()))

        sample_ids = self._get_samples(study_id, n=100)
        if not sample_ids:
            return {"status": "error", "error": f"No samples found for {cancer}"}

        records = self._get_expression(profile_id, entrez_ids, sample_ids)
        if not records:
            return {"status": "error", "error": "Expression data not available"}

        # Group by gene, keyed by sample so values can later be paired by sample.
        expr_by_entrez: Dict[int, Dict[str, float]] = {}
        for rec in records:
            eid = rec.get("entrezGeneId")
            sid = rec.get("sampleId")
            val = rec.get("value")
            if eid and sid and val is not None:
                expr_by_entrez.setdefault(eid, {})[sid] = float(val)

        # Build immune infiltration summary
        immune_scores: Dict[str, Dict] = {}
        for cell_type, sym in IMMUNE_MARKERS.items():
            vals = list(expr_by_entrez.get(symbol_to_entrez.get(sym), {}).values())
            if vals:
                immune_scores[cell_type] = {
                    "marker_gene": sym,
                    "mean_expression": round(sum(vals) / len(vals), 4),
                    "median_expression": round(self._median(vals), 4),
                    "n_samples": len(vals),
                }

        if correlate_gene:
            target = expr_by_entrez.get(symbol_to_entrez.get(correlate_gene), {})
            for cell_type, sym in IMMUNE_MARKERS.items():
                if cell_type not in immune_scores:
                    continue
                marker = expr_by_entrez.get(symbol_to_entrez.get(sym), {})
                # Pair by sample ID; positional pairing would misalign the two
                # series whenever a sample lacks a value for one of the genes.
                shared = sorted(set(marker) & set(target))
                if len(shared) < 2:
                    continue
                corr, pval = self._spearman(
                    [marker[s] for s in shared],
                    [target[s] for s in shared],
                )
                immune_scores[cell_type]["correlation_with_gene"] = {
                    "gene": correlate_gene,
                    "spearman_r": round(corr, 4),
                    "p_value": round(pval, 6) if pval is not None else None,
                }

        return {
            "status": "success",
            "data": {
                "cancer": cancer.upper(),
                "study_id": study_id,
                "profile_id": profile_id,
                "n_samples": len(sample_ids),
                "immune_infiltration": immune_scores,
                "method": "Marker gene expression (CD19, CD4, CD8A, FCGR3B, CD68, ITGAX) via cBioPortal TCGA data",
                "note": "TIMER2.0/3.0 API is unavailable; using cBioPortal TCGA expression as proxy",
            },
        }

    def _gene_correlation(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """
        Spearman correlation between two genes across samples of a cohort.

        Arguments read: ``cancer``, ``gene1`` and ``gene2`` (all required).
        Needs at least 10 samples with a value for both genes.
        """
        cancer = arguments.get("cancer")
        gene1 = arguments.get("gene1")
        gene2 = arguments.get("gene2")
        if not cancer:
            return {"status": "error", "error": "cancer is required (e.g., 'BRCA')"}
        if not gene1 or not gene2:
            return {"status": "error", "error": "Both gene1 and gene2 are required"}

        study_id, err = self._resolve_study(cancer)
        if err:
            return {"status": "error", "error": err}

        profile_id = self._get_mrna_profile(study_id)
        if not profile_id:
            return {"status": "error", "error": f"No mRNA profile found for {cancer}"}

        eid1 = self._get_gene_id(gene1)
        eid2 = self._get_gene_id(gene2)
        if not eid1:
            return {
                "status": "error",
                "error": f"Gene '{gene1}' not found in cBioPortal",
            }
        if not eid2:
            return {
                "status": "error",
                "error": f"Gene '{gene2}' not found in cBioPortal",
            }

        sample_ids = self._get_samples(study_id, n=200)
        if not sample_ids:
            return {"status": "error", "error": f"No samples for {cancer}"}

        # De-duplicate so gene1 == gene2 does not request the same gene twice.
        records = self._get_expression(
            profile_id, list(dict.fromkeys([eid1, eid2])), sample_ids
        )
        if not records:
            return {"status": "error", "error": "Expression data unavailable"}

        # Build per-sample value maps
        vals1: Dict[str, float] = {}
        vals2: Dict[str, float] = {}
        for rec in records:
            sid = rec.get("sampleId")
            val = rec.get("value")
            if not sid or val is None:
                continue
            eid = rec.get("entrezGeneId")
            # Two independent checks (not elif): when both genes are the same,
            # the record must land in both maps.
            if eid == eid1:
                vals1[sid] = float(val)
            if eid == eid2:
                vals2[sid] = float(val)

        # Paired samples only
        common = sorted(set(vals1) & set(vals2))
        if len(common) < 10:
            return {
                "status": "error",
                "error": f"Insufficient paired samples ({len(common)}) for correlation",
            }

        corr, pval = self._spearman(
            [vals1[s] for s in common],
            [vals2[s] for s in common],
        )

        return {
            "status": "success",
            "data": {
                "cancer": cancer.upper(),
                "gene1": gene1,
                "gene2": gene2,
                "n_samples": len(common),
                "spearman_r": round(float(corr), 4),
                "p_value": round(float(pval), 6) if pval is not None else None,
                "profile_id": profile_id,
                "note": "TIMER2.0/3.0 API unavailable; correlation computed from cBioPortal TCGA expression",
            },
        }

    def _survival_association(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """
        Compare overall survival between high and low expressers of a gene.

        Arguments read: ``cancer`` and ``gene`` (both required). Patients are
        split at the median expression (values equal to the cutoff go to the
        high group), then compared with a log-rank test and per-group
        Kaplan-Meier medians. Needs at least 20 patients with both expression
        and survival data.
        """
        cancer = arguments.get("cancer")
        gene = arguments.get("gene")
        if not cancer:
            return {"status": "error", "error": "cancer is required (e.g., 'BRCA')"}
        if not gene:
            return {"status": "error", "error": "gene is required (e.g., 'CD8A')"}

        study_id, err = self._resolve_study(cancer)
        if err:
            return {"status": "error", "error": err}

        profile_id = self._get_mrna_profile(study_id)
        if not profile_id:
            return {"status": "error", "error": f"No mRNA profile found for {cancer}"}

        eid = self._get_gene_id(gene)
        if not eid:
            return {"status": "error", "error": f"Gene '{gene}' not found"}

        # Get survival data
        os_months, os_status = self._get_os_data(study_id)
        if not os_months:
            return {"status": "error", "error": f"No survival data for {cancer}"}

        # Get expression for all available samples
        sample_ids = self._get_samples(study_id, n=1000)
        expr_records = self._get_expression(profile_id, [eid], sample_ids)
        if not expr_records:
            return {
                "status": "error",
                "error": f"Expression data unavailable for {gene}",
            }

        # One expression value per patient. Prefer the patient ID carried by
        # the record; otherwise derive it from the sample barcode
        # (TCGA-XX-XXXX-01 → TCGA-XX-XXXX). When a patient has several samples,
        # the "-01" (primary tumor) sample wins over the others.
        patient_expr: Dict[str, float] = {}
        for rec in expr_records:
            sid = rec.get("sampleId")
            val = rec.get("value")
            if not sid or val is None:
                continue
            pid = rec.get("patientId") or "-".join(sid.split("-")[:3])
            if pid not in patient_expr or sid.endswith("-01"):
                patient_expr[pid] = float(val)

        # Intersect with patients having OS data
        common_pids = sorted(set(patient_expr) & set(os_months) & set(os_status))
        if len(common_pids) < 20:
            return {
                "status": "error",
                "error": f"Insufficient patients with both expression and survival data ({len(common_pids)})",
            }

        # Median split (upper median as cutoff; ties at the cutoff go "high")
        expr_vals = [patient_expr[p] for p in common_pids]
        median_expr = sorted(expr_vals)[len(expr_vals) // 2]
        high_group = [p for p in common_pids if patient_expr[p] >= median_expr]
        low_group = [p for p in common_pids if patient_expr[p] < median_expr]

        high_t = [os_months[p] for p in high_group]
        high_e = [self._parse_event(os_status[p]) for p in high_group]
        low_t = [os_months[p] for p in low_group]
        low_e = [self._parse_event(os_status[p]) for p in low_group]

        # Log-rank test
        try:
            from scipy.stats import logrank, CensoredData

            x = CensoredData(
                uncensored=[t for t, e in zip(high_t, high_e) if e == 1],
                right=[t for t, e in zip(high_t, high_e) if e == 0],
            )
            y = CensoredData(
                uncensored=[t for t, e in zip(low_t, low_e) if e == 1],
                right=[t for t, e in zip(low_t, low_e) if e == 0],
            )
            result = logrank(x, y)
            logrank_stat = round(float(result.statistic), 4)
            logrank_pval = round(float(result.pvalue), 6)
        except Exception:
            # Deliberately best-effort: SciPy may be missing or too old, or a
            # group may be degenerate. The descriptive results below are still
            # returned, with the test fields set to None.
            logrank_stat = None
            logrank_pval = None

        return {
            "status": "success",
            "data": {
                "cancer": cancer.upper(),
                "gene": gene,
                "n_patients": len(common_pids),
                "median_expression_cutoff": round(float(median_expr), 4),
                "high_expression_group": {
                    "n": len(high_group),
                    "n_events": sum(high_e),
                    "median_survival_months": self._km_median(high_t, high_e),
                },
                "low_expression_group": {
                    "n": len(low_group),
                    "n_events": sum(low_e),
                    "median_survival_months": self._km_median(low_t, low_e),
                },
                "log_rank_statistic": logrank_stat,
                "log_rank_p_value": logrank_pval,
                "profile_id": profile_id,
                "note": "TIMER2.0/3.0 API unavailable; survival computed from cBioPortal TCGA data",
            },
        }