Source code for tooluniverse.clinvar_tool

"""
ClinVar REST API Tool

This tool provides access to the ClinVar database for clinical variant information,
disease associations, and clinical significance data.
"""

import requests
import time
from typing import Dict, Any, Optional
from .base_tool import BaseTool
from .tool_registry import register_tool


[docs] class ClinVarRESTTool(BaseTool): """Base class for ClinVar REST API tools."""
[docs] def __init__(self, tool_config): super().__init__(tool_config) self.base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils" self.session = requests.Session() self.session.headers.update( {"Accept": "application/json", "User-Agent": "ToolUniverse/1.0"} ) self.timeout = 30
def _make_request( self, endpoint: str, params: Optional[Dict] = None, max_retries: int = 3 ) -> Dict[str, Any]: """Make a request to the ClinVar API with automatic retry for rate limiting.""" url = f"{self.base_url}{endpoint}" for attempt in range(max_retries + 1): try: response = self.session.get(url, params=params, timeout=self.timeout) # Handle rate limiting (429 error) if response.status_code == 429: retry_after = response.headers.get("Retry-After") if retry_after: wait_time = int(retry_after) else: # Default exponential backoff: 1, 2, 4 seconds wait_time = 2**attempt if attempt < max_retries: print( f"Rate limited (429). Waiting {wait_time} seconds before retry {attempt + 1}/{max_retries}..." ) time.sleep(wait_time) continue else: return { "status": "error", "error": f"Rate limited after {max_retries} retries. Please wait before making more requests.", "url": url, "retry_after": retry_after, } response.raise_for_status() # ClinVar API returns XML by default, but we can request JSON if params and params.get("retmode") == "json": data = response.json() else: # Parse XML response data = response.text return { "status": "success", "data": data, "url": url, "content_type": response.headers.get( "content-type", "application/xml" ), "rate_limit_info": { "limit": response.headers.get("X-RateLimit-Limit"), "remaining": response.headers.get("X-RateLimit-Remaining"), }, } except requests.exceptions.RequestException as e: if attempt < max_retries: wait_time = 2**attempt print( f"Request failed: {str(e)}. Retrying in {wait_time} seconds..." ) time.sleep(wait_time) continue else: return { "status": "error", "error": f"ClinVar API request failed after {max_retries} retries: {str(e)}", "url": url, } return {"status": "error", "error": "Maximum retries exceeded", "url": url}
[docs] def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Execute the tool with given arguments.""" return self._make_request(self.endpoint, arguments)
[docs] @register_tool("ClinVarSearchVariants") class ClinVarSearchVariants(ClinVarRESTTool): """Search for variants in ClinVar by gene or condition."""
[docs] def __init__(self, tool_config): super().__init__(tool_config) self.endpoint = "/esearch.fcgi"
[docs] def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Search variants by gene or condition.""" params = { "db": "clinvar", "retmode": "json", "retmax": arguments.get("max_results", 20), } # Build search query query_parts = [] if "gene" in arguments: query_parts.append(f"{arguments['gene']}[gene]") if "condition" in arguments: query_parts.append(f"{arguments['condition']}[condition]") if "variant_id" in arguments: query_parts.append(f"{arguments['variant_id']}[variant_id]") if not query_parts: return { "status": "error", "error": "At least one search parameter is required", } params["term"] = " AND ".join(query_parts) result = self._make_request(self.endpoint, params) # Add search parameters to result and format data if result.get("status") == "success": result["search_params"] = { "gene": arguments.get("gene"), "condition": arguments.get("condition"), "variant_id": arguments.get("variant_id"), } # Format search results for better usability data = result.get("data", {}) if "esearchresult" in data: esearch = data["esearchresult"] formatted_results = { "total_count": int(esearch.get("count", 0)), "variant_ids": esearch.get("idlist", []), "query_translation": esearch.get("querytranslation", ""), "search_params": result["search_params"], "summary": f"Found {esearch.get('count', 0)} variants matching the search criteria", } result["formatted_results"] = formatted_results return result
[docs] @register_tool("ClinVarGetVariantDetails") class ClinVarGetVariantDetails(ClinVarRESTTool): """Get detailed variant information by ClinVar ID."""
[docs] def __init__(self, tool_config): super().__init__(tool_config) self.endpoint = "/esummary.fcgi"
[docs] def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Get variant details by ClinVar ID.""" variant_id = arguments.get("variant_id", "") if not variant_id: return {"status": "error", "error": "variant_id is required"} params = {"db": "clinvar", "id": variant_id, "retmode": "json"} result = self._make_request(self.endpoint, params) # Add variant_id to result and format data if result.get("status") == "success": result["variant_id"] = variant_id # Format the data for better usability data = result.get("data", {}) if "result" in data and variant_id in data["result"]: variant_data = data["result"][variant_id] # Extract key information formatted_data = { "variant_id": variant_id, "accession": variant_data.get("accession", ""), "title": variant_data.get("title", ""), "obj_type": variant_data.get("obj_type", ""), "genes": [ gene.get("symbol", "") for gene in variant_data.get("genes", []) ], "clinical_significance": variant_data.get( "germline_classification", {} ).get("description", ""), "review_status": variant_data.get( "germline_classification", {} ).get("review_status", ""), "chromosome": variant_data.get("chr_sort", ""), "location": variant_data.get("variation_set", [{}])[0] .get("variation_loc", [{}])[0] .get("band", ""), "variation_name": variant_data.get("variation_set", [{}])[0].get( "variation_name", "" ), "raw_data": variant_data, # Keep original data for advanced users } result["formatted_data"] = formatted_data return result
[docs] @register_tool("ClinVarGetClinicalSignificance") class ClinVarGetClinicalSignificance(ClinVarRESTTool): """Get clinical significance information for variants."""
[docs] def __init__(self, tool_config): super().__init__(tool_config) self.endpoint = "/esummary.fcgi"
[docs] def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Get clinical significance by variant ID.""" variant_id = arguments.get("variant_id", "") if not variant_id: return {"status": "error", "error": "variant_id is required"} params = {"db": "clinvar", "id": variant_id, "retmode": "json"} result = self._make_request(self.endpoint, params) # Add variant_id to result and format clinical significance data if result.get("status") == "success": result["variant_id"] = variant_id # Format the clinical significance data data = result.get("data", {}) if "result" in data and variant_id in data["result"]: variant_data = data["result"][variant_id] # Extract clinical significance information germline_class = variant_data.get("germline_classification", {}) clinical_impact = variant_data.get("clinical_impact_classification", {}) oncogenicity = variant_data.get("oncogenicity_classification", {}) formatted_data = { "variant_id": variant_id, "germline_classification": { "description": germline_class.get("description", ""), "review_status": germline_class.get("review_status", ""), "last_evaluated": germline_class.get("last_evaluated", ""), "fda_recognized": germline_class.get( "fda_recognized_database", "" ), "traits": [ trait.get("trait_name", "") for trait in germline_class.get("trait_set", []) ], }, "clinical_impact": { "description": clinical_impact.get("description", ""), "review_status": clinical_impact.get("review_status", ""), "last_evaluated": clinical_impact.get("last_evaluated", ""), }, "oncogenicity": { "description": oncogenicity.get("description", ""), "review_status": oncogenicity.get("review_status", ""), "last_evaluated": oncogenicity.get("last_evaluated", ""), }, "raw_data": variant_data, # Keep original data for advanced users } result["formatted_data"] = formatted_data return result