Source code for tooluniverse.compound_variant_tool
"""Compound tool: annotate a variant from multiple sources in one call.
Queries ClinVar, gnomAD, CIViC, and UniProt for a given variant,
cross-references results, and returns a unified annotation with
pathogenicity classification, population frequencies, and clinical evidence.
"""
from typing import Any, Dict, List
from .base_tool import BaseTool
from .tool_registry import register_tool
[docs]
@register_tool("CompoundVariantAnnotationTool")
class CompoundVariantAnnotationTool(BaseTool):
"""Annotate a variant from ClinVar, gnomAD, CIViC, and UniProt in one call."""
[docs]
def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
variant = arguments.get("variant")
gene = arguments.get("gene") or arguments.get("gene_symbol")
rsid = arguments.get("rsid")
if not variant and not gene and not rsid:
return {
"status": "error",
"error": "At least one of 'variant', 'gene', or 'rsid' is required.",
}
from .execute_function import ToolUniverse
tu = ToolUniverse()
tu.load_tools()
annotations: Dict[str, Any] = {}
sources_failed: List[str] = []
query_term = rsid or variant or gene
# 1. ClinVar
try:
r = tu.run_one_function(
{
"name": "ClinVar_search_variants",
"arguments": {"query": query_term, "limit": 10},
}
)
annotations["clinvar"] = self._parse_clinvar(r)
except Exception as e:
sources_failed.append(f"ClinVar: {str(e)[:100]}")
# 2. gnomAD
gene_for_gnomad = gene
if not gene_for_gnomad and variant:
parts = variant.split()
if parts:
gene_for_gnomad = parts[0]
if gene_for_gnomad:
try:
r = tu.run_one_function(
{
"name": "gnomad_get_gene",
"arguments": {"gene_symbol": gene_for_gnomad},
}
)
annotations["gnomad"] = self._parse_gnomad(r)
except Exception as e:
sources_failed.append(f"gnomAD: {str(e)[:100]}")
# 3. CIViC
if gene_for_gnomad:
try:
r = tu.run_one_function(
{
"name": "civic_get_variants_by_gene",
"arguments": {"gene_symbol": gene_for_gnomad},
}
)
annotations["civic"] = self._parse_civic(r, variant)
except Exception as e:
sources_failed.append(f"CIViC: {str(e)[:100]}")
# 4. UniProt
if gene_for_gnomad:
try:
r = tu.run_one_function(
{
"name": "UniProt_search",
"arguments": {
"query": gene_for_gnomad,
"organism": "human",
"limit": 3,
},
}
)
annotations["uniprot"] = self._parse_uniprot(r)
except Exception as e:
sources_failed.append(f"UniProt: {str(e)[:100]}")
# Build summary
summary = self._build_summary(annotations, variant, gene_for_gnomad, rsid)
return {
"status": "success",
"data": {
"query": {"variant": variant, "gene": gene, "rsid": rsid},
"sources_queried": list(annotations.keys()),
"sources_failed": sources_failed,
"summary": summary,
"annotations": annotations,
},
}
[docs]
def _parse_clinvar(self, result: Any) -> Dict[str, Any]:
if not isinstance(result, dict):
return {"raw": str(result)[:200]}
data = result.get("data", {})
variants = data.get("variants", []) if isinstance(data, dict) else []
parsed = []
for v in variants[:10]:
if isinstance(v, dict):
parsed.append(
{
"name": v.get("title", v.get("name", "")),
"classification": v.get(
"clinical_significance",
v.get("classification", ""),
),
"condition": v.get("condition", ""),
"review_status": v.get("review_status", ""),
}
)
return {"total": data.get("total_count", len(parsed)), "variants": parsed}
[docs]
def _parse_gnomad(self, result: Any) -> Dict[str, Any]:
if not isinstance(result, dict):
return {"raw": str(result)[:200]}
data = result.get("data", {})
if isinstance(data, dict):
return {
"gene": data.get("gene_id", ""),
"constraints": {
k: data.get(k)
for k in ["pLI", "loeuf", "mis_z"]
if data.get(k) is not None
},
"variants_summary": data.get("variants", {})
if isinstance(data.get("variants"), dict)
else {},
}
return {"raw": str(data)[:200]}
[docs]
def _parse_civic(self, result: Any, variant: str = None) -> Dict[str, Any]:
if not isinstance(result, dict):
return {"raw": str(result)[:200]}
data = result.get("data", {})
variants = []
if isinstance(data, list):
variants = data
elif isinstance(data, dict):
variants = data.get("variants", data.get("data", []))
parsed = []
for v in variants[:20]:
if isinstance(v, dict):
name = v.get("name", v.get("variant_name", ""))
parsed.append(
{
"name": name,
"evidence_count": v.get("evidence_count", 0),
"therapies": v.get("therapies", []),
}
)
return {"total_variants": len(parsed), "variants": parsed}
[docs]
def _parse_uniprot(self, result: Any) -> Dict[str, Any]:
if not isinstance(result, dict):
return {"raw": str(result)[:200]}
data = result.get("data", {})
if isinstance(data, list) and data:
entry = data[0]
return {
"accession": entry.get("accession", ""),
"protein_name": entry.get("protein_name", ""),
"gene_name": entry.get("gene_name", ""),
"function": str(entry.get("function", ""))[:300],
}
return {"raw": str(data)[:200]}
[docs]
def _build_summary(
self,
annotations: Dict[str, Any],
variant: str = None,
gene: str = None,
rsid: str = None,
) -> Dict[str, Any]:
summary = {"query": variant or gene or rsid, "sources_with_data": []}
for source, data in annotations.items():
if data and not data.get("raw"):
summary["sources_with_data"].append(source)
clinvar = annotations.get("clinvar", {})
if clinvar.get("variants"):
classifications = [
v["classification"]
for v in clinvar["variants"]
if v.get("classification")
]
if classifications:
summary["clinvar_classification"] = classifications[0]
gnomad = annotations.get("gnomad", {})
if gnomad.get("constraints"):
summary["gnomad_constraints"] = gnomad["constraints"]
civic = annotations.get("civic", {})
if civic.get("total_variants"):
summary["civic_variants_found"] = civic["total_variants"]
return summary