Source code for tooluniverse.swissdock_tool
"""SwissDock Tool - Molecular docking using AsyncPollingTool base class.
Converted to use AsyncPollingTool for cleaner code and automatic polling management.
API Documentation: https://www.swissdock.ch/command-line.php
"""
import uuid
from urllib.parse import urlencode
import requests
from typing import Any, Dict, Optional, TYPE_CHECKING
from .async_base import AsyncPollingTool
from .tool_registry import register_tool
if TYPE_CHECKING:
from .task_progress import TaskProgress
SWISSDOCK_BASE_URL = "https://swissdock.ch:8443"
[docs]
@register_tool("SwissDockTool")
class SwissDockTool(AsyncPollingTool):
"""
Tool for molecular docking using SwissDock REST API with AsyncPollingTool base class.
SwissDock performs protein-ligand docking using:
- Attracting Cavities 2.0 (default) - cavity-based docking
- AutoDock Vina - blind or targeted docking
Now uses AsyncPollingTool for automatic polling, progress reporting, and timeout management.
"""
# Default configuration
poll_interval = 5 # seconds
max_duration = 600 # 10 minutes
[docs]
def __init__(self, tool_config: Dict[str, Any]):
"""Initialize SwissDock tool with configuration."""
# Extract config
self.name = tool_config.get("name", "SwissDock_Tool")
self.description = tool_config.get("description", "SwissDock molecular docking")
self.parameter = tool_config.get("parameter", {})
# Initialize AsyncPollingTool
super().__init__()
# SwissDock-specific config
self.timeout = tool_config.get("timeout", 60)
self.operation = tool_config.get("fields", {}).get("operation", "dock_ligand")
self._tool_config = tool_config
# ========================================================================
# Helper Methods (API-specific logic)
# ========================================================================
[docs]
def _check_server_status(self) -> bool:
"""Check if SwissDock server is operational (synchronous)."""
try:
response = requests.get(f"{SWISSDOCK_BASE_URL}/", timeout=10.0)
return response.status_code == 200 and "Hello World!" in response.text
except Exception:
return False
[docs]
def _generate_session_id(self) -> str:
"""Generate a unique session ID for the docking job."""
return str(uuid.uuid4())
[docs]
def _prepare_ligand(self, session_id: str, ligand_smiles: str):
"""Prepare ligand from SMILES (raises on error)."""
url = f"{SWISSDOCK_BASE_URL}/preplig"
params = {"mySMILES": ligand_smiles}
response = requests.get(url, params=params, timeout=self.timeout)
if response.status_code != 200:
raise RuntimeError(
f"Ligand preparation failed: HTTP {response.status_code}"
)
[docs]
def _prepare_target(self, session_id: str, pdb_id: str):
"""Prepare target protein from PDB ID (raises on error)."""
url = f"{SWISSDOCK_BASE_URL}/preptarget"
params = {"sessionNumber": session_id}
data = {"pdbid": pdb_id}
response = requests.post(url, params=params, data=data, timeout=self.timeout)
if response.status_code != 200:
raise RuntimeError(
f"Target preparation failed: HTTP {response.status_code}"
)
[docs]
def _set_docking_parameters(
self,
session_id: str,
exhaustiveness: int = 8,
box_center: Optional[str] = None,
box_size: Optional[str] = None,
docking_engine: str = "attracting_cavities",
):
"""Set docking parameters (raises on error)."""
url = f"{SWISSDOCK_BASE_URL}/setparameters"
params = {"sessionNumber": session_id, "exhaust": exhaustiveness}
# Add optional parameters
if box_center:
params["boxCenter"] = box_center
if box_size:
params["boxSize"] = box_size
if docking_engine.lower() == "vina":
params["Vina"] = "true"
response = requests.get(url, params=params, timeout=self.timeout)
if response.status_code != 200:
raise RuntimeError(f"Parameter setting failed: HTTP {response.status_code}")
[docs]
def _start_docking(self, session_id: str):
"""Start the docking job (raises on error)."""
url = f"{SWISSDOCK_BASE_URL}/startdock"
params = {"sessionNumber": session_id}
response = requests.get(url, params=params, timeout=self.timeout)
if response.status_code != 200:
raise RuntimeError(f"Docking start failed: HTTP {response.status_code}")
# Maps keywords found in status text to canonical status values.
_STATUS_KEYWORD_MAP = [
(("COMPLETE", "FINISHED", "DONE"), "FINISHED"),
(("RUNNING", "PROGRESS"), "RUNNING"),
(("ERROR", "FAIL"), "ERROR"),
]
[docs]
def _check_status_api(self, session_id: str) -> Dict[str, Any]:
"""Check docking job status (returns status dict)."""
url = f"{SWISSDOCK_BASE_URL}/checkstatus"
params = {"sessionNumber": session_id}
try:
response = requests.get(url, params=params, timeout=self.timeout)
if response.status_code == 404:
return {"status": "NOT_FOUND"}
if response.status_code != 200:
return {"status": "ERROR", "error": f"HTTP {response.status_code}"}
status_text = response.text.strip().upper()
for keywords, canonical_status in self._STATUS_KEYWORD_MAP:
if any(kw in status_text for kw in keywords):
return {"status": canonical_status}
# Assume still running if status text is unrecognized
return {"status": "RUNNING"}
except Exception as e:
return {"status": "ERROR", "error": str(e)}
[docs]
def _retrieve_results(self, session_id: str) -> Dict[str, Any]:
"""Retrieve docking results (raises on error)."""
url = f"{SWISSDOCK_BASE_URL}/retrievesession"
params = {"sessionNumber": session_id}
response = requests.get(url, params=params, timeout=60.0)
if response.status_code == 404:
raise RuntimeError("Session not found. Results may have expired.")
elif response.status_code != 200:
raise RuntimeError(f"Result retrieval failed: HTTP {response.status_code}")
return {
"session_id": session_id,
"download_url": url + "?" + urlencode(params),
"result_size_bytes": len(response.content),
"content_type": response.headers.get("Content-Type"),
"message": "Docking completed successfully. Use download_url to retrieve result files.",
}
# ========================================================================
# AsyncPollingTool Required Methods
# ========================================================================
[docs]
def submit_job(self, arguments: Dict[str, Any]) -> str:
"""
Submit docking job through multi-step workflow.
This handles the complete SwissDock workflow:
1. Check server status
2. Prepare ligand from SMILES
3. Prepare target protein
4. Set docking parameters
5. Start docking job
Returns session_id for polling.
"""
# Check server
if not self._check_server_status():
raise RuntimeError(
"SwissDock server is not responding. Please try again later."
)
# Validate required parameters
ligand_smiles = arguments.get("ligand_smiles")
pdb_id = arguments.get("pdb_id")
if not ligand_smiles:
raise ValueError("ligand_smiles parameter is required")
if not pdb_id:
raise ValueError("pdb_id parameter is required")
# Validate PDB ID format
if not isinstance(pdb_id, str) or len(pdb_id) != 4:
raise ValueError("pdb_id must be a 4-character PDB code (e.g., '1ATP')")
# Extract optional parameters
exhaustiveness = arguments.get("exhaustiveness", 8)
box_center = arguments.get("box_center")
box_size = arguments.get("box_size")
docking_engine = arguments.get("docking_engine", "attracting_cavities")
# Convert box formats if needed
if box_center and "," in box_center:
box_center = box_center.replace(",", "_")
if box_size and "," in box_size:
box_size = box_size.replace(",", "_")
# Generate session ID
session_id = self._generate_session_id()
# Execute multi-step workflow (each method raises on error)
self._prepare_ligand(session_id, ligand_smiles)
self._prepare_target(session_id, pdb_id)
self._set_docking_parameters(
session_id, exhaustiveness, box_center, box_size, docking_engine
)
self._start_docking(session_id)
return session_id
[docs]
def check_status(self, job_id: str) -> Dict[str, Any]:
"""
Check SwissDock job status and retrieve results if complete.
Args:
job_id: Session ID from submit_job()
Returns:
Dict with keys:
- done (bool): True if complete
- result (any): Results if done
- progress (int): Progress percentage
- error (str): Error message if failed
"""
status_result = self._check_status_api(job_id)
job_status = status_result["status"]
if job_status == "FINISHED":
try:
results = self._retrieve_results(job_id)
return {"done": True, "result": results, "progress": 100}
except Exception as e:
return {"done": False, "error": f"Failed to retrieve results: {e}"}
if job_status == "ERROR":
error_msg = status_result.get("error", "Unknown error")
return {"done": False, "error": f"Docking job failed: {error_msg}"}
if job_status == "NOT_FOUND":
return {"done": False, "error": "Docking session not found"}
# RUNNING or unknown status
return {"done": False, "progress": 50}
[docs]
def format_result(self, result: Any) -> Dict[str, Any]:
"""Format SwissDock results into standard response format."""
return {
"data": result,
"metadata": {
"tool": self.name,
"docking_engine": "SwissDock",
},
}
# ========================================================================
# Override run() for operation routing
# ========================================================================
_OPERATION_HANDLERS = {
"check_job_status": "_check_job_status_operation",
"retrieve_results": "_retrieve_results_operation",
}
[docs]
async def run(
self, arguments: Dict[str, Any], progress: Optional["TaskProgress"] = None
) -> Dict[str, Any]:
"""
Execute the SwissDock API call.
Routes to appropriate operation handler based on tool configuration.
"""
if self.operation == "dock_ligand":
return await super().run(arguments, progress)
handler_name = self._OPERATION_HANDLERS.get(self.operation)
if handler_name:
return await getattr(self, handler_name)(arguments)
return {"error": f"Unknown operation: {self.operation}"}
[docs]
async def _check_job_status_operation(
self, arguments: Dict[str, Any]
) -> Dict[str, Any]:
"""Check the status of a docking job by session ID (instant operation)."""
session_id = arguments.get("session_id")
if not session_id:
return {"error": "session_id parameter is required"}
status_result = self._check_status_api(session_id)
job_status = status_result["status"]
return {
"data": {
"session_id": session_id,
"job_status": job_status,
"is_finished": job_status == "FINISHED",
"has_error": job_status in ["ERROR", "NOT_FOUND"],
"error": status_result.get("error"),
}
}
[docs]
async def _retrieve_results_operation(
self, arguments: Dict[str, Any]
) -> Dict[str, Any]:
"""Retrieve results for a completed docking job (instant operation)."""
session_id = arguments.get("session_id")
if not session_id:
return {"error": "session_id parameter is required"}
# Check status first
status_result = self._check_status_api(session_id)
job_status = status_result["status"]
if job_status != "FINISHED":
return {
"data": {
"session_id": session_id,
"job_status": job_status,
"message": f"Job is not finished yet. Status: {job_status}",
}
}
# Retrieve results
try:
results = self._retrieve_results(session_id)
return {"data": results}
except Exception as e:
return {"error": f"Failed to retrieve results: {e}"}