Source code for tooluniverse.uniprot_tool

import time
import requests
from typing import Any, Dict, Optional
from .base_tool import BaseTool, ToolError
from .tool_registry import register_tool


[docs] @register_tool("UniProtRESTTool") class UniProtRESTTool(BaseTool):
[docs] def __init__(self, tool_config: Dict): super().__init__(tool_config) self.endpoint = tool_config["fields"]["endpoint"] self.extract_path = tool_config["fields"].get("extract_path") self.timeout = 15 # Increase timeout for large entries
[docs] def validate_parameters(self, arguments: Dict[str, Any]) -> Optional[ToolError]: """ Validate parameters with automatic type coercion for limit. """ # Coerce limit to integer if passed as string if "limit" in arguments and isinstance(arguments["limit"], str): try: arguments["limit"] = int(arguments["limit"]) except (ValueError, TypeError): # Let schema validation handle the error pass # Call parent validation return super().validate_parameters(arguments)
[docs] def _build_url(self, args: Dict[str, Any]) -> str: url = self.endpoint for k, v in args.items(): url = url.replace(f"{{{k}}}", str(v)) return url
[docs] def _extract_data(self, data: Dict, extract_path: str) -> Any: """Custom data extraction with support for filtering""" # Handle specific UniProt extraction patterns if extract_path == ("comments[?(@.commentType=='FUNCTION')].texts[*].value"): # Extract function comments result = [] for comment in data.get("comments", []): if comment.get("commentType") == "FUNCTION": for text in comment.get("texts", []): if "value" in text: result.append(text["value"]) return result elif extract_path == ( "comments[?(@.commentType==" "'SUBCELLULAR LOCATION')].subcellularLocations[*].location.value" ): # Extract subcellular locations result = [] for comment in data.get("comments", []): if comment.get("commentType") == "SUBCELLULAR LOCATION": for location in comment.get("subcellularLocations", []): if "location" in location and ("value" in location["location"]): result.append(location["location"]["value"]) return result elif extract_path == "features[?(@.type=='VARIANT')]": # Extract variant features result = [] for feature in data.get("features", []): if feature.get("type") == "Natural variant": result.append(feature) return result elif extract_path == ( "features[?(@.type=='MODIFIED RESIDUE' || @.type=='SIGNAL')]" ): # Extract PTM and signal features result = [] for feature in data.get("features", []): if feature.get("type") in ["Modified residue", "Signal"]: result.append(feature) return result elif extract_path == ( "comments[?(@.commentType==" "'ALTERNATIVE PRODUCTS')].isoforms[*].isoformIds[*]" ): # Extract isoform IDs result = [] for comment in data.get("comments", []): if comment.get("commentType") == "ALTERNATIVE PRODUCTS": for isoform in comment.get("isoforms", []): for isoform_id in isoform.get("isoformIds", []): result.append(isoform_id) return result # For simple paths, use jsonpath_ng try: from jsonpath_ng import parse expr = parse(extract_path) matches = expr.find(data) extracted_data = [m.value for m in matches] # Return single item if only one match, otherwise return list if len(extracted_data) == 0: return {"error": f"No data found for JSONPath: {extract_path}"} elif len(extracted_data) == 1: return extracted_data[0] else: return extracted_data except ImportError: return {"error": "jsonpath_ng library is required for data extraction"} except Exception as e: return { "error": ( f"Failed to extract UniProt fields using " f"JSONPath '{extract_path}': {e}" ) }
[docs] def _handle_id_mapping(self, arguments: Dict[str, Any]) -> Any: """Handle ID mapping requests""" ids = arguments.get("ids", []) from_db = arguments.get("from_db", "") to_db = arguments.get("to_db", "UniProtKB") max_wait_time = arguments.get("max_wait_time", 30) # Normalize IDs to list if isinstance(ids, str): ids = [ids] # Normalize database names db_mapping = { "Ensembl": "Ensembl", "Gene_Name": "Gene_Name", "RefSeq_Protein": "RefSeq_Protein_ID", "PDB": "PDB_ID", "EMBL": "EMBL_ID", "UniProtKB": "UniProtKB_AC-ID", } from_db_normalized = db_mapping.get(from_db, from_db) to_db_normalized = db_mapping.get(to_db, to_db) # Step 1: Submit mapping job submit_url = "https://rest.uniprot.org/idmapping/run" payload = {"ids": ids, "from": from_db_normalized, "to": to_db_normalized} try: resp = requests.post(submit_url, json=payload, timeout=self.timeout) resp.raise_for_status() job_data = resp.json() job_id = job_data.get("jobId") if not job_id: return {"error": "Failed to get job ID from UniProt ID mapping"} # Step 2: Poll for job completion status_url = f"https://rest.uniprot.org/idmapping/status/{job_id}" results_url = f"https://rest.uniprot.org/idmapping/results/{job_id}" start_time = time.time() while time.time() - start_time < max_wait_time: status_resp = requests.get(status_url, timeout=self.timeout) status_data = status_resp.json() if status_data.get("status") == "FINISHED": # Step 3: Retrieve results results_resp = requests.get(results_url, timeout=self.timeout) results_data = results_resp.json() # Format results formatted_results = [] failed = [] # Extract mappings results = results_data.get("results", []) for result in results: from_value = result.get("from", "") to_values = result.get("to", {}).get("results", []) if to_values: for to_item in to_values: to_info = to_item.get("to", {}) gene_names = to_info.get("geneNames", []) gene_name = "" if gene_names: gene_name = gene_names[0].get("value", "") formatted_results.append( { "from": from_value, "to": { "accession": to_info.get( "primaryAccession", "" ), "id": to_info.get("uniProtkbId", ""), "gene_name": gene_name, }, } ) else: failed.append(from_value) return { "mapped_count": len(formatted_results), "results": formatted_results, "failed": list(set(failed)) if failed else [], } elif status_data.get("status") == "FAILED": return {"error": "ID mapping job failed"} time.sleep(1) # Wait 1 second before next poll return {"error": (f"ID mapping timed out after {max_wait_time} seconds")} except requests.exceptions.Timeout: return {"error": "Request to UniProt API timed out"} except requests.exceptions.RequestException as e: return {"error": f"Request to UniProt API failed: {e}"} except ValueError as e: return {"error": f"Failed to parse JSON response: {e}"}
[docs] def run(self, arguments: Dict[str, Any]) -> Any: # Check if this is a search request search_type = self.tool_config.get("fields", {}).get("search_type") mapping_type = self.tool_config.get("fields", {}).get("mapping_type") if search_type == "search": return self._handle_search(arguments) elif mapping_type == "async": return self._handle_id_mapping(arguments) # Build URL for standard accession-based queries url = self._build_url(arguments) try: resp = requests.get(url, timeout=self.timeout) if resp.status_code != 200: return { "error": (f"UniProt API returned status code: {resp.status_code}"), "detail": resp.text, } data = resp.json() except requests.exceptions.Timeout: return {"error": "Request to UniProt API timed out"} except requests.exceptions.RequestException as e: return {"error": f"Request to UniProt API failed: {e}"} except ValueError as e: return {"error": f"Failed to parse JSON response: {e}"} # If extract_path is configured, extract the corresponding subset if self.extract_path: result = self._extract_data(data, self.extract_path) # Handle empty results if isinstance(result, list) and len(result) == 0: return {"error": f"No data found for path: {self.extract_path}"} elif isinstance(result, dict) and "error" in result: return result return result return data
# Method bindings for backward compatibility
[docs] def get_entry_by_accession(self, accession: str) -> Any: return self.run({"accession": accession})
[docs] def get_function_by_accession(self, accession: str) -> Any: return self.run({"accession": accession})
[docs] def get_names_taxonomy_by_accession(self, accession: str) -> Any: return self.run({"accession": accession})
[docs] def get_subcellular_location_by_accession(self, accession: str) -> Any: return self.run({"accession": accession})
[docs] def get_disease_variants_by_accession(self, accession: str) -> Any: return self.run({"accession": accession})
[docs] def get_ptm_processing_by_accession(self, accession: str) -> Any: return self.run({"accession": accession})
[docs] def get_sequence_isoforms_by_accession(self, accession: str) -> Any: return self.run({"accession": accession})