Source code for tooluniverse.interproscan_tool

# interproscan_tool.py
"""
InterProScan API tool for ToolUniverse.

InterProScan is EMBL-EBI's tool for scanning protein sequences against
the InterPro database to identify functional domains, families, and sites.

Unlike the InterPro lookup tools (which query pre-computed annotations),
InterProScan runs actual sequence analysis for novel/uncharacterized proteins.

API Documentation: https://www.ebi.ac.uk/Tools/services/rest/iprscan5/
"""

import requests
import time
from typing import Dict, Any, Optional
from .base_tool import BaseTool
from .tool_registry import register_tool

# Base URL for InterProScan REST API
INTERPROSCAN_BASE_URL = "https://www.ebi.ac.uk/Tools/services/rest/iprscan5"


@register_tool("InterProScanTool")
class InterProScanTool(BaseTool):
    """
    Tool for running InterProScan sequence analysis via the EBI REST API.

    Provides protein domain/family prediction by scanning sequences against:
    - Pfam
    - PRINTS
    - ProSite
    - SMART
    - Gene3D
    - TIGRFAM
    - SUPERFAMILY
    - CDD
    - PANTHER

    Job-based API: Submit sequence, poll for results.
    Max 100 sequences per request. Results available for 7 days.

    Every public entry point returns a dict with a ``"status"`` key that is
    either ``"success"`` (payload under ``"data"``) or ``"error"`` (message
    under ``"error"``); errors raised after a job was submitted also carry
    the ``"job_id"`` so the job can still be retrieved later.
    """

    # Polling configuration
    MAX_POLL_ATTEMPTS = 60  # ~2 minutes with 2s interval
    POLL_INTERVAL = 2  # seconds

    # Characters accepted by the basic sequence sanity check.
    _VALID_RESIDUES = frozenset("ACDEFGHIKLMNPQRSTVWXY*")

    # Job states that mean the job will never produce results, mapped to the
    # error message reported while polling.
    _FAILURE_MESSAGES = {
        "FAILURE": "InterProScan job failed",
        "ERROR": "InterProScan encountered an error",
        "NOT_FOUND": "InterProScan job not found",
    }

    # Every status string this tool recognises in a status response body.
    _KNOWN_STATUSES = frozenset(
        {"RUNNING", "PENDING", "QUEUED", "FINISHED", "FAILURE", "ERROR", "NOT_FOUND"}
    )

    def __init__(self, tool_config: Dict[str, Any]):
        """
        Initialise the tool from its configuration.

        Args:
            tool_config: Tool configuration. ``timeout`` (seconds, default 30)
                is applied to every HTTP request; ``fields.operation`` selects
                the operation executed by ``run`` (default ``scan_sequence``).
        """
        super().__init__(tool_config)
        self.timeout = tool_config.get("timeout", 30)
        # ``fields`` may be present but null in a config, hence the ``or {}``.
        self.operation = (tool_config.get("fields") or {}).get(
            "operation", "scan_sequence"
        )

    def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute the configured InterProScan operation.

        Args:
            arguments: Operation arguments (see the individual operations).

        Returns:
            The result dict of the selected operation, or an error dict when
            the configured operation is unknown.
        """
        handlers = {
            "scan_sequence": self._scan_sequence,
            "get_job_status": self._get_job_status,
            "get_job_results": self._get_job_results,
        }
        handler = handlers.get(self.operation)
        if handler is None:
            return self._error(f"Unknown operation: {self.operation}")
        # Tolerate a missing arguments object so handlers can rely on a dict.
        return handler(arguments or {})

    @staticmethod
    def _error(message: str, job_id: Optional[str] = None) -> Dict[str, Any]:
        """Build an error result, attaching ``job_id`` when one is known."""
        result: Dict[str, Any] = {"status": "error", "error": message}
        if job_id:
            result["job_id"] = job_id
        return result

    def _query_status(self, job_id: str) -> str:
        """
        Return the raw status string of a job.

        Raises:
            requests.exceptions.HTTPError: The request failed at HTTP level
                and the body is not a recognised status value (so an error
                page is never mistaken for a job status).
            requests.exceptions.RequestException: Any other transport error.
        """
        response = requests.get(
            f"{INTERPROSCAN_BASE_URL}/status/{job_id}", timeout=self.timeout
        )
        job_status = response.text.strip()
        if not response.ok and job_status not in self._KNOWN_STATUSES:
            response.raise_for_status()
        return job_status

    def _scan_sequence(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """
        Submit a protein sequence for InterProScan analysis.

        Submits the job and polls for results (up to 2 minutes).
        For longer jobs, use get_job_status and get_job_results.

        Args:
            arguments: ``sequence`` (required, single-letter amino acid
                codes; whitespace is ignored), ``email``, ``title``,
                ``go_terms`` and ``pathways`` (both default to True).

        Returns:
            Parsed results when the job finishes in time, a success dict with
            the ``job_id`` and status ``RUNNING`` when polling runs out, or
            an error dict.
        """
        sequence = arguments.get("sequence")
        email = arguments.get("email", "tooluniverse@example.com")
        title = arguments.get("title", "InterProScan job")
        go_terms = arguments.get("go_terms", True)
        pathways = arguments.get("pathways", True)

        if not sequence:
            return self._error("sequence parameter is required")
        if not isinstance(sequence, str):
            return self._error("sequence parameter must be a string")

        # Validate sequence (basic check). Dropping *all* whitespace also
        # handles tabs and Windows line endings in pasted sequences.
        clean_seq = "".join(sequence.split()).upper()
        if not clean_seq:
            return self._error("sequence parameter is required")
        if not self._VALID_RESIDUES.issuperset(clean_seq):
            return self._error(
                "Invalid protein sequence. Use single-letter amino acid codes."
            )

        # Stays None until submission succeeds; afterwards every error
        # reports it so the caller can still retrieve the job.
        job_id: Optional[str] = None
        try:
            # Submit job
            data = {
                "email": email,
                "title": title,
                "sequence": clean_seq,
                "goterms": str(go_terms).lower(),
                "pathways": str(pathways).lower(),
            }
            response = requests.post(
                f"{INTERPROSCAN_BASE_URL}/run", data=data, timeout=self.timeout
            )
            if response.status_code != 200:
                return self._error(
                    f"Job submission failed: {response.status_code} - {response.text}"
                )

            job_id = response.text.strip()
            if not job_id:
                return self._error("Job submission returned an empty job ID")

            return self._poll_job(job_id)

        except requests.exceptions.Timeout:
            return self._error(
                f"InterProScan API timeout after {self.timeout}s", job_id
            )
        except requests.exceptions.RequestException as e:
            return self._error(f"InterProScan API request failed: {str(e)}", job_id)
        except Exception as e:
            # Tool boundary: report instead of raising into the caller.
            return self._error(f"Unexpected error: {str(e)}", job_id)

    def _poll_job(self, job_id: str) -> Dict[str, Any]:
        """
        Poll a submitted job until it reaches a terminal state or polling
        attempts are exhausted.

        Transport errors other than HTTP status errors propagate to the
        caller; HTTP status errors are treated as transient and retried.
        """
        for _ in range(self.MAX_POLL_ATTEMPTS):
            try:
                job_status = self._query_status(job_id)
            except requests.exceptions.HTTPError:
                # Unrecognised HTTP error response: keep polling, exactly as
                # for any other unknown status.
                job_status = None

            if job_status == "FINISHED":
                return self._fetch_results(job_id)
            if job_status in self._FAILURE_MESSAGES:
                return self._error(self._FAILURE_MESSAGES[job_status], job_id)

            # RUNNING / PENDING / QUEUED / unknown status: wait and retry.
            time.sleep(self.POLL_INTERVAL)

        # Timeout - return job_id for later retrieval
        return {
            "status": "success",
            "data": {
                "job_id": job_id,
                "status": "RUNNING",
                "message": "Job is still running. Use get_job_results with this job_id to retrieve results later.",
            },
        }

    def _fetch_results(self, job_id: str) -> Dict[str, Any]:
        """
        Fetch and parse InterProScan results.

        Args:
            job_id: Identifier of a finished job.

        Returns:
            A success dict with the matched domains, de-duplicated GO
            annotations and pathways, and the length of the first sequence;
            or an error dict when fetching or parsing fails.
        """
        try:
            # Get JSON results
            response = requests.get(
                f"{INTERPROSCAN_BASE_URL}/result/{job_id}/json",
                timeout=self.timeout,
            )
            response.raise_for_status()
            payload = response.json()

            # ``or`` guards below protect against keys that are present but
            # null in the JSON, which ``.get(key, default)`` does not cover.
            results = payload.get("results") or []
            domains = []
            # Keyed by ID so repeated cross-references collapse to one entry
            # (first-seen order, last-seen value).
            go_by_id: Dict[Any, Dict[str, Any]] = {}
            pathway_by_id: Dict[Any, Dict[str, Any]] = {}

            for result in results:
                for match in result.get("matches") or []:
                    signature = match.get("signature") or {}
                    # Signatures not integrated into InterPro have no entry.
                    entry = signature.get("entry") or {}
                    library_release = signature.get("signatureLibraryRelease") or {}

                    domains.append(
                        {
                            "accession": signature.get("accession"),
                            "name": signature.get("name"),
                            "description": signature.get("description"),
                            "database": library_release.get("library"),
                            "interpro_accession": entry.get("accession"),
                            "interpro_name": entry.get("name"),
                            "interpro_type": entry.get("type"),
                            "locations": [
                                {
                                    "start": location.get("start"),
                                    "end": location.get("end"),
                                    "score": location.get("score"),
                                    "evalue": location.get("evalue"),
                                }
                                for location in match.get("locations") or []
                            ],
                        }
                    )

                    # Extract GO terms if present
                    for go_term in entry.get("goXRefs") or []:
                        go_by_id[go_term.get("id")] = {
                            "id": go_term.get("id"),
                            "name": go_term.get("name"),
                            "category": go_term.get("category"),
                        }

                    # Extract pathway info
                    for pathway in entry.get("pathwayXRefs") or []:
                        pathway_by_id[pathway.get("id")] = {
                            "database": pathway.get("databaseName"),
                            "id": pathway.get("id"),
                            "name": pathway.get("name"),
                        }

            return {
                "status": "success",
                "data": {
                    "job_id": job_id,
                    "domains": domains,
                    "domain_count": len(domains),
                    "go_annotations": list(go_by_id.values()),
                    "pathways": list(pathway_by_id.values()),
                    "sequence_length": results[0].get("sequenceLength")
                    if results
                    else None,
                },
            }

        except Exception as e:
            # Tool boundary: network, HTTP, JSON and shape errors are all
            # reported the same way, with the job ID kept for a retry.
            return self._error(f"Failed to fetch results: {str(e)}", job_id)

    def _get_job_status(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """
        Check the status of an InterProScan job.

        Status values: RUNNING, FINISHED, FAILURE, ERROR, NOT_FOUND

        Args:
            arguments: ``job_id`` (required).

        Returns:
            A success dict with ``job_status``, ``is_finished`` and
            ``has_error``; or an error dict when the status request fails.
        """
        job_id = arguments.get("job_id")
        if not job_id:
            return self._error("job_id parameter is required")

        try:
            job_status = self._query_status(job_id)
        except requests.exceptions.RequestException as e:
            return self._error(f"Failed to get job status: {str(e)}")

        return {
            "status": "success",
            "data": {
                "job_id": job_id,
                "job_status": job_status,
                "is_finished": job_status == "FINISHED",
                "has_error": job_status in self._FAILURE_MESSAGES,
            },
        }

    def _get_job_results(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """
        Get results for a completed InterProScan job.

        Job results are available for 7 days after completion.

        Args:
            arguments: ``job_id`` (required).

        Returns:
            Parsed results when the job is finished; a success dict with the
            current ``job_status`` while it is still in progress; an error
            dict when the job failed, is unknown, or the request fails.
        """
        job_id = arguments.get("job_id")
        if not job_id:
            return self._error("job_id parameter is required")

        try:
            # Check status first
            job_status = self._query_status(job_id)
        except requests.exceptions.RequestException as e:
            return self._error(f"Failed to get job results: {str(e)}")

        if job_status == "FINISHED":
            return self._fetch_results(job_id)

        if job_status in self._FAILURE_MESSAGES:
            # A failed or unknown job will never finish; do not report it as
            # merely "not finished yet".
            failure = self._error(
                f"InterProScan job did not complete successfully. Status: {job_status}",
                job_id,
            )
            failure["job_status"] = job_status
            return failure

        return {
            "status": "success",
            "data": {
                "job_id": job_id,
                "job_status": job_status,
                "message": f"Job is not finished yet. Status: {job_status}",
            },
        }