Source code for tooluniverse.ebi_alignment_tool
"""
Sequence alignment and phylogeny tools for ToolUniverse (EMBL-EBI Job Dispatcher).
ToolUniverse could fetch pre-computed alignments/trees (Rfam, Ensembl Compara)
but had no way to align a user's *own* sequences or build a tree from them. These
two tools close that gap using EMBL-EBI's public Job Dispatcher REST services
(the same submit -> poll -> result pattern used by InterProScan):
- EBI_msa_align multiple sequence alignment (Clustal Omega / MUSCLE
/ MAFFT / Kalign / T-Coffee)
- EBI_build_phylogenetic_tree neighbour-joining / UPGMA tree from an alignment
Public, no authentication. API: https://www.ebi.ac.uk/Tools/services/rest/
"""
import re
import time
import requests
from typing import Dict, Any, List, Optional, Tuple
from .base_tool import BaseTool
from .tool_registry import register_tool
_EBI_BASE = "https://www.ebi.ac.uk/Tools/services/rest"
_DEFAULT_EMAIL = "tooluniverse@example.com"
# Per-method service config. EBI MSA services do not share a parameter schema:
# the output-format parameter is "outfmt" for Clustal Omega but "format" for the
# others, and MUSCLE auto-detects sequence type (no "stype").
_MSA_METHODS = {
"clustalo": {"format_param": "outfmt", "fasta_value": "fa", "stype": True},
"muscle": {"format_param": "format", "fasta_value": "fasta", "stype": False},
"mafft": {"format_param": "format", "fasta_value": "fasta", "stype": True},
"kalign": {"format_param": "format", "fasta_value": "fasta", "stype": True},
"tcoffee": {"format_param": "format", "fasta_value": "fasta", "stype": True},
}
# Poll up to ~2.5 min total; each HTTP call still honours the 30s tool timeout.
_MAX_POLL_ATTEMPTS = 50
_POLL_INTERVAL = 3
def _submit(
service: str, params: Dict[str, Any], timeout: int
) -> Tuple[Optional[str], Optional[str]]:
"""POST a job to an EBI service. Returns (job_id, error)."""
resp = requests.post(f"{_EBI_BASE}/{service}/run", data=params, timeout=timeout)
if resp.status_code != 200:
return (
None,
f"{service} submission failed (HTTP {resp.status_code}): {resp.text[:200]}",
)
return resp.text.strip(), None
def _poll(
service: str, job_id: str, timeout: int
) -> Tuple[Optional[str], Optional[str]]:
"""Poll a job to completion. Returns (final_status, error)."""
for _ in range(_MAX_POLL_ATTEMPTS):
resp = requests.get(f"{_EBI_BASE}/{service}/status/{job_id}", timeout=timeout)
status = resp.text.strip()
if status == "FINISHED":
return status, None
if status in ("ERROR", "FAILURE", "NOT_FOUND"):
return status, f"{service} job {status.lower()} (job {job_id})"
time.sleep(_POLL_INTERVAL)
return (
None,
f"{service} job did not finish within {_MAX_POLL_ATTEMPTS * _POLL_INTERVAL}s (job {job_id})",
)
def _result_types(service: str, job_id: str, timeout: int) -> List[str]:
resp = requests.get(f"{_EBI_BASE}/{service}/resulttypes/{job_id}", timeout=timeout)
return re.findall(r"<identifier>([^<]+)</identifier>", resp.text)
def _result(service: str, job_id: str, rtype: str, timeout: int) -> str:
resp = requests.get(
f"{_EBI_BASE}/{service}/result/{job_id}/{rtype}", timeout=timeout
)
resp.raise_for_status()
return resp.text
def _count_fasta_records(text: str) -> int:
return len(re.findall(r"^>", text, flags=re.MULTILINE))
def _guarded_run(
label: str, timeout: int, run_fn, arguments: Dict[str, Any]
) -> Dict[str, Any]:
"""Call run_fn, converting any exception into a {"status": "error"} envelope.
Tools must never raise, so every run() routes through here for uniform
handling of timeouts, connection failures, and unexpected errors.
"""
try:
return run_fn(arguments)
except requests.exceptions.Timeout:
return {
"status": "error",
"error": f"EBI {label} request timed out after {timeout}s",
}
except requests.exceptions.ConnectionError:
return {
"status": "error",
"error": "Failed to connect to EBI Job Dispatcher",
}
except Exception as e: # noqa: BLE001 - tools must never raise
return {"status": "error", "error": f"EBI {label} error: {str(e)}"}
[docs]
@register_tool("EBIMSATool")
class EBIMSATool(BaseTool):
"""Multiple sequence alignment of user-provided sequences via EMBL-EBI.
Submits a FASTA set (>=2 records) to a Job Dispatcher MSA service, polls to
completion, and returns the alignment in FASTA and the tool's native format,
plus a guide tree when the method emits one.
"""
[docs]
def __init__(self, tool_config: Dict[str, Any]):
super().__init__(tool_config)
self.timeout = tool_config.get("timeout", 30)
[docs]
def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
return _guarded_run("MSA", self.timeout, self._run, arguments)
[docs]
def _run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
sequences = (arguments.get("sequences") or "").strip()
if not sequences:
return {
"status": "error",
"error": "sequences is required (FASTA, >=2 records)",
}
n = _count_fasta_records(sequences)
if n < 2:
return {
"status": "error",
"error": f"MSA needs >=2 FASTA records; found {n}. Provide multiple '>'-prefixed sequences.",
}
method = (arguments.get("method") or "clustalo").strip().lower()
cfg = _MSA_METHODS.get(method)
if cfg is None:
return {
"status": "error",
"error": f"Unknown method '{method}'. Choose one of: {', '.join(_MSA_METHODS)}",
}
seq_type = (arguments.get("sequence_type") or "protein").strip().lower()
if seq_type not in ("protein", "dna", "rna"):
return {
"status": "error",
"error": "sequence_type must be protein, dna, or rna",
}
params = {
"email": arguments.get("email", _DEFAULT_EMAIL),
"sequence": sequences,
cfg["format_param"]: cfg["fasta_value"],
}
if cfg["stype"]:
# MAFFT/Kalign use "dna" for nucleotide; treat rna as dna for alignment.
params["stype"] = "protein" if seq_type == "protein" else "dna"
job_id, err = _submit(method, params, self.timeout)
if err:
return {"status": "error", "error": err}
_, err = _poll(method, job_id, self.timeout)
if err:
return {"status": "error", "error": err}
rtypes = _result_types(method, job_id, self.timeout)
aligned_fasta = self._fetch_first(
method, job_id, rtypes, ("fa", "aln-fasta", "fasta")
)
clustal = self._fetch_first(
method, job_id, rtypes, ("aln-clustal_num", "aln-clustal", "clustal", "out")
)
guide_tree = self._fetch_first(method, job_id, rtypes, ("phylotree", "tree"))
return {
"status": "success",
"data": {
"method": method,
"num_sequences": n,
"aligned_fasta": aligned_fasta,
"alignment_clustal": clustal,
"guide_tree_newick": (guide_tree or "").strip() or None,
},
"metadata": {
"source": f"EMBL-EBI Job Dispatcher ({method})",
"sequence_type": seq_type,
"job_id": job_id,
"result_url": f"{_EBI_BASE}/{method}/result/{job_id}",
},
}
[docs]
def _fetch_first(
self,
service: str,
job_id: str,
available: List[str],
preferred: Tuple[str, ...],
) -> Optional[str]:
"""Fetch the first preferred result type that the job actually produced."""
for rtype in preferred:
if rtype in available:
return _result(service, job_id, rtype, self.timeout)
return None
[docs]
@register_tool("EBIPhylogenyTool")
class EBIPhylogenyTool(BaseTool):
"""Build a phylogenetic tree from an alignment via EMBL-EBI simple_phylogeny.
Takes an aligned FASTA (e.g. the output of EBI_msa_align) and returns a
Newick tree by neighbour-joining or UPGMA.
"""
SERVICE = "simple_phylogeny"
[docs]
def __init__(self, tool_config: Dict[str, Any]):
super().__init__(tool_config)
self.timeout = tool_config.get("timeout", 30)
[docs]
def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
return _guarded_run("phylogeny", self.timeout, self._run, arguments)
[docs]
def _run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
aligned = (arguments.get("aligned_sequences") or "").strip()
if not aligned:
return {
"status": "error",
"error": "aligned_sequences is required (aligned FASTA, >=2 records, e.g. from EBI_msa_align)",
}
n = _count_fasta_records(aligned)
if n < 2:
return {
"status": "error",
"error": f"A tree needs >=2 aligned records; found {n}.",
}
clustering = (arguments.get("clustering") or "Neighbour-joining").strip()
if clustering.lower() in ("nj", "neighbour-joining", "neighbor-joining"):
clustering = "Neighbour-joining"
elif clustering.lower() == "upgma":
clustering = "UPGMA"
else:
return {
"status": "error",
"error": "clustering must be 'Neighbour-joining' or 'UPGMA'",
}
params = {
"email": arguments.get("email", _DEFAULT_EMAIL),
"sequence": aligned,
"tree": "phylip",
"clustering": clustering,
"kimura": str(bool(arguments.get("distance_correction", False))).lower(),
"tossgaps": str(bool(arguments.get("exclude_gaps", False))).lower(),
}
job_id, err = _submit(self.SERVICE, params, self.timeout)
if err:
return {"status": "error", "error": err}
_, err = _poll(self.SERVICE, job_id, self.timeout)
if err:
return {"status": "error", "error": err}
newick = _result(self.SERVICE, job_id, "tree", self.timeout).strip()
return {
"status": "success",
"data": {
"newick": newick,
"num_taxa": n,
"clustering": clustering,
},
"metadata": {
"source": "EMBL-EBI Job Dispatcher (simple_phylogeny)",
"distance_correction": "Kimura"
if params["kimura"] == "true"
else "none",
"exclude_gap_columns": params["tossgaps"] == "true",
"job_id": job_id,
"result_url": f"{_EBI_BASE}/{self.SERVICE}/result/{job_id}",
},
}