Source code for tooluniverse.uniprot_idmapping_tool
# uniprot_idmapping_tool.py
"""
UniProt ID Mapping Service tool for ToolUniverse.
Provides cross-database protein/gene identifier conversion using the
canonical UniProt ID Mapping REST API. Supports 100+ databases including
UniProtKB, Gene Names, Ensembl, RefSeq, PDB, ChEMBL, and more.
The service is asynchronous: a mapping job is submitted via POST,
its status is polled, and results are retrieved when complete.
API: https://rest.uniprot.org/idmapping/
No authentication required.
"""
import time
import requests
from typing import Dict, Any
from .base_tool import BaseTool
from .tool_registry import register_tool
UNIPROT_BASE_URL = "https://rest.uniprot.org"
[docs]
@register_tool("UniProtIDMappingTool")
class UniProtIDMappingTool(BaseTool):
"""
Tool for converting identifiers between databases using
the UniProt ID Mapping service.
Handles the async submit -> poll -> results workflow automatically.
No authentication required.
"""
[docs]
def __init__(self, tool_config: Dict[str, Any]):
super().__init__(tool_config)
self.timeout = tool_config.get("timeout", 30)
self.endpoint_type = tool_config.get("fields", {}).get(
"endpoint_type", "convert"
)
[docs]
def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Execute the UniProt ID Mapping API call."""
try:
return self._dispatch(arguments)
except requests.exceptions.Timeout:
return {"error": f"UniProt ID Mapping API timed out after {self.timeout}s"}
except requests.exceptions.ConnectionError:
return {"error": "Failed to connect to UniProt ID Mapping API"}
except requests.exceptions.HTTPError as e:
status = e.response.status_code if e.response else "unknown"
body = ""
try:
body = e.response.json().get("messages", [""])[0]
except Exception:
pass
return {"error": f"UniProt ID Mapping HTTP error {status}: {body}"}
except Exception as e:
return {"error": f"Unexpected error: {str(e)}"}
[docs]
def _dispatch(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Route to appropriate endpoint."""
if self.endpoint_type == "convert":
return self._convert(arguments)
elif self.endpoint_type == "to_pdb":
return self._to_pdb(arguments)
elif self.endpoint_type == "gene_to_uniprot":
return self._gene_to_uniprot(arguments)
elif self.endpoint_type == "list_databases":
return self._list_databases(arguments)
return {"error": f"Unknown endpoint_type: {self.endpoint_type}"}
[docs]
def _submit_and_poll(
self, from_db: str, to_db: str, ids: str, tax_id: int = None
) -> Dict:
"""Submit a mapping job and poll for results."""
# Submit job
data = {
"from": from_db,
"to": to_db,
"ids": ids,
}
if tax_id:
data["taxId"] = tax_id
submit_resp = requests.post(
f"{UNIPROT_BASE_URL}/idmapping/run",
data=data,
timeout=self.timeout,
)
submit_resp.raise_for_status()
job_id = submit_resp.json().get("jobId")
if not job_id:
return {"error": "Failed to get job ID from UniProt ID Mapping"}
# Poll for completion (max 60 seconds)
max_polls = 20
for _ in range(max_polls):
status_resp = requests.get(
f"{UNIPROT_BASE_URL}/idmapping/status/{job_id}",
timeout=self.timeout,
)
status_data = status_resp.json()
if status_data.get("jobStatus") == "FINISHED":
break
if "results" in status_data:
# Some responses include results directly
return {"results": status_data["results"], "job_id": job_id}
if status_data.get("jobStatus") == "ERROR":
msg = status_data.get("errorMessage", "Unknown error")
return {"error": f"UniProt mapping job failed: {msg}"}
time.sleep(1.5)
else:
return {"error": "UniProt ID mapping job did not complete within timeout"}
# Get results
results_resp = requests.get(
f"{UNIPROT_BASE_URL}/idmapping/results/{job_id}",
params={"size": 500},
timeout=self.timeout,
)
results_resp.raise_for_status()
results_data = results_resp.json()
return {"results": results_data.get("results", []), "job_id": job_id}
[docs]
def _convert(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Generic ID conversion between any supported databases."""
ids = arguments.get("ids", "")
from_db = arguments.get("from_db", "")
to_db = arguments.get("to_db", "UniProtKB")
tax_id = arguments.get("tax_id")
if not ids:
return {"error": "ids parameter is required (e.g., 'TP53,BRCA1')"}
if not from_db:
return {"error": "from_db parameter is required (e.g., 'Gene_Name')"}
result = self._submit_and_poll(from_db, to_db, ids, tax_id)
if "error" in result:
return result
raw_results = result.get("results", [])
# Parse results - handle both simple and complex formats
parsed = []
for r in raw_results:
to_val = r.get("to", "")
# Some results have nested objects for 'to'
if isinstance(to_val, dict):
to_val = to_val.get("primaryAccession", to_val.get("id", str(to_val)))
parsed.append({"from": r.get("from", ""), "to": str(to_val)})
return {
"data": {
"from_db": from_db,
"to_db": to_db,
"result_count": len(parsed),
"results": parsed[:500],
"failed_ids": [],
},
"metadata": {
"source": "UniProt ID Mapping Service",
"job_id": result.get("job_id", ""),
"endpoint": "idmapping",
},
}
[docs]
def _to_pdb(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Convert UniProt accessions to PDB IDs."""
uniprot_ids = arguments.get("uniprot_ids", "")
if not uniprot_ids:
return {"error": "uniprot_ids is required (e.g., 'P04637')"}
result = self._submit_and_poll("UniProtKB_AC-ID", "PDB", uniprot_ids)
if "error" in result:
return result
raw_results = result.get("results", [])
parsed = [
{"from": r.get("from", ""), "to": str(r.get("to", ""))} for r in raw_results
]
return {
"data": {
"query_ids": uniprot_ids,
"result_count": len(parsed),
"results": parsed[:500],
},
"metadata": {
"source": "UniProt ID Mapping Service",
"endpoint": "idmapping (UniProtKB_AC-ID -> PDB)",
},
}
[docs]
def _gene_to_uniprot(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Convert gene names to UniProt accessions."""
gene_names = arguments.get("gene_names", "")
tax_id = arguments.get("tax_id", 9606)
reviewed_only = arguments.get("reviewed_only", False)
if not gene_names:
return {"error": "gene_names is required (e.g., 'TP53,BRCA1')"}
to_db = "UniProtKB-Swiss-Prot" if reviewed_only else "UniProtKB"
result = self._submit_and_poll("Gene_Name", to_db, gene_names, tax_id)
if "error" in result:
return result
raw_results = result.get("results", [])
parsed = []
for r in raw_results:
to_val = r.get("to", "")
if isinstance(to_val, dict):
to_val = to_val.get("primaryAccession", str(to_val))
parsed.append({"from": r.get("from", ""), "to": str(to_val)})
return {
"data": {
"gene_names": gene_names,
"species_taxid": tax_id,
"result_count": len(parsed),
"results": parsed[:500],
},
"metadata": {
"source": "UniProt ID Mapping Service",
"endpoint": f"idmapping (Gene_Name -> {to_db})",
},
}
[docs]
def _list_databases(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""List available databases for ID mapping."""
url = f"{UNIPROT_BASE_URL}/configure/idmapping/fields"
response = requests.get(url, timeout=self.timeout)
response.raise_for_status()
raw = response.json()
groups_raw = raw.get("groups", [])
groups = []
for g in groups_raw:
dbs = []
for item in g.get("items", []):
dbs.append(
{
"name": item.get("name", ""),
"display_name": item.get("displayName", ""),
"from_supported": item.get("from", False),
}
)
groups.append(
{
"group_name": g.get("groupName", ""),
"databases": dbs,
}
)
return {
"data": {
"group_count": len(groups),
"groups": groups,
},
"metadata": {
"source": "UniProt ID Mapping Service",
"endpoint": "configure/idmapping/fields",
},
}