# Source code for tooluniverse.uniprot_tool

import requests
from typing import Any, Dict
from .base_tool import BaseTool
from .tool_registry import register_tool


@register_tool("UniProtRESTTool")
class UniProtRESTTool(BaseTool):
    """Tool that fetches a JSON document from a UniProt REST endpoint.

    The tool configuration must provide ``fields.endpoint``, a URL template
    whose ``{name}`` placeholders are filled from the arguments passed to
    :meth:`run`.  An optional ``fields.extract_path`` selects a subset of the
    response: a handful of fixed filter expressions are handled by dedicated
    code, and any other value is evaluated as a JSONPath via ``jsonpath_ng``.

    Failures are reported by returning a dict with an ``"error"`` key rather
    than by raising.
    """

    # Filter expressions that are handled by dedicated code in
    # ``_extract_data`` instead of being handed to jsonpath_ng.
    _PATH_FUNCTION = "comments[?(@.commentType=='FUNCTION')].texts[*].value"
    _PATH_SUBCELLULAR = (
        "comments[?(@.commentType=='SUBCELLULAR LOCATION')]"
        ".subcellularLocations[*].location.value"
    )
    _PATH_VARIANTS = "features[?(@.type=='VARIANT')]"
    _PATH_PTM_SIGNAL = "features[?(@.type=='MODIFIED RESIDUE' || @.type=='SIGNAL')]"
    _PATH_ISOFORM_IDS = (
        "comments[?(@.commentType=='ALTERNATIVE PRODUCTS')]"
        ".isoforms[*].isoformIds[*]"
    )

    def __init__(self, tool_config: Dict):
        """Store the endpoint template and optional extract path.

        Args:
            tool_config: Tool configuration; ``tool_config["fields"]`` must
                contain ``"endpoint"`` and may contain ``"extract_path"``.

        Raises:
            KeyError: If ``"fields"`` or ``"endpoint"`` is missing.
        """
        super().__init__(tool_config)
        fields = tool_config["fields"]
        self.endpoint = fields["endpoint"]
        self.extract_path = fields.get("extract_path")
        self.timeout = 15  # Increase timeout for large entries

    def _build_url(self, args: Dict[str, Any]) -> str:
        """Fill the ``{name}`` placeholders of the endpoint template.

        Args:
            args: Mapping of placeholder name to value; each value is
                converted with ``str()`` and inserted as-is (no URL encoding).

        Returns:
            The endpoint with every placeholder that has a matching key
            replaced.  Placeholders without a matching key are left untouched.
        """
        url = self.endpoint
        for name, value in args.items():
            url = url.replace(f"{{{name}}}", str(value))
        return url

    @staticmethod
    def _comments_of_type(data: Dict, comment_type: str) -> list:
        """Return the entries of ``data["comments"]`` with the given commentType."""
        return [
            comment
            for comment in data.get("comments", [])
            if comment.get("commentType") == comment_type
        ]

    @staticmethod
    def _features_of_types(data: Dict, feature_types: tuple) -> list:
        """Return the entries of ``data["features"]`` whose type is in *feature_types*."""
        return [
            feature
            for feature in data.get("features", [])
            if feature.get("type") in feature_types
        ]

    def _extract_function_texts(self, data: Dict) -> list:
        """Collect the ``value`` of every text in FUNCTION comments."""
        values = []
        for comment in self._comments_of_type(data, "FUNCTION"):
            values.extend(
                text["value"] for text in comment.get("texts", []) if "value" in text
            )
        return values

    def _extract_subcellular_locations(self, data: Dict) -> list:
        """Collect ``location.value`` from SUBCELLULAR LOCATION comments."""
        values = []
        for comment in self._comments_of_type(data, "SUBCELLULAR LOCATION"):
            for entry in comment.get("subcellularLocations", []):
                if "location" in entry and "value" in entry["location"]:
                    values.append(entry["location"]["value"])
        return values

    def _extract_variants(self, data: Dict) -> list:
        """Collect variant features.

        The configured path says ``VARIANT``, but the features are matched on
        the type label ``"Natural variant"``.
        """
        return self._features_of_types(data, ("Natural variant",))

    def _extract_ptm_and_signal(self, data: Dict) -> list:
        """Collect features typed ``"Modified residue"`` or ``"Signal"``."""
        return self._features_of_types(data, ("Modified residue", "Signal"))

    def _extract_isoform_ids(self, data: Dict) -> list:
        """Collect every isoform ID from ALTERNATIVE PRODUCTS comments."""
        ids = []
        for comment in self._comments_of_type(data, "ALTERNATIVE PRODUCTS"):
            for isoform in comment.get("isoforms", []):
                ids.extend(isoform.get("isoformIds", []))
        return ids

    def _extract_data(self, data: Dict, extract_path: str) -> Any:
        """Extract a subset of *data* selected by *extract_path*.

        Args:
            data: Parsed JSON response.
            extract_path: Either one of the fixed filter expressions handled
                by dedicated code, or a JSONPath understood by ``jsonpath_ng``.

        Returns:
            For the fixed filter expressions: a (possibly empty) list.
            For other paths: the single matched value when there is exactly
            one match, a list when there are several, or a dict with an
            ``"error"`` key when nothing matched, ``jsonpath_ng`` is not
            installed, or evaluating the path failed.
        """
        # Filter expressions are resolved by hand rather than via jsonpath_ng.
        special_handlers = {
            self._PATH_FUNCTION: self._extract_function_texts,
            self._PATH_SUBCELLULAR: self._extract_subcellular_locations,
            self._PATH_VARIANTS: self._extract_variants,
            self._PATH_PTM_SIGNAL: self._extract_ptm_and_signal,
            self._PATH_ISOFORM_IDS: self._extract_isoform_ids,
        }
        handler = special_handlers.get(extract_path)
        if handler is not None:
            return handler(data)

        # For simple paths, use jsonpath_ng
        try:
            from jsonpath_ng import parse

            matches = parse(extract_path).find(data)
            extracted = [match.value for match in matches]
        except ImportError:
            return {"error": "jsonpath_ng library is required for data extraction"}
        except Exception as e:
            return {
                "error": f"Failed to extract UniProt fields using JSONPath '{extract_path}': {e}"
            }

        if not extracted:
            return {"error": f"No data found for JSONPath: {extract_path}"}
        # Return single item if only one match, otherwise return list
        if len(extracted) == 1:
            return extracted[0]
        return extracted

    def run(self, arguments: Dict[str, Any]) -> Any:
        """Call the configured endpoint and return the (optionally extracted) JSON.

        Args:
            arguments: Values for the placeholders of the endpoint template.

        Returns:
            The parsed JSON response, or the subset selected by
            ``self.extract_path`` when one is configured.  On any failure
            (timeout, request error, non-200 status, invalid JSON, empty
            extraction) a dict with an ``"error"`` key is returned instead.
        """
        url = self._build_url(arguments)

        try:
            resp = requests.get(url, timeout=self.timeout)
        except requests.exceptions.Timeout:
            return {"error": "Request to UniProt API timed out"}
        except requests.exceptions.RequestException as e:
            return {"error": f"Request to UniProt API failed: {e}"}

        if resp.status_code != 200:
            return {
                "error": f"UniProt API returned status code: {resp.status_code}",
                "detail": resp.text,
            }

        # Decode separately from the request: recent versions of requests raise
        # a JSON decode error that is both a RequestException and a ValueError,
        # so a shared try block would misreport it as a failed request.
        try:
            data = resp.json()
        except ValueError as e:
            return {"error": f"Failed to parse JSON response: {e}"}

        if not self.extract_path:
            return data

        # If extract_path is configured, extract the corresponding subset
        result = self._extract_data(data, self.extract_path)
        if isinstance(result, list) and not result:
            return {"error": f"No data found for path: {self.extract_path}"}
        # Error dicts produced by _extract_data are passed through unchanged.
        return result

    # Method bindings for backward compatibility.
    # Each one simply forwards to run() with the accession; what is returned
    # depends on the endpoint and extract_path this instance was configured with.

    def get_entry_by_accession(self, accession: str) -> Any:
        """Run the tool for *accession*; see :meth:`run` for the return value."""
        return self.run({"accession": accession})

    def get_function_by_accession(self, accession: str) -> Any:
        """Run the tool for *accession*; see :meth:`run` for the return value."""
        return self.run({"accession": accession})

    def get_names_taxonomy_by_accession(self, accession: str) -> Any:
        """Run the tool for *accession*; see :meth:`run` for the return value."""
        return self.run({"accession": accession})

    def get_subcellular_location_by_accession(self, accession: str) -> Any:
        """Run the tool for *accession*; see :meth:`run` for the return value."""
        return self.run({"accession": accession})

    def get_disease_variants_by_accession(self, accession: str) -> Any:
        """Run the tool for *accession*; see :meth:`run` for the return value."""
        return self.run({"accession": accession})

    def get_ptm_processing_by_accession(self, accession: str) -> Any:
        """Run the tool for *accession*; see :meth:`run` for the return value."""
        return self.run({"accession": accession})

    def get_sequence_isoforms_by_accession(self, accession: str) -> Any:
        """Run the tool for *accession*; see :meth:`run` for the return value."""
        return self.run({"accession": accession})