Source code for tooluniverse.pheweb_phewas_tool
"""
PheWAS (Phenome-Wide Association Study) tools for ToolUniverse.
Cross-biobank PheWAS coverage. Given a single variant, these tools return every
phenotype associated with it in a population biobank — the inverse of a GWAS
(which fixes a phenotype and scans variants). This is the standard workflow for
cross-ancestry replication and pleiotropy assessment.
Biobanks covered (all PheWeb-based, public, no authentication):
- BioBank Japan (BBJ) https://pheweb.jp (Japanese, GRCh37)
- UKB-TOPMed https://pheweb.org/UKB-TOPMed (European/UKB, GRCh38)
- TPMI https://pheweb.ibms.sinica.edu.tw (Taiwanese, GRCh38)
Plus Genebass for exome-wide gene-level burden PheWAS (GRCh38):
- Genebass https://main.genebass.org (UK Biobank 394K exomes)
Together with the existing FinnGen tools (Finnish), these enable replication of a
genetic association across European, Japanese, Taiwanese, and UK ancestries.
"""
import re
import ssl
import requests
from typing import Dict, Any, List, Optional, Tuple
from .base_tool import BaseTool
from .tool_registry import register_tool
class _RelaxedStrictAdapter(requests.adapters.HTTPAdapter):
"""HTTPS adapter that keeps full CA-chain + hostname verification but clears
OpenSSL 3.x's VERIFY_X509_STRICT flag.
Some public data servers (e.g. TPMI) ship certificates that omit the Subject
Key Identifier extension, which OpenSSL 3.x rejects in strict mode even though
the certificate chains to a trusted root. This relaxes only that pedantic
check; untrusted, expired, self-signed, or wrong-host certificates are still
rejected, so it is NOT equivalent to verify=False.
"""
def init_poolmanager(self, *args, **kwargs):
import certifi
from urllib3.util.ssl_ import create_urllib3_context
ctx = create_urllib3_context()
ctx.load_verify_locations(certifi.where())
ctx.verify_flags &= ~ssl.VERIFY_X509_STRICT
kwargs["ssl_context"] = ctx
return super().init_poolmanager(*args, **kwargs)
# Per-biobank configuration: base URL, genome build, and the Ensembl REST server
# used to resolve rsIDs to coordinates in that biobank's build.
_PHEWEB_BIOBANKS = {
"bbj": {
"label": "BioBank Japan",
"base_url": "https://pheweb.jp",
"build": "GRCh37",
"ensembl_server": "https://grch37.rest.ensembl.org",
"ancestry": "Japanese",
},
"ukb_topmed": {
"label": "UKB-TOPMed",
"base_url": "https://pheweb.org/UKB-TOPMed",
"build": "GRCh38",
"ensembl_server": "https://rest.ensembl.org",
"ancestry": "European (UK Biobank)",
},
"tpmi": {
"label": "TPMI (Taiwan Precision Medicine Initiative)",
"base_url": "https://pheweb.ibms.sinica.edu.tw",
"build": "GRCh38",
"ensembl_server": "https://rest.ensembl.org",
"ancestry": "Taiwanese (Han Chinese)",
# The TPMI host serves a certificate that fails OpenSSL 3.x *strict*
# verification ("Missing Subject Key Identifier"). It still chains to a
# trusted CA, so for this host only we relax the strict-X509 flag while
# keeping full CA-chain + hostname verification (untrusted certs are
# still rejected).
"relax_tls_strictness": True,
},
}
# Some hosts (e.g. TPMI behind openresty) reject non-browser user agents.
_BROWSER_UA = "Mozilla/5.0 (compatible; ToolUniverse/1.0; +https://aiscientist.tools)"
_JSON_HEADERS = {"Accept": "application/json", "User-Agent": _BROWSER_UA}
_RSID_RE = re.compile(r"^rs\d+$", re.IGNORECASE)
_VARIANT_RE = re.compile(
r"^(?:chr)?(\w+)[:\-](\d+)[:\-]([ACGTN]+)[:\-]([ACGTN]+)$", re.IGNORECASE
)
def _resolve_rsid(
rsid: str, server: str, build: str, timeout: int
) -> Tuple[List[str], Optional[str]]:
"""Resolve an rsID to candidate chr:pos-ref-alt variants in the requested build.
Returns (candidates, error_message). Multi-allelic SNPs (e.g. allele_string
"C/G/T") yield one candidate per alternate allele, since the caller cannot
know which alt the biobank recorded; the caller tries each in turn.
"""
url = f"{server}/variation/human/{rsid}"
resp = requests.get(
url, headers={"Content-Type": "application/json"}, timeout=timeout
)
if resp.status_code == 404:
return [], f"rsID '{rsid}' not found in Ensembl ({build})"
resp.raise_for_status()
candidates: List[str] = []
for m in resp.json().get("mappings", []):
if m.get("assembly_name") != build:
continue
alleles = (m.get("allele_string") or "").split("/")
chrom = m.get("seq_region_name")
pos = m.get("start")
if len(alleles) < 2 or not chrom or not pos:
continue
ref = alleles[0]
for alt in alleles[1:]:
cand = f"{chrom}:{pos}-{ref}-{alt}"
if cand not in candidates:
candidates.append(cand)
if not candidates:
return [], f"rsID '{rsid}' has no {build} mapping with a clear ref/alt allele"
return candidates, None
def _filter_and_sort_by_pval(
associations: List[Dict[str, Any]], max_pval: Optional[float]
) -> List[Dict[str, Any]]:
"""Drop associations above max_pval (if given) and sort ascending by p-value.
Non-numeric p-values are treated as 1.0 so they sort last and are excluded by
any max_pval filter.
"""
if max_pval is not None:
associations = [
a
for a in associations
if isinstance(a["pval"], (int, float)) and a["pval"] <= max_pval
]
associations.sort(
key=lambda a: a["pval"] if isinstance(a["pval"], (int, float)) else 1.0
)
return associations
def _canonicalize_variant(variant: str) -> Optional[str]:
"""Normalize a chr:pos:ref:alt / chr-pos-ref-alt string to PheWeb's chr:pos-ref-alt."""
m = _VARIANT_RE.match(variant.strip())
if not m:
return None
chrom, pos, ref, alt = m.groups()
return f"{chrom}:{pos}-{ref.upper()}-{alt.upper()}"
[docs]
@register_tool("PheWebPheWASTool")
class PheWebPheWASTool(BaseTool):
"""Variant-level PheWAS lookup across PheWeb-based population biobanks.
The target biobank is selected by ``fields.biobank`` (bbj | ukb_topmed | tpmi).
Accepts either an rsID (resolved to the biobank's build via Ensembl) or an
explicit ``chr:pos:ref:alt`` variant already in that build.
"""
[docs]
def __init__(self, tool_config: Dict[str, Any]):
super().__init__(tool_config)
self.timeout = tool_config.get("timeout", 30)
biobank_key = tool_config.get("fields", {}).get("biobank", "ukb_topmed")
self.biobank_key = biobank_key
self.cfg = _PHEWEB_BIOBANKS.get(biobank_key)
self._http: Optional[requests.Session] = None
[docs]
def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
if self.cfg is None:
return {
"status": "error",
"error": f"Unknown biobank '{self.biobank_key}'. Expected one of: {', '.join(_PHEWEB_BIOBANKS)}",
}
try:
return self._run(arguments)
except requests.exceptions.Timeout:
return {
"status": "error",
"error": f"{self.cfg['label']} PheWAS request timed out after {self.timeout}s",
}
except requests.exceptions.ConnectionError:
return {
"status": "error",
"error": f"Failed to connect to {self.cfg['label']} PheWAS API",
}
except Exception as e: # noqa: BLE001 - tools must never raise
return {
"status": "error",
"error": f"{self.cfg['label']} PheWAS error: {str(e)}",
}
[docs]
def _run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
warnings: List[str] = []
candidates, err = self._resolve_input(arguments, warnings)
if err:
return {"status": "error", "error": err}
limit = arguments.get("limit", 20)
max_pval = arguments.get("max_pval")
# Try each candidate variant (multi-allelic rsIDs yield several) and use
# the one actually present in the biobank. PheWeb returns HTTP 200 with a
# null/empty body for absent variants, so select by presence of phenos.
canonical = candidates[0]
data = None
candidate_errors: List[str] = []
for cand in candidates:
try:
resp = self._get(f"{self.cfg['base_url']}/api/variant/{cand}")
if resp.status_code == 404:
continue
resp.raise_for_status()
payload = resp.json()
except requests.exceptions.RequestException as e:
# A transient failure (e.g. 500/502) on one candidate must not
# abort the lookup; a later candidate may still resolve.
candidate_errors.append(f"{cand}: {str(e)}")
continue
if isinstance(payload, dict) and payload.get("phenos"):
canonical = cand
data = payload
break
# Only surface an error if every candidate failed to fetch (none reached
# a usable 200/404 response).
if (
data is None
and candidate_errors
and len(candidate_errors) == len(candidates)
):
return {
"status": "error",
"error": (
f"{self.cfg['label']} PheWAS lookup failed for all candidates: "
+ "; ".join(candidate_errors)
),
}
if data is None:
tried = ", ".join(candidates)
return {
"status": "success",
"data": {"variant": candidates[0], "associations": []},
"metadata": {
"source": self.cfg["label"],
"build": self.cfg["build"],
"total_associations": 0,
"note": f"Variant not present in {self.cfg['label']} ({self.cfg['build']}); tried {tried}",
"warnings": warnings,
},
}
# data is guaranteed to be a dict with non-empty "phenos" here (selected above).
phenos = data.get("phenos", [])
associations = [
{
"phenocode": p.get("phenocode"),
"phenostring": p.get("phenostring"),
"category": p.get("category"),
"pval": p.get("pval"),
"beta": p.get("beta"),
"sebeta": p.get("sebeta"),
"af": p.get("af"),
"num_cases": p.get("num_cases"),
"num_controls": p.get("num_controls"),
}
for p in phenos
]
associations = _filter_and_sort_by_pval(associations, max_pval)
total = len(associations)
truncated = total > limit
return {
"status": "success",
"data": {
"variant": canonical,
"rsids": data.get("rsids"),
"nearest_genes": data.get("nearest_genes"),
"associations": associations[:limit],
},
"metadata": {
"source": self.cfg["label"],
"ancestry": self.cfg["ancestry"],
"build": self.cfg["build"],
"total_associations": total,
"returned": min(limit, total),
"truncated": truncated,
"variant_url": f"{self.cfg['base_url']}/variant/{canonical}",
"warnings": warnings,
},
}
[docs]
def _session(self) -> requests.Session:
"""A requests Session, using the relaxed-strict TLS adapter for hosts whose
certificate omits the SKI extension (verification stays on; see adapter)."""
if self._http is None:
self._http = requests.Session()
if self.cfg.get("relax_tls_strictness"):
self._http.mount(self.cfg["base_url"], _RelaxedStrictAdapter())
return self._http
[docs]
def _get(self, url: str) -> requests.Response:
return self._session().get(url, headers=_JSON_HEADERS, timeout=self.timeout)
[docs]
def _resolve_input(
self, arguments: Dict[str, Any], warnings: List[str]
) -> Tuple[List[str], Optional[str]]:
"""Return (candidate_variants, error). Handles rsID and explicit variant input."""
rsid = (arguments.get("rsid") or "").strip()
variant = (arguments.get("variant") or "").strip()
# A bare rsID passed in the variant slot is still an rsID.
if not rsid and _RSID_RE.match(variant):
rsid, variant = variant, ""
if rsid:
if not _RSID_RE.match(rsid):
return [], f"Invalid rsID '{rsid}' (expected e.g. rs7903146)"
candidates, err = _resolve_rsid(
rsid, self.cfg["ensembl_server"], self.cfg["build"], self.timeout
)
if err:
return [], err
warnings.append(
f"Resolved {rsid} to {len(candidates)} candidate variant(s) "
f"on {self.cfg['build']} via Ensembl: {', '.join(candidates)}"
)
return candidates, None
if variant:
canonical = _canonicalize_variant(variant)
if not canonical:
return [], (
f"Invalid variant '{variant}'. Expected chr:pos:ref:alt "
f"(e.g. 10:112998590:C:T) in {self.cfg['build']}, or an rsID."
)
return [canonical], None
return (
[],
"Provide either 'rsid' (e.g. rs7903146) or 'variant' (chr:pos:ref:alt)",
)
# Genebass canonical burden sets and their accepted aliases.
_GENEBASS_BURDEN = {
"plof": "pLoF",
"lof": "pLoF",
"missense": "missense|LC",
"missenselc": "missense|LC",
"synonymous": "synonymous",
}
_ENSG_RE = re.compile(r"^ENSG\d+", re.IGNORECASE)
# The fields that jointly identify a Genebass phenotype. The same composite key
# is built from the /phenotypes catalogue and from each /phewas row so the two
# can be joined for human-readable descriptions; they MUST stay in sync.
_GENEBASS_PHENO_KEY_FIELDS = (
"trait_type",
"phenocode",
"pheno_sex",
"coding",
"modifier",
)
def _genebass_pheno_key(record: Dict[str, Any]) -> str:
return "-".join(str(record.get(k, "")) for k in _GENEBASS_PHENO_KEY_FIELDS)
[docs]
@register_tool("GenebassTool")
class GenebassTool(BaseTool):
"""Exome-wide gene-level burden PheWAS from Genebass (UK Biobank 394K exomes).
Given one gene, returns its rare-variant burden association across ~4,500
phenotypes (GRCh38). Complements variant-level PheWAS by aggregating rare
coding variants into a single gene-level test (SAIGE-GENE+).
"""
BASE_URL = "https://main.genebass.org/api"
[docs]
def __init__(self, tool_config: Dict[str, Any]):
super().__init__(tool_config)
self.timeout = tool_config.get("timeout", 30)
self._desc_cache: Optional[Dict[str, str]] = None
[docs]
def _phenotype_descriptions(self) -> Dict[str, str]:
"""Map the composite phenotype key to its human-readable description (cached).
Keyed by _genebass_pheno_key (trait_type-phenocode-pheno_sex-coding-modifier),
matching the key built from each /phewas row so descriptions can be joined.
"""
if self._desc_cache is not None:
return self._desc_cache
cache: Dict[str, str] = {}
try:
resp = requests.get(
f"{self.BASE_URL}/phenotypes",
headers=_JSON_HEADERS,
timeout=self.timeout,
)
resp.raise_for_status()
for p in resp.json():
if p.get("description"):
cache[_genebass_pheno_key(p)] = p["description"]
except (requests.RequestException, ValueError):
# Descriptions are a nicety; absence must not fail the main lookup.
pass
self._desc_cache = cache
return cache
[docs]
def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
try:
return self._run(arguments)
except requests.exceptions.Timeout:
return {
"status": "error",
"error": f"Genebass request timed out after {self.timeout}s",
}
except requests.exceptions.ConnectionError:
return {"status": "error", "error": "Failed to connect to Genebass API"}
except Exception as e: # noqa: BLE001 - tools must never raise
return {"status": "error", "error": f"Genebass error: {str(e)}"}
[docs]
def _run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
warnings: List[str] = []
gene_input = (arguments.get("gene") or "").strip()
if not gene_input:
return {
"status": "error",
"error": "gene is required (Ensembl gene ID or symbol)",
}
burden_raw = (arguments.get("burden_set") or "pLoF").strip()
burden_key = re.sub(r"[^a-z0-9]", "", burden_raw.lower())
burden = _GENEBASS_BURDEN.get(burden_key)
if burden is None:
return {
"status": "error",
"error": (
f"Invalid burden_set '{burden_raw}'. Allowed: pLoF, missense|LC, "
"synonymous (aliases: lof/plof -> pLoF, missense -> missense|LC)"
),
}
gene_id, err = self._resolve_gene(gene_input, warnings)
if err:
return {"status": "error", "error": err}
limit = arguments.get("limit", 25)
max_pval = arguments.get("max_pval")
url = (
f"{self.BASE_URL}/phewas/{gene_id}?burdenSet={requests.utils.quote(burden)}"
)
resp = requests.get(url, headers=_JSON_HEADERS, timeout=self.timeout)
if resp.status_code == 404:
return {
"status": "error",
"error": f"Gene '{gene_id}' not found in Genebass",
}
resp.raise_for_status()
data = resp.json()
gene_meta = data.get("gene", {}) if isinstance(data, dict) else {}
rows = data.get("phewas", []) if isinstance(data, dict) else []
associations = [
{
"phenocode": r.get("phenocode"),
"description": None,
"trait_type": r.get("trait_type"),
"pval": r.get("Pvalue_Burden"),
"pval_skat": r.get("Pvalue_SKAT"),
"beta": r.get("BETA_Burden"),
"total_variants": r.get("total_variants"),
"_key": _genebass_pheno_key(r),
}
for r in rows
]
associations = _filter_and_sort_by_pval(associations, max_pval)
total = len(associations)
# Phewas rows carry no human-readable phenotype name; join the top hits
# against the phenotype catalogue (one cached lookup) for readability.
top = associations[:limit]
desc_map = self._phenotype_descriptions()
for a in top:
a["description"] = desc_map.get(a.pop("_key"))
for a in associations[limit:]:
a.pop("_key", None)
return {
"status": "success",
"data": {
"gene_id": gene_meta.get("gene_id", gene_id),
"gene_symbol": gene_meta.get("symbol"),
"burden_set": burden,
"associations": top,
},
"metadata": {
"source": "Genebass (UK Biobank 394K exomes, SAIGE-GENE+)",
"build": "GRCh38",
"total_associations": total,
"returned": min(limit, total),
"truncated": total > limit,
"gene_url": f"https://genebass.org/gene/{gene_id}",
"warnings": warnings,
},
}
[docs]
def _resolve_gene(
self, gene_input: str, warnings: List[str]
) -> Tuple[Optional[str], Optional[str]]:
"""Resolve a gene symbol to an Ensembl gene ID; pass through ENSG IDs."""
if _ENSG_RE.match(gene_input):
return gene_input.split(".", 1)[0].upper(), None
url = f"https://rest.ensembl.org/xrefs/symbol/homo_sapiens/{gene_input}"
resp = requests.get(
url, headers={"Content-Type": "application/json"}, timeout=self.timeout
)
if resp.status_code == 404:
return None, f"Gene symbol '{gene_input}' not found in Ensembl"
resp.raise_for_status()
for xref in resp.json():
if str(xref.get("id", "")).upper().startswith("ENSG"):
ensg = xref["id"].split(".", 1)[0].upper()
warnings.append(f"Resolved symbol '{gene_input}' to {ensg} via Ensembl")
return ensg, None
return None, f"No Ensembl gene ID found for symbol '{gene_input}'"