Source code for tooluniverse.opengwas_tool

"""
OpenGWAS (IEU) tool for ToolUniverse.

Fetches the genetic instruments needed for a custom two-sample Mendelian
randomization analysis from the IEU OpenGWAS database: the genome-wide
significant, LD-clumped SNPs for an exposure GWAS (``/tophits``) and those
same SNPs' effects in an outcome GWAS (``/associations``), harmonized onto a
common effect allele so they are ready to feed into an MR estimator.

This complements EpiGraphDB's pre-computed MR-EvE results (which only cover
curated trait pairs) by letting callers assemble instruments for arbitrary
exposure/outcome GWAS pairs.

API: https://api.opengwas.io/api  (POST /tophits, POST /associations)
Auth: free JWT token in the OPENGWAS_JWT environment variable
      (register at https://api.opengwas.io).
"""

import os
import requests

from .base_tool import BaseTool
from .tool_registry import register_tool

OPENGWAS_BASE = "https://api.opengwas.io/api"


[docs] @register_tool("OpenGWASTool") class OpenGWASTool(BaseTool): """Assemble harmonized two-sample MR instruments from IEU OpenGWAS."""
[docs] def __init__(self, tool_config): super().__init__(tool_config) # Clumping at the API can be slow, so allow a generous default. self.timeout = tool_config.get("fields", {}).get("timeout", 60)
[docs] def _headers(self): """Bearer headers, or None when no token is configured.""" token = os.environ.get("OPENGWAS_JWT", "").strip() if not token: return None return { "Authorization": f"Bearer {token}", "Content-Type": "application/json", "X-Api-Source": "tooluniverse", }
[docs] def run(self, arguments): try: return self._fetch_mr_instruments(arguments) except requests.exceptions.Timeout: return { "status": "error", "error": ( f"OpenGWAS request timed out after {self.timeout}s. LD " "clumping is expensive; retry, or set clump=0 / a preclumped " "exposure." ), } except requests.exceptions.HTTPError as e: code = e.response.status_code if e.response is not None else "?" detail = e.response.text[:200] if e.response is not None else "" hint = ( " The OPENGWAS_JWT token is missing/expired/invalid — get a free" " one at https://api.opengwas.io." if code in (401, 403) else "" ) return { "status": "error", "error": f"OpenGWAS HTTP {code}: {detail}{hint}", } except requests.exceptions.RequestException as e: return {"status": "error", "error": f"OpenGWAS request failed: {e}"} except Exception as e: # never raise out of run() return {"status": "error", "error": f"Unexpected OpenGWAS error: {e}"}
[docs] def _post(self, path, payload): resp = requests.post( f"{OPENGWAS_BASE}/{path}", json=payload, headers=self._headers(), timeout=self.timeout, ) resp.raise_for_status() data = resp.json() # The API returns a JSON array; some deployments wrap it in {"results": [...]}. if isinstance(data, dict): return data.get("results", []) return data if isinstance(data, list) else []
[docs] @staticmethod def _norm(record): """Pick the standard OpenGWAS association fields off one record.""" return { "rsid": record.get("rsid"), "chr": record.get("chr"), "position": record.get("position"), "ea": (record.get("ea") or "").upper() or None, "nea": (record.get("nea") or "").upper() or None, "eaf": record.get("eaf"), "beta": record.get("beta"), "se": record.get("se"), "p": record.get("p"), "n": record.get("n"), }
[docs] @staticmethod def _harmonize(exposure, outcome): """Align an outcome record to the exposure's effect allele. Returns (outcome_beta, outcome_eaf, status) where status is one of 'aligned' (alleles match), 'flipped' (strand/allele swap, beta negated), or 'incompatible' (alleles don't correspond — drop before MR). """ ea, nea = exposure["ea"], exposure["nea"] o_ea, o_nea = outcome["ea"], outcome["nea"] beta, eaf = outcome["beta"], outcome["eaf"] if o_ea == ea and o_nea == nea: return beta, eaf, "aligned" if o_ea == nea and o_nea == ea: flipped_beta = -beta if isinstance(beta, (int, float)) else beta flipped_eaf = 1 - eaf if isinstance(eaf, (int, float)) else eaf return flipped_beta, flipped_eaf, "flipped" return beta, eaf, "incompatible"
[docs] def _fetch_mr_instruments(self, arguments): exposure_id = (arguments.get("exposure_id") or "").strip() outcome_id = (arguments.get("outcome_id") or "").strip() if not exposure_id: return { "status": "error", "error": ( "exposure_id is required (an IEU OpenGWAS study ID, e.g. " "'ieu-a-2' for BMI). Use EpiGraphDB_search_opengwas to find IDs." ), } if self._headers() is None: return { "status": "error", "error": ( "OpenGWAS requires a free JWT token. Register at " "https://api.opengwas.io and set the OPENGWAS_JWT " "environment variable." ), } pval = float(arguments.get("pval", 5e-8)) clump = int(arguments.get("clump", 1)) # Step 1: exposure instruments (genome-wide significant, LD-clumped). raw_instruments = self._post( "tophits", { "id": [exposure_id], "pval": pval, "clump": clump, "r2": float(arguments.get("r2", 0.001)), "kb": int(arguments.get("kb", 10000)), "pop": arguments.get("pop", "EUR"), }, ) instruments = [self._norm(r) for r in raw_instruments if r.get("rsid")] # Shared metadata base for every success envelope below. base_metadata = { "exposure_id": exposure_id, "pval": pval, "source": "IEU OpenGWAS", } if not instruments: return { "status": "success", "data": {"instruments": [], "mr_input": [], "n_instruments": 0}, "metadata": { **base_metadata, "note": ( f"No genome-wide instruments for exposure '{exposure_id}' " f"at p < {pval}. Verify the ID via EpiGraphDB_search_opengwas, " "or relax pval — a weak-instrument MR is unreliable." ), }, } # Without an outcome, return the exposure instruments alone. if not outcome_id: return { "status": "success", "data": { "instruments": instruments, "mr_input": [], "n_instruments": len(instruments), }, "metadata": { **base_metadata, "note": ( "Exposure instruments only — pass outcome_id to get " "harmonized two-sample MR input." ), }, } # Step 2: those SNPs' effects in the outcome GWAS, then harmonize. rsids = [i["rsid"] for i in instruments] raw_outcome = self._post( "associations", { "variant": rsids, "id": [outcome_id], "proxies": int(arguments.get("proxies", 0)), }, ) outcome_by_rsid = { o["rsid"]: o for o in (self._norm(r) for r in raw_outcome) if o["rsid"] } mr_input, missing, incompatible = [], 0, 0 for inst in instruments: out = outcome_by_rsid.get(inst["rsid"]) if out is None: missing += 1 continue out_beta, out_eaf, harmon = self._harmonize(inst, out) if harmon == "incompatible": incompatible += 1 continue mr_input.append( { "rsid": inst["rsid"], "ea": inst["ea"], "nea": inst["nea"], "eaf": inst["eaf"], "exposure_beta": inst["beta"], "exposure_se": inst["se"], "exposure_p": inst["p"], "outcome_beta": out_beta, "outcome_se": out["se"], "outcome_p": out["p"], "outcome_eaf": out_eaf, "harmonization": harmon, } ) return { "status": "success", "data": { "instruments": instruments, "mr_input": mr_input, "n_instruments": len(instruments), "n_mr_input": len(mr_input), }, "metadata": { **base_metadata, "outcome_id": outcome_id, "n_missing_in_outcome": missing, "n_incompatible_alleles": incompatible, "description": ( "Harmonized two-sample MR input: each row's exposure_beta and " "outcome_beta are aligned to the same effect allele (ea). Feed " "into an IVW/MR-Egger estimator. Palindromic SNPs are not " "strand-resolved here — review before use." ), }, }