Creating Async Tools with AsyncPollingTool¶
Overview¶
Many scientific operations take minutes or hours to complete—protein docking, molecular dynamics simulations, large-scale data processing. ToolUniverse provides the AsyncPollingTool base class to handle these long-running operations elegantly:
Automatic polling - No manual loops needed
Progress reporting - Built-in status updates
Non-blocking - Server remains responsive
MCP Tasks compatible - Works with Model Context Protocol
Consistent patterns - All async tools behave the same way
When to Use AsyncPollingTool¶
Use AsyncPollingTool when your tool:
Takes >30 seconds to complete
Polls external APIs for job status
Returns job IDs that require status checking
Has multiple processing stages (submit → poll → retrieve)
- Examples:
Protein structure prediction (5-60 minutes)
Molecular docking (10-30 minutes)
Large-scale sequence alignment (minutes to hours)
Complex simulations (hours to days)
- Don’t use for:
Quick API calls (<30 seconds)
Synchronous operations
Database queries
Simple lookups
Quick Start¶
Here’s a minimal async tool:
from tooluniverse.async_base import AsyncPollingTool
from tooluniverse.tool_registry import register_tool
import requests
from typing import Dict, Any
@register_tool("MyAsyncTool")
class MyAsyncTool(AsyncPollingTool):
"""Tool for long-running operations."""
# Configuration
poll_interval = 10 # seconds between status checks
max_duration = 1800 # 30 minutes maximum
def __init__(self, tool_config: Dict[str, Any]):
self.name = tool_config["name"]
self.api_url = "https://api.example.com"
super().__init__() # Must call after setting name
def submit_job(self, arguments: Dict[str, Any]) -> str:
"""Submit job and return job ID."""
response = requests.post(
f"{self.api_url}/submit",
json=arguments
)
return response.json()["job_id"]
def check_status(self, job_id: str) -> Dict[str, Any]:
"""Check job status and return result if done."""
response = requests.get(
f"{self.api_url}/status/{job_id}"
)
data = response.json()
if data["status"] == "completed":
return {"done": True, "result": data["result"]}
elif data["status"] == "failed":
return {"done": False, "error": data["error"]}
else:
return {"done": False, "progress": 50}
That’s it! The base class handles polling, timeouts, and progress reporting.
How It Works¶
The Workflow¶
User calls tool
↓
1. submit_job()
- Submit to external API
- Return job_id
↓
2. Automatic polling loop (handled by base class)
- Call check_status(job_id) every poll_interval seconds
- Report progress if available
- Continue until done=True or timeout
↓
3. format_result()
- Format final result
- Return to user
Your responsibilities: Implement 2-3 methods
Base class handles: Polling loop, timeouts, progress, error handling
What You Implement¶
Required Methods¶
submit_job(arguments) → job_id
Submit the job to the external service.
def submit_job(self, arguments: Dict[str, Any]) -> str:
    """Submit job and return job ID for polling."""
    # Validate arguments
    if "required_param" not in arguments:
        raise ValueError("Missing required_param")

    # Call external API
    response = requests.post(
        f"{self.api_url}/jobs",
        json=arguments,
        timeout=60
    )

    # Extract job ID
    job_id = response.json()["id"]
    return job_id
check_status(job_id) → status dict
Check if job is complete and return results.
def check_status(self, job_id: str) -> Dict[str, Any]:
    """Check status and return done/result/error/progress."""
    response = requests.get(
        f"{self.api_url}/jobs/{job_id}/status"
    )
    data = response.json()

    # Job completed successfully
    if data["status"] == "completed":
        return {
            "done": True,
            "result": data["output"],
            "progress": 100
        }

    # Job failed
    if data["status"] == "failed":
        return {
            "done": False,
            "error": data["error_message"]
        }

    # Still running
    return {
        "done": False,
        "progress": data.get("percent_complete", 50)
    }
Optional Methods¶
format_result(result) → formatted dict
Format the final result (optional, has default).
from datetime import datetime

def format_result(self, result: Any) -> Dict[str, Any]:
    """Format result into standard ToolUniverse format."""
    return {
        "data": result,
        "metadata": {
            "tool": self.name,
            "timestamp": datetime.now().isoformat()
        }
    }
What the Base Class Provides¶
Automatic polling loop - Calls check_status() repeatedly
Non-blocking async - Uses await asyncio.sleep()
Timeout handling - Stops after max_duration seconds
Progress reporting - Updates via TaskProgress
Error handling - Catches exceptions, returns error dicts
Return schema - Auto-generates oneOf structure
run() method - Orchestrates the entire workflow
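These pieces can be sketched as a simplified version of the loop that run() drives. This is a hypothetical illustration only (the real implementation in tooluniverse.async_base also wires up TaskProgress and format_result); polling_loop and the fake_* stubs are names invented here:

```python
import asyncio

# Hypothetical sketch of the loop AsyncPollingTool.run() drives internally.
async def polling_loop(submit_job, check_status,
                       arguments, poll_interval=0.01, max_duration=1.0):
    job_id = submit_job(arguments)
    elapsed = 0.0
    while elapsed <= max_duration:
        status = check_status(job_id)
        if status.get("done"):
            return {"data": status["result"]}
        if "error" in status:
            return {"error": status["error"]}
        await asyncio.sleep(poll_interval)  # non-blocking wait
        elapsed += poll_interval
    return {"error": f"Timed out after {max_duration}s"}

# Demo with stubs: the "job" completes on the third status check
calls = {"n": 0}
def fake_submit(args):
    return "job-1"
def fake_check(job_id):
    calls["n"] += 1
    if calls["n"] >= 3:
        return {"done": True, "result": "ok"}
    return {"done": False, "progress": calls["n"] * 30}

result = asyncio.run(polling_loop(fake_submit, fake_check, {}))
# result == {"data": "ok"} after three status checks
```

The key design point the sketch shows: the loop awaits between checks instead of blocking, which is why the server stays responsive while jobs run.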
Configuration¶
Class Attributes¶
Set these in your tool class:
class MyAsyncTool(AsyncPollingTool):
# Polling configuration
poll_interval = 15 # Seconds between status checks (default: 10)
max_duration = 3600 # Maximum wait time in seconds (default: 1800)
# Optional: Custom return schema (usually auto-generated)
return_schema = {
"oneOf": [
{"properties": {"data": {...}, "metadata": {...}}},
{"properties": {"error": {...}}}
]
}
JSON Configuration¶
In your *_tools.json file:
{
"type": "MyAsyncTool",
"name": "MyTool_analyze_data",
"description": "Analyze large dataset (may take 10-30 minutes)",
"parameter": {
"type": "object",
"properties": {
"dataset_id": {
"type": "string",
"description": "ID of dataset to analyze"
}
},
"required": ["dataset_id"]
},
"fields": {
"is_async": true,
"poll_interval": 20,
"max_wait_time": 1800
}
}
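The fields block above suggests per-tool JSON overrides of the class-level defaults. Here is a minimal sketch of how a tool's __init__ might apply them, assuming these fields arrive unconsumed in tool_config; ConfiguredAsyncTool is a plain stand-in class invented for illustration, not part of ToolUniverse:

```python
class ConfiguredAsyncTool:
    """Plain-class sketch (a real tool would inherit AsyncPollingTool)
    showing how per-tool JSON `fields` could override class defaults."""
    poll_interval = 10   # class-level default
    max_duration = 1800

    def __init__(self, tool_config):
        self.name = tool_config["name"]
        fields = tool_config.get("fields", {})
        # JSON overrides win over the class-level defaults
        self.poll_interval = fields.get("poll_interval", type(self).poll_interval)
        self.max_duration = fields.get("max_wait_time", type(self).max_duration)

config = {
    "name": "MyTool_analyze_data",
    "fields": {"is_async": True, "poll_interval": 20, "max_wait_time": 1800},
}
tool = ConfiguredAsyncTool(config)
# tool.poll_interval == 20, tool.max_duration == 1800
```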
Real-World Examples¶
Example 1: ProteinsPlus (Simple Polling)¶
ProteinsPlus API returns a status URL that we poll:
@register_tool("ProteinsPlusRESTTool")
class ProteinsPlusRESTTool(AsyncPollingTool):
"""Protein structure analysis."""
poll_interval = 15
max_duration = 1800
def submit_job(self, arguments: Dict[str, Any]) -> str:
"""Submit PDB structure for analysis."""
pdb_id = arguments["pdb_id"]
# Build request
request_data = {
"dogsite": {
"pdbCode": pdb_id,
"analysisDetail": "1"
}
}
# Submit to ProteinsPlus
response = requests.post(
"https://proteins.plus/api/dogsite_rest",
json=request_data
)
# Return status URL as job_id
return response.json()["location"]
def check_status(self, job_id: str) -> Dict[str, Any]:
"""Check if analysis is complete."""
response = requests.get(job_id)
# HTTP 202 = still processing
if response.status_code == 202:
return {"done": False, "progress": 30}
# HTTP 200 = complete
if response.status_code == 200:
results = response.json()
return {"done": True, "result": results, "progress": 100}
# Error
return {"done": False, "error": f"HTTP {response.status_code}"}
Example 2: SwissDock (Multi-Step Workflow)¶
SwissDock requires multiple API calls before polling:
@register_tool("SwissDockTool")
class SwissDockTool(AsyncPollingTool):
"""Molecular docking simulation."""
poll_interval = 30
max_duration = 3600
def submit_job(self, arguments: Dict[str, Any]) -> str:
"""Multi-step job submission."""
# Step 1: Generate session ID
session_id = str(uuid.uuid4())
# Step 2: Prepare ligand
self._prepare_ligand(session_id, arguments["ligand_smiles"])
# Step 3: Prepare target protein
self._prepare_target(session_id, arguments["pdb_id"])
# Step 4: Set docking parameters
self._set_parameters(session_id, arguments)
# Step 5: Start docking
self._start_docking(session_id)
return session_id
def check_status(self, job_id: str) -> Dict[str, Any]:
"""Check docking progress."""
response = requests.get(
f"{self.base_url}/checkstatus",
params={"sessionNumber": job_id}
)
status = response.text.strip().upper()
if "FINISHED" in status:
results = self._retrieve_results(job_id)
return {"done": True, "result": results}
if "ERROR" in status or "FAIL" in status:
return {"done": False, "error": "Docking failed"}
# Still running
return {"done": False, "progress": 50}
def _prepare_ligand(self, session_id, smiles):
"""Helper: prepare ligand from SMILES."""
requests.get(f"{self.base_url}/preplig",
params={"mySMILES": smiles})
# ... more helper methods ...
Progress Reporting¶
Automatic Progress¶
The base class automatically reports progress through the TaskProgress system:
# Your check_status returns progress percentage:
return {"done": False, "progress": 45}
# Users see:
# "🔄 Running MyTool_analyze_data (45% complete)"
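Some APIs report no percentage at all. One hedged fallback is to estimate progress from elapsed time; estimate_progress and _started_at below are illustrative names, not ToolUniverse API:

```python
# When the API reports no percentage, estimate progress from elapsed time.
# expected_duration is your own estimate of a typical job; the cap stays
# below 100 so only the API itself can signal completion.
def estimate_progress(elapsed_seconds, expected_duration, cap=95):
    if expected_duration <= 0:
        return cap
    return min(cap, int(100 * elapsed_seconds / expected_duration))

# e.g. record time.time() in submit_job, then in check_status:
# progress = estimate_progress(time.time() - self._started_at, 600)
```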
Custom Messages¶
For more detailed progress updates:
async def run(self, arguments, progress=None):
    """Override run() for custom progress messages."""
    if progress:
        await progress.set_message("Validating input...")
    # Don't call submit_job() here: super().run() submits the job itself
    # as part of the workflow, so calling it in the override would
    # submit the job twice.
    return await super().run(arguments, progress)
Error Handling¶
In submit_job¶
Raise exceptions for validation errors:
def submit_job(self, arguments):
# Validate required parameters
if "pdb_id" not in arguments:
raise ValueError("pdb_id parameter is required")
if len(arguments["pdb_id"]) != 4:
raise ValueError("pdb_id must be 4 characters")
# Raise for API errors
response = requests.post(url, json=data)
if response.status_code == 400:
raise RuntimeError(f"API error: {response.text}")
return response.json()["job_id"]
In check_status¶
Return error dicts for job failures:
def check_status(self, job_id):
    response = requests.get(f"{url}/status/{job_id}")

    # Network/API error: check the status code before parsing the body
    if response.status_code == 404:
        return {
            "done": False,
            "error": "Job not found (may have expired)"
        }

    # Job failed on server
    if response.json()["status"] == "failed":
        return {
            "done": False,
            "error": "Job execution failed on server"
        }

    # ... handle completed / still-running cases as usual ...
MCP Tasks Integration¶
Your AsyncPollingTool automatically works with MCP Tasks protocol for non-blocking execution.
How It Works¶
MCP Client → SMCP Server → TaskManager → Your AsyncPollingTool
                 ↓               ↓               ↓
          Returns taskId     Creates         Executes job
          immediately        background      asynchronously
                             task
- The TaskManager handles:
Creating background tasks
Polling your tool’s status
Reporting progress to client
Managing timeouts and cleanup
You don’t need to do anything special—just inherit from AsyncPollingTool!
Testing Async Tools¶
Unit Testing¶
Test components individually:
import pytest
from unittest.mock import Mock, patch
def test_submit_job():
"""Test job submission."""
tool = MyAsyncTool(config)
with patch('requests.post') as mock_post:
mock_post.return_value.json.return_value = {"job_id": "123"}
job_id = tool.submit_job({"param": "value"})
assert job_id == "123"
assert mock_post.called
def test_check_status_complete():
"""Test status check when job is done."""
tool = MyAsyncTool(config)
with patch('requests.get') as mock_get:
mock_get.return_value.json.return_value = {
"status": "completed",
"result": {"data": "output"}
}
status = tool.check_status("123")
assert status["done"] == True
assert "result" in status
def test_check_status_running():
"""Test status check while still running."""
tool = MyAsyncTool(config)
with patch('requests.get') as mock_get:
mock_get.return_value.json.return_value = {
"status": "running",
"progress": 75
}
status = tool.check_status("123")
assert status["done"] == False
assert status["progress"] == 75
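One case the suite above does not cover is the server-side failure path. Here is a sketch of such a test, using a self-contained stand-in (MyAsyncToolStub is invented here) that mirrors the quick-start check_status contract, so the snippet runs without the real base class or a network library:

```python
from unittest.mock import Mock

class MyAsyncToolStub:
    """Stand-in mirroring the quick-start check_status contract;
    the HTTP getter is injected so no network library is needed."""
    def __init__(self, http_get):
        self.http_get = http_get

    def check_status(self, job_id):
        data = self.http_get(f"/status/{job_id}").json()
        if data["status"] == "completed":
            return {"done": True, "result": data["result"]}
        if data["status"] == "failed":
            return {"done": False, "error": data["error"]}
        return {"done": False, "progress": 50}

def test_check_status_failed():
    """Server-side failure should surface as an error dict, not a result."""
    mock_get = Mock()
    mock_get.return_value.json.return_value = {
        "status": "failed",
        "error": "out of memory",
    }
    status = MyAsyncToolStub(mock_get).check_status("123")
    assert status["done"] is False
    assert status["error"] == "out of memory"
```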
Integration Testing¶
Test with real API (when available):
@pytest.mark.integration
@pytest.mark.asyncio
async def test_full_workflow():
"""Test complete async workflow."""
from tooluniverse import ToolUniverse
tu = ToolUniverse()
tu.load_tools()
# Run async tool — tu.run() is context-aware; in async context it returns a coroutine
result = await tu.run(
'{"name": "MyTool_analyze_data", "arguments": {"dataset_id": "test123"}}'
)
assert "data" in result
assert result["data"] is not None
Best Practices¶
Do’s¶
Keep submit_job lightweight - Just submit and return ID
Handle all status cases - completed, failed, running, unknown
Validate inputs early - Fail fast in submit_job
Use appropriate intervals - 10-30s for most APIs
Set realistic timeouts - Consider actual job duration
Return progress when available - Better UX
Use helper methods - Keep methods focused and clean
Test with mocks first - Don’t hit real APIs in unit tests
Don’ts¶
Don’t do work in submit_job - Just submit to the external service
Don’t block in check_status - Should be a quick status check
Don’t poll too frequently - Respect API rate limits (<10s is usually too much)
Don’t set infinite timeouts - Always have max_duration
Don’t swallow errors - Return {"error": "..."} or raise an exception
Don’t use time.sleep() - Use asyncio.sleep() or let the base class handle it
Don’t return raw API responses - Format consistently
Common Patterns¶
Pattern 1: Status URL¶
API returns a URL to poll:
def submit_job(self, arguments):
response = requests.post(api_url, json=arguments)
return response.json()["status_url"] # URL is the job_id
def check_status(self, job_id):
response = requests.get(job_id) # job_id IS the URL
# ... check response ...
Pattern 2: Separate Endpoints¶
Different endpoints for submit and status:
def submit_job(self, arguments):
response = requests.post(f"{self.base_url}/jobs", json=arguments)
return response.json()["job_id"]
def check_status(self, job_id):
response = requests.get(f"{self.base_url}/jobs/{job_id}/status")
# ... check response ...
Pattern 3: Polling with Authentication¶
Need auth token for status checks:
def __init__(self, tool_config):
    self.name = tool_config["name"]
    self.api_key = os.getenv("API_KEY")
    self._headers = {"Authorization": f"Bearer {self.api_key}"}
    super().__init__()  # Call after setting name, as in Quick Start
def submit_job(self, arguments):
response = requests.post(url, json=arguments, headers=self._headers)
return response.json()["job_id"]
def check_status(self, job_id):
response = requests.get(url, headers=self._headers)
# ... check response ...
Pattern 4: Multi-Stage Pipeline¶
Job has multiple stages:
def check_status(self, job_id):
response = requests.get(f"{url}/status/{job_id}")
data = response.json()
# Map stages to progress
stage_progress = {
"queued": 10,
"preprocessing": 25,
"processing": 50,
"postprocessing": 75,
"completed": 100
}
stage = data["current_stage"]
if stage == "completed":
return {"done": True, "result": data["output"]}
elif stage == "failed":
return {"done": False, "error": data["error"]}
else:
return {"done": False, "progress": stage_progress.get(stage, 50)}
Migration Guide¶
Converting Existing Async Tools¶
If you have an existing async tool with manual polling:
Before (286 lines with manual polling):
class OldAsyncTool(BaseTool):
def run(self, arguments):
# Submit job
job_id = self._submit_job(arguments)
# Manual polling loop (70+ lines!)
start_time = time.time()
while True:
elapsed = time.time() - start_time
if elapsed > 1800:
return {"error": "Timeout"}
status = self._check_status(job_id)
if status["done"]:
return status["result"]
time.sleep(10) # Blocks!
After (356 lines, but cleaner with automatic polling):
class NewAsyncTool(AsyncPollingTool):
poll_interval = 10
max_duration = 1800
def submit_job(self, arguments):
return self._submit_job(arguments)
def check_status(self, job_id):
return self._check_status(job_id)
- Result:
Eliminated 70+ lines of polling boilerplate
Non-blocking async execution
Automatic progress reporting
MCP Tasks compatible
Troubleshooting¶
Tool Never Completes¶
Symptoms: Tool runs forever, never returns result
- Causes:
check_status() never returns done=True
Wrong job_id format
API endpoint changed
Debug:
def check_status(self, job_id):
response = requests.get(f"{url}/status/{job_id}")
print(f"DEBUG: Status response: {response.json()}") # Add logging
# Make sure you return done=True at some point!
if response.json()["status"] == "completed":
return {"done": True, "result": response.json()}
Job Times Out¶
Symptoms: Tool returns timeout error
- Causes:
max_duration too short
Job actually takes longer than expected
API is slow
Fix:
class MyTool(AsyncPollingTool):
max_duration = 3600 # Increase timeout to 1 hour
poll_interval = 20 # Poll less frequently
Progress Not Showing¶
Symptoms: No progress updates visible
- Causes:
Not returning progress in check_status()
Progress not changing between calls
Fix:
def check_status(self, job_id):
response = requests.get(f"{url}/status/{job_id}")
data = response.json()
# Always return progress when not done
if not data["is_complete"]:
return {
"done": False,
"progress": data.get("percent_complete", 50) # Include progress!
}
Tool Returns Error Dict¶
Symptoms: Tool returns {"error": "..."} unexpectedly
- Causes:
Exception in submit_job()
API returns error status
Network error
Debug:
def submit_job(self, arguments):
try:
response = requests.post(url, json=arguments, timeout=60)
response.raise_for_status() # Raises for 4xx/5xx
return response.json()["job_id"]
except requests.Timeout:
raise RuntimeError("API timeout during job submission")
except requests.HTTPError as e:
raise RuntimeError(f"API error: {e.response.text}")
except KeyError:
raise RuntimeError("API response missing job_id field")
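Transient network errors in check_status are another common source of spurious error dicts: one flaky poll should not abort an hours-long job. A hedged sketch of a tolerance counter follows (TransientFailureGuard is an illustrative name, not ToolUniverse API):

```python
# Treat the first few status-check exceptions as "still running" so a
# single flaky poll does not kill a long job; give up only after several
# consecutive failures.
class TransientFailureGuard:
    def __init__(self, max_transient_failures=3):
        self.max_transient_failures = max_transient_failures
        self._failures = 0

    def check(self, check_status, job_id):
        try:
            status = check_status(job_id)
            self._failures = 0          # healthy again, reset the counter
            return status
        except Exception as exc:        # e.g. a connection error
            self._failures += 1
            if self._failures >= self.max_transient_failures:
                return {"done": False,
                        "error": f"Status check failed repeatedly: {exc}"}
            return {"done": False}      # retry on the next poll
```

In a real tool, the guard would wrap the check_status call inside your polling logic, with the failure threshold tuned to poll_interval and the API's flakiness.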
API Reference¶
AsyncPollingTool Class¶
class AsyncPollingTool(ABC):
"""Base class for async tools with automatic polling.
Attributes:
poll_interval (int): Seconds between status checks (default: 10)
max_duration (int): Maximum wait time in seconds (default: 1800)
return_schema (dict): Tool return schema (auto-generated if not set)
"""
@abstractmethod
def submit_job(self, arguments: Dict[str, Any]) -> str:
"""Submit job to external service.
Args:
arguments: Tool parameters from user
Returns:
job_id: Identifier for polling (string)
Raises:
ValueError: Invalid parameters
RuntimeError: API/submission error
"""
pass
@abstractmethod
def check_status(self, job_id: str) -> Dict[str, Any]:
"""Check job status and return result if complete.
Args:
job_id: Job identifier from submit_job()
Returns:
Dictionary with:
- done (bool): True if job complete
- result (any): Final result (if done=True)
- error (str): Error message (if failed)
- progress (int): Progress percentage 0-100 (optional)
"""
pass
def format_result(self, result: Any) -> Dict[str, Any]:
"""Format final result (optional override).
Args:
result: Raw result from check_status()
Returns:
Formatted result dictionary
"""
return {
"data": result,
"metadata": {"tool": self.name}
}
async def run(
self,
arguments: Dict[str, Any],
progress: Optional["TaskProgress"] = None
) -> Dict[str, Any]:
"""Execute tool with automatic polling (do not override unless needed).
Args:
arguments: Tool parameters
progress: Optional progress reporter
Returns:
Final formatted result
"""
# Implemented by base class - handles polling automatically
Further Reading¶
Comprehensive Tool Guide - Complete tool development guide
Local Tools - Creating local tools
MCP Support - MCP integration
API Reference - API documentation
Examples¶
src/tooluniverse/proteinsplus_tool.py - Simple polling example
src/tooluniverse/swissdock_tool.py - Complex multi-step workflow
examples/proteinsplus_tools_example.py - Usage examples
tests/test_async_base.py - Test examples
Changelog¶
v0.4.0 (2024-02): Added AsyncPollingTool base class
v0.4.1 (2024-02): Added MCP Tasks integration
v0.4.2 (2024-02): Converted ProteinsPlus and SwissDock tools
Note
AsyncPollingTool is production-ready and recommended for all new async tools. It eliminates 70-100 lines of boilerplate per tool and ensures consistent behavior.