# Source code for tooluniverse.ncbi_variation_tool

"""
NCBI Variation Services API tool for ToolUniverse.

Provides SPDI/HGVS variant notation conversion, variant normalization,
and dbSNP rsID lookup via the NCBI Variation Services API.

API: https://api.ncbi.nlm.nih.gov/variation/v0/
No authentication required.
"""

from typing import Any, Dict, Optional, Tuple
from urllib.parse import quote, unquote

import requests

from .base_tool import BaseTool
from .tool_registry import register_tool

NCBI_VAR_BASE = "https://api.ncbi.nlm.nih.gov/variation/v0"


@register_tool("NCBIVariationTool")
class NCBIVariationTool(BaseTool):
    """
    Tool for SPDI/HGVS variant notation conversion and normalization
    using the NCBI Variation Services API.

    Supports: spdi_to_hgvs, hgvs_to_spdi, spdi_equivalents,
    spdi_canonical, rsid_lookup.

    No authentication required.

    Every public entry point returns a dict with ``"status"`` set to
    ``"success"`` (payload under ``"data"``) or ``"error"`` (message under
    ``"error"``).
    """

    # Upper bound on the number of clinical records reported by rsid_lookup.
    _MAX_CLINICAL_RECORDS = 5

    def __init__(self, tool_config: Dict[str, Any]):
        """Read the request timeout and endpoint type from ``tool_config``.

        Args:
            tool_config: Tool configuration. ``timeout`` (seconds, default 30)
                and ``fields.endpoint_type`` (default ``"spdi_to_hgvs"``) are
                read here; the whole dict is passed to the base class.
        """
        super().__init__(tool_config)
        self.timeout = tool_config.get("timeout", 30)
        # "fields" may be present but null in a config; treat that as empty.
        fields = tool_config.get("fields") or {}
        self.endpoint_type = fields.get("endpoint_type", "spdi_to_hgvs")

    def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the NCBI Variation Services API call.

        Args:
            arguments: Endpoint-specific parameters (``spdi``, ``hgvs`` or
                ``rsid`` depending on the configured endpoint type).

        Returns:
            A status envelope. Exceptions raised while querying are converted
            into an ``"error"`` envelope instead of propagating.
        """
        try:
            return self._query(arguments)
        except requests.exceptions.Timeout:
            return {
                "status": "error",
                "error": f"NCBI Variation API timed out after {self.timeout}s",
            }
        except requests.exceptions.ConnectionError:
            return {
                "status": "error",
                "error": "Failed to connect to NCBI Variation API.",
            }
        except Exception as e:
            return {
                "status": "error",
                "error": f"Error querying NCBI Variation API: {str(e)}",
            }

    def _query(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Route to the handler matching ``self.endpoint_type``."""
        dispatch = {
            "spdi_to_hgvs": self._spdi_to_hgvs,
            "hgvs_to_spdi": self._hgvs_to_spdi,
            "spdi_equivalents": self._spdi_equivalents,
            "spdi_canonical": self._spdi_canonical,
            "rsid_lookup": self._rsid_lookup,
        }
        handler = dispatch.get(self.endpoint_type)
        if not handler:
            return {
                "status": "error",
                "error": f"Unknown endpoint type: {self.endpoint_type}",
            }
        return handler(arguments)

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _error(message: str) -> Dict[str, Any]:
        """Build an error envelope carrying ``message``."""
        return {"status": "error", "error": message}

    @staticmethod
    def _text_argument(arguments: Dict[str, Any], name: str) -> str:
        """Return ``arguments[name]`` as stripped text, or ``""`` if absent/empty."""
        return str(arguments.get(name) or "").strip()

    @staticmethod
    def _path_segment(value: str) -> str:
        """Percent-encode ``value`` for use as a single URL path segment.

        ``/``, ``?`` and ``#`` are escaped so user input cannot alter the
        route or add a query string. ``:`` is left as-is because SPDI and HGVS
        expressions use it as their field separator. The value is unquoted
        first so input that is already percent-encoded is not encoded twice.
        """
        return quote(unquote(value), safe=":")

    @staticmethod
    def _unwrap(payload: Any) -> Any:
        """Return ``payload["data"]`` when present, otherwise ``payload``."""
        if isinstance(payload, dict):
            return payload.get("data", payload)
        return payload

    def _get_json(
        self, path: str
    ) -> Tuple[Optional[Any], Optional[Dict[str, Any]]]:
        """GET ``NCBI_VAR_BASE/path`` and decode the JSON body.

        Returns:
            ``(payload, None)`` on HTTP 200, or ``(None, error_envelope)`` for
            any other status code. Network and JSON decoding exceptions are
            not caught here.
        """
        resp = requests.get(f"{NCBI_VAR_BASE}/{path}", timeout=self.timeout)
        if resp.status_code != 200:
            return None, self._error(
                f"API returned {resp.status_code}: {resp.text[:200]}"
            )
        return resp.json(), None

    def _spdi_request(
        self, arguments: Dict[str, Any], suffix: str
    ) -> Tuple[Optional[Any], Optional[Dict[str, Any]]]:
        """Validate the ``spdi`` argument and GET ``spdi/<spdi>/<suffix>``."""
        spdi = self._text_argument(arguments, "spdi")
        if not spdi:
            return None, self._error("spdi parameter is required")
        return self._get_json(f"spdi/{self._path_segment(spdi)}/{suffix}")

    # ------------------------------------------------------------------
    # Endpoint handlers
    # ------------------------------------------------------------------

    def _spdi_to_hgvs(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Convert SPDI notation to HGVS."""
        payload, error = self._spdi_request(arguments, "hgvs")
        if error:
            return error
        return {"status": "success", "data": self._unwrap(payload)}

    def _hgvs_to_spdi(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Convert HGVS notation to SPDI."""
        hgvs = self._text_argument(arguments, "hgvs")
        if not hgvs:
            return self._error("hgvs parameter is required")
        payload, error = self._get_json(
            f"hgvs/{self._path_segment(hgvs)}/contextuals"
        )
        if error:
            return error
        return {"status": "success", "data": self._unwrap(payload)}

    def _spdi_equivalents(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Get all equivalent SPDI representations across assemblies."""
        payload, error = self._spdi_request(
            arguments, "all_equivalent_contextual"
        )
        if error:
            return error
        # Tolerate a missing or null "data"/"spdis" member.
        inner = (payload.get("data") if isinstance(payload, dict) else None) or {}
        spdis = inner.get("spdis") or []
        return {
            "status": "success",
            "data": {
                "equivalents": spdis,
                "count": len(spdis),
            },
        }

    def _spdi_canonical(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Get canonical representative SPDI for a variant."""
        payload, error = self._spdi_request(
            arguments, "canonical_representative"
        )
        if error:
            return error
        return {"status": "success", "data": self._unwrap(payload)}

    def _rsid_lookup(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Look up a dbSNP rsID and return a summary of the variant record.

        Args:
            arguments: Must contain ``rsid`` as digits, optionally prefixed
                with ``rs`` (any case); an integer is accepted as well.

        Returns:
            A success envelope whose ``data`` holds the record identifiers and
            dates plus, when present in the response, the organism, variant
            type, GRCh38 placements, genes and clinical significance; or an
            error envelope.
        """
        rsid = arguments.get("rsid", "")
        if not rsid:
            return self._error("rsid parameter is required")

        # Remove exactly one leading "rs". str.lstrip("rs") would strip any
        # run of 'r'/'s' characters and so accept ids such as "srs123".
        rsid_num = str(rsid).strip()
        if rsid_num[:2].lower() == "rs":
            rsid_num = rsid_num[2:]
        # isascii() rejects non-ASCII digits that str.isdigit() accepts.
        if not (rsid_num.isdigit() and rsid_num.isascii()):
            return self._error(
                f"Invalid rsID: '{rsid}'. Must be numeric or start with 'rs'."
            )

        data, error = self._get_json(f"refsnp/{rsid_num}")
        if error:
            return error

        # Extract key information from the large response
        result = {
            "refsnp_id": data.get("refsnp_id"),
            "create_date": data.get("create_date"),
            "last_update_date": data.get("last_update_date"),
            "citations": data.get("citations", []),
            "mane_select_ids": data.get("mane_select_ids", []),
        }

        snapshot = data.get("primary_snapshot_data") or {}
        if snapshot:
            result["organism"] = snapshot.get("organism")
            result["variant_type"] = snapshot.get("variant_type")

            placements = self._grch38_placements(snapshot)
            if placements:
                result["grch38_placements"] = placements

            allele_annots = snapshot.get("allele_annotations") or []
            genes = self._genes(allele_annots)
            if genes:
                result["genes"] = genes
            clinical = self._clinical_records(
                allele_annots, self._MAX_CLINICAL_RECORDS
            )
            if clinical:
                result["clinical_significance"] = clinical

        return {"status": "success", "data": result}

    # ------------------------------------------------------------------
    # RefSNP response parsing
    # ------------------------------------------------------------------

    @staticmethod
    def _grch38_placements(snapshot: Dict[str, Any]) -> list:
        """Collect the SPDI of every allele on a placement tagged GRCh38."""
        found = []
        for placement in snapshot.get("placements_with_allele") or []:
            annot = placement.get("placement_annot") or {}
            assemblies = annot.get("seq_id_traits_by_assembly") or []
            # A placement is taken once, however many GRCh38 entries it lists.
            if not any(
                "GRCh38" in (a.get("assembly_name") or "") for a in assemblies
            ):
                continue
            for allele in placement.get("alleles") or []:
                spdi = (allele.get("allele") or {}).get("spdi") or {}
                if spdi:
                    found.append(
                        {
                            "seq_id": spdi.get("seq_id"),
                            "position": spdi.get("position"),
                            "deleted_sequence": spdi.get("deleted_sequence"),
                            "inserted_sequence": spdi.get("inserted_sequence"),
                        }
                    )
        return found

    @staticmethod
    def _genes(allele_annots: list) -> list:
        """Collect the genes named in the allele annotations, without repeats.

        The same gene can be listed under each allele annotation, so entries
        are de-duplicated on (locus, name, id), keeping first-seen order.
        """
        genes = []
        seen = set()
        for annot in allele_annots:
            for assembly_annot in annot.get("assembly_annotation") or []:
                for gene in assembly_annot.get("genes") or []:
                    key = (gene.get("locus"), gene.get("name"), gene.get("id"))
                    if key in seen:
                        continue
                    seen.add(key)
                    genes.append(
                        {
                            "gene": gene.get("locus"),
                            "name": gene.get("name"),
                            "gene_id": gene.get("id"),
                        }
                    )
        return genes

    @staticmethod
    def _clinical_records(allele_annots: list, limit: int) -> list:
        """Collect up to ``limit`` clinical records across all allele annotations.

        Records are gathered in annotation order, so clinical data attached to
        an earlier allele is not discarded in favour of a later one.
        """
        records = []
        for annot in allele_annots:
            for entry in annot.get("clinical") or []:
                if len(records) >= limit:
                    return records
                records.append(
                    {
                        "accession": entry.get("accession_version"),
                        "review_status": entry.get("review_status"),
                        "disease_names": entry.get("disease_names", []),
                        "significance": entry.get("clinical_significances", []),
                    }
                )
        return records