Source code for tooluniverse.compound_variant_tool

"""Compound tool: annotate a variant from multiple sources in one call.

Queries ClinVar, gnomAD, CIViC, and UniProt for a given variant,
cross-references results, and returns a unified annotation with
pathogenicity classification, population frequencies, and clinical evidence.
"""

from typing import Any, Dict, List

from .base_tool import BaseTool
from .tool_registry import register_tool


[docs] @register_tool("CompoundVariantAnnotationTool") class CompoundVariantAnnotationTool(BaseTool): """Annotate a variant from ClinVar, gnomAD, CIViC, and UniProt in one call."""
[docs] def __init__(self, tool_config: Dict[str, Any], **kwargs): super().__init__(tool_config)
[docs] def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]: variant = arguments.get("variant") gene = arguments.get("gene") or arguments.get("gene_symbol") rsid = arguments.get("rsid") if not variant and not gene and not rsid: return { "status": "error", "error": "At least one of 'variant', 'gene', or 'rsid' is required.", } from .execute_function import ToolUniverse tu = ToolUniverse() tu.load_tools() annotations: Dict[str, Any] = {} sources_failed: List[str] = [] query_term = rsid or variant or gene # 1. ClinVar try: r = tu.run_one_function( { "name": "ClinVar_search_variants", "arguments": {"query": query_term, "limit": 10}, } ) annotations["clinvar"] = self._parse_clinvar(r) except Exception as e: sources_failed.append(f"ClinVar: {str(e)[:100]}") # 2. gnomAD gene_for_gnomad = gene if not gene_for_gnomad and variant: parts = variant.split() if parts: gene_for_gnomad = parts[0] if gene_for_gnomad: try: r = tu.run_one_function( { "name": "gnomad_get_gene", "arguments": {"gene_symbol": gene_for_gnomad}, } ) annotations["gnomad"] = self._parse_gnomad(r) except Exception as e: sources_failed.append(f"gnomAD: {str(e)[:100]}") # 3. CIViC if gene_for_gnomad: try: r = tu.run_one_function( { "name": "civic_get_variants_by_gene", "arguments": {"gene_symbol": gene_for_gnomad}, } ) annotations["civic"] = self._parse_civic(r, variant) except Exception as e: sources_failed.append(f"CIViC: {str(e)[:100]}") # 4. UniProt if gene_for_gnomad: try: r = tu.run_one_function( { "name": "UniProt_search", "arguments": { "query": gene_for_gnomad, "organism": "human", "limit": 3, }, } ) annotations["uniprot"] = self._parse_uniprot(r) except Exception as e: sources_failed.append(f"UniProt: {str(e)[:100]}") # Build summary summary = self._build_summary(annotations, variant, gene_for_gnomad, rsid) return { "status": "success", "data": { "query": {"variant": variant, "gene": gene, "rsid": rsid}, "sources_queried": list(annotations.keys()), "sources_failed": sources_failed, "summary": summary, "annotations": annotations, }, }
[docs] def _parse_clinvar(self, result: Any) -> Dict[str, Any]: if not isinstance(result, dict): return {"raw": str(result)[:200]} data = result.get("data", {}) variants = data.get("variants", []) if isinstance(data, dict) else [] parsed = [] for v in variants[:10]: if isinstance(v, dict): parsed.append( { "name": v.get("title", v.get("name", "")), "classification": v.get( "clinical_significance", v.get("classification", ""), ), "condition": v.get("condition", ""), "review_status": v.get("review_status", ""), } ) return {"total": data.get("total_count", len(parsed)), "variants": parsed}
[docs] def _parse_gnomad(self, result: Any) -> Dict[str, Any]: if not isinstance(result, dict): return {"raw": str(result)[:200]} data = result.get("data", {}) if isinstance(data, dict): return { "gene": data.get("gene_id", ""), "constraints": { k: data.get(k) for k in ["pLI", "loeuf", "mis_z"] if data.get(k) is not None }, "variants_summary": data.get("variants", {}) if isinstance(data.get("variants"), dict) else {}, } return {"raw": str(data)[:200]}
[docs] def _parse_civic(self, result: Any, variant: str = None) -> Dict[str, Any]: if not isinstance(result, dict): return {"raw": str(result)[:200]} data = result.get("data", {}) variants = [] if isinstance(data, list): variants = data elif isinstance(data, dict): variants = data.get("variants", data.get("data", [])) parsed = [] for v in variants[:20]: if isinstance(v, dict): name = v.get("name", v.get("variant_name", "")) parsed.append( { "name": name, "evidence_count": v.get("evidence_count", 0), "therapies": v.get("therapies", []), } ) return {"total_variants": len(parsed), "variants": parsed}
[docs] def _parse_uniprot(self, result: Any) -> Dict[str, Any]: if not isinstance(result, dict): return {"raw": str(result)[:200]} data = result.get("data", {}) if isinstance(data, list) and data: entry = data[0] return { "accession": entry.get("accession", ""), "protein_name": entry.get("protein_name", ""), "gene_name": entry.get("gene_name", ""), "function": str(entry.get("function", ""))[:300], } return {"raw": str(data)[:200]}
[docs] def _build_summary( self, annotations: Dict[str, Any], variant: str = None, gene: str = None, rsid: str = None, ) -> Dict[str, Any]: summary = {"query": variant or gene or rsid, "sources_with_data": []} for source, data in annotations.items(): if data and not data.get("raw"): summary["sources_with_data"].append(source) clinvar = annotations.get("clinvar", {}) if clinvar.get("variants"): classifications = [ v["classification"] for v in clinvar["variants"] if v.get("classification") ] if classifications: summary["clinvar_classification"] = classifications[0] gnomad = annotations.get("gnomad", {}) if gnomad.get("constraints"): summary["gnomad_constraints"] = gnomad["constraints"] civic = annotations.get("civic", {}) if civic.get("total_variants"): summary["civic_variants_found"] = civic["total_variants"] return summary