Source code for tooluniverse.dtu_protein_tool

"""
DTU Health Tech protein predictors (local-compute via the biolib cloud runner).

Wraps three machine-learning protein predictors published by DTU Health Tech
(Technical University of Denmark) and related groups on the BioLib platform,
run through the ``pybiolib`` Python package:

  - deeptmhmm : DTU/DeepTMHMM      -> transmembrane topology (TM helices / beta
                                      barrels, signal peptide, inside/outside)
  - signalp   : DTU/SignalP_6      -> signal-peptide detection + cleavage site
  - deeploc   : KU/DeepLocPro      -> (prokaryotic) subcellular localization

NOTE on DeepLoc: the original eukaryotic "DeepLoc 2.0" web predictor is NOT
published as a runnable app on BioLib. The closest runnable subcellular
localization predictor from the same DeepLoc family is ``KU/DeepLocPro``
(prokaryotic). ``deeploc`` is mapped to it; the tool description should state
this as well so callers are not misled.

How it runs
-----------
``biolib.load('DTU/DeepTMHMM').cli(args='--fasta input.fasta')`` submits a job
to the BioLib cloud, waits for completion, and exposes output files. Jobs run
ANONYMOUSLY (no account required) but are queued on shared compute, so a single
prediction commonly takes 2-5 minutes. This is a genuine remote compute job,
not a local model -- the only local dependency is the ``pybiolib`` client.

Dependency handling: ``biolib`` is imported at module load behind a guarded
try/except. If it is missing, ``BIOLIB_AVAILABLE`` is False and ``run()``
returns a clean error telling the caller to ``pip install pybiolib`` -- it never
raises ImportError to the framework.
"""

import os
import re
import tempfile
from typing import Any, Dict, List, Optional, Tuple

from .base_tool import BaseTool
from .tool_registry import register_tool

# ---------------------------------------------------------------------------
# Guarded optional dependency (framework optional-dep design)
# ---------------------------------------------------------------------------
try:
    import biolib  # noqa: F401

    BIOLIB_AVAILABLE = True
except ImportError:  # pragma: no cover - exercised only when dep absent
    biolib = None
    BIOLIB_AVAILABLE = False


# Map the user-facing model choice -> (BioLib app URI, CLI fasta flag).
# The flag name differs per app (SignalP uses ``--fastafile``), so it is stored
# alongside the URI rather than hard-coded at the call site.
_MODEL_MAP: Dict[str, Tuple[str, str]] = {
    "deeptmhmm": ("DTU/DeepTMHMM", "--fasta"),
    "signalp": ("DTU/SignalP_6", "--fastafile"),
    "deeploc": ("KU/DeepLocPro", "--fasta"),
}

# Limits, in seconds, on how long we will block waiting for a BioLib job.
# BioLib jobs queue on shared compute and routinely take 2-5 minutes.
# _DEFAULT_MAX_WAIT is used when the caller gives no (or an unparsable)
# 'max_wait_time'; _MAX_ALLOWED_WAIT is the hard upper clamp.
_DEFAULT_MAX_WAIT: int = 600
_MAX_ALLOWED_WAIT: int = 1800

# Refuse absurdly large inputs so we never submit something that will hang.
# _MAX_SEQ_LEN is residues per record; _MAX_RECORDS is records per call.
_MAX_SEQ_LEN: int = 5000
_MAX_RECORDS: int = 50


@register_tool("DTUProteinTool")
class DTUProteinTool(BaseTool):
    """Run DTU Health Tech protein predictors (DeepTMHMM / SignalP / DeepLoc).

    Takes a protein FASTA (inline sequence/FASTA text or a file path) plus a
    model choice, submits the job to the BioLib cloud via ``pybiolib``, waits
    for the result, and returns a parsed prediction.

    Every public entry point returns a plain dict: either
    ``{"status": "success", "data": {...}}`` or
    ``{"status": "error", "error": "<message>"}``.
    """

    def __init__(self, tool_config: Dict[str, Any]):
        """Store the tool configuration and its ``parameter`` schema."""
        super().__init__(tool_config)
        self.parameter = tool_config.get("parameter", {})

    # ------------------------------------------------------------------ run
    def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Validate the request, then submit it to BioLib.

        Recognised ``arguments`` keys:
          * ``model``          - one of the keys of ``_MODEL_MAP``
                                 (default ``deeptmhmm``).
          * ``sequence`` / ``fasta`` - inline FASTA text or bare residues.
          * ``fasta_path``     - path to a FASTA file (takes precedence).
          * ``max_wait_time``  - seconds to wait for the job (clamped).

        Returns a success dict with parsed predictions, or an error dict.
        """
        if not BIOLIB_AVAILABLE:
            return {
                "status": "error",
                "error": (
                    "The 'pybiolib' package is required to run DTU protein "
                    "predictors. Install it with: pip install pybiolib"
                ),
            }

        # ``or`` (not a .get default) so an explicit None/"" also falls back
        # to the default model instead of becoming the string "none".
        model = str(arguments.get("model") or "deeptmhmm").strip().lower()
        if model not in _MODEL_MAP:
            return {
                "status": "error",
                "error": (
                    f"Unknown model '{model}'. Choose one of: "
                    f"{', '.join(sorted(_MODEL_MAP))}."
                ),
            }

        # Resolve the FASTA input into a text blob.
        fasta_text, err = self._resolve_fasta(arguments)
        if err:
            return {"status": "error", "error": err}

        records = self._parse_fasta(fasta_text)
        if not records:
            return {
                "status": "error",
                "error": (
                    "No protein records found. Provide a FASTA sequence via "
                    "'sequence' / 'fasta' (inline text or amino-acid string) "
                    "or 'fasta_path' (path to a .fasta file)."
                ),
            }
        if len(records) > _MAX_RECORDS:
            return {
                "status": "error",
                "error": (
                    f"Too many sequences ({len(records)}); max {_MAX_RECORDS} "
                    "per call to keep cloud runtime bounded."
                ),
            }
        for name, seq in records:
            if len(seq) > _MAX_SEQ_LEN:
                return {
                    "status": "error",
                    "error": (
                        f"Sequence '{name}' has length {len(seq)} > "
                        f"{_MAX_SEQ_LEN}; split or shorten it."
                    ),
                }

        max_wait = self._clamp_wait(arguments.get("max_wait_time"))
        return self._run_biolib(model, records, max_wait)

    # ----------------------------------------------------------- input prep
    def _resolve_fasta(
        self, arguments: Dict[str, Any]
    ) -> Tuple[Optional[str], Optional[str]]:
        """Return ``(fasta_text, error)``; exactly one of the two is None.

        Accepts a path (``fasta_path``), FASTA text, or a bare amino-acid
        sequence (``sequence`` / ``fasta``). A bare sequence is wrapped in a
        ``>query`` header.
        """
        path = arguments.get("fasta_path")
        if path:
            if not os.path.isfile(path):
                return None, f"fasta_path does not exist: {path}"
            try:
                with open(path, "r", encoding="utf-8") as fh:
                    return fh.read(), None
            except (OSError, UnicodeDecodeError) as exc:
                # UnicodeDecodeError is a ValueError, not an OSError; without
                # catching it a non-UTF-8 file would raise to the framework.
                return None, f"Could not read fasta_path: {exc}"

        text = arguments.get("sequence") or arguments.get("fasta")
        if not text or not str(text).strip():
            return None, (
                "Provide a protein via 'sequence'/'fasta' (FASTA text or a bare "
                "amino-acid string) or 'fasta_path' (file path)."
            )
        text = str(text).strip()
        # Bare amino-acid string (no header) -> wrap it.
        if not text.startswith(">"):
            cleaned = re.sub(r"\s+", "", text).upper()
            text = f">query\n{cleaned}"
        return text, None

    @staticmethod
    def _parse_fasta(text: str) -> List[Tuple[str, str]]:
        """Parse FASTA text into ``[(name, sequence), ...]``.

        The record name is the first whitespace-delimited token of the header
        (``query`` for an empty header). Sequences are upper-cased with all
        whitespace removed. Records with an empty sequence, and any sequence
        lines that precede the first header, are dropped.
        """
        records: List[Tuple[str, str]] = []
        name: Optional[str] = None
        chunks: List[str] = []
        for line in text.splitlines():
            line = line.strip()
            if not line:
                continue
            if line.startswith(">"):
                if name is not None:
                    records.append((name, "".join(chunks)))
                header = line[1:].strip()
                name = header.split()[0] if header else "query"
                chunks = []
            else:
                chunks.append(re.sub(r"\s+", "", line).upper())
        if name is not None:
            records.append((name, "".join(chunks)))
        # Drop records with empty sequence.
        return [(n, s) for n, s in records if s]

    @staticmethod
    def _clamp_wait(value: Any) -> int:
        """Coerce ``value`` to an int number of seconds within the allowed range.

        Unparsable values (None, non-numeric strings, NaN, infinity) yield
        ``_DEFAULT_MAX_WAIT``; anything else is clamped to
        ``[30, _MAX_ALLOWED_WAIT]``.
        """
        try:
            wait = int(value)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: int(float("inf")).
            return _DEFAULT_MAX_WAIT
        return max(30, min(wait, _MAX_ALLOWED_WAIT))

    # --------------------------------------------------------- biolib runner
    def _run_biolib(
        self, model: str, records: List[Tuple[str, str]], max_wait: int
    ) -> Dict[str, Any]:
        """Stage the FASTA, run the BioLib app, and parse its outputs.

        The staging directory is always removed before returning, on success
        and on every error path.
        """
        import shutil  # local import: only needed for temp-dir cleanup

        app_uri, fasta_flag = _MODEL_MAP[model]

        # Write the FASTA into a temp dir. BioLib uploads input files by their
        # path RELATIVE to the current working directory, so we run cli() from
        # inside the temp dir and reference the file by bare name (an absolute
        # path resolves to a non-existent path inside the remote sandbox ->
        # "FASTA file not found").
        fasta_name = "input.fasta"
        try:
            tmp_dir = tempfile.mkdtemp(prefix="dtu_protein_")
        except OSError as exc:
            return {"status": "error", "error": f"Could not stage FASTA: {exc}"}

        try:
            try:
                fasta_path = os.path.join(tmp_dir, fasta_name)
                with open(fasta_path, "w", encoding="utf-8") as fh:
                    fh.writelines(f">{name}\n{seq}\n" for name, seq in records)
            except OSError as exc:
                return {"status": "error", "error": f"Could not stage FASTA: {exc}"}

            try:
                app = biolib.load(app_uri)
            except Exception as exc:  # network / not-found / auth
                return {
                    "status": "error",
                    "error": f"Could not load BioLib app '{app_uri}': {exc}",
                }

            cli_args = self._build_cli_args(model, fasta_flag, fasta_name)
            try:
                job = self._submit_job(app, cli_args, tmp_dir)
            except Exception as exc:
                return self._job_error(app_uri, exc)

            # Wait (bounded) for completion if the API is non-blocking.
            wait_err = self._wait_for_job(job, max_wait)
            if wait_err:
                return {"status": "error", "error": wait_err}

            outputs = self._collect_outputs(job)
        finally:
            # Previously the staging directory leaked on every call.
            shutil.rmtree(tmp_dir, ignore_errors=True)

        if not outputs:
            stdout = self._safe_stdout(job)
            return {
                "status": "error",
                "error": (
                    f"BioLib app '{app_uri}' produced no output files. "
                    f"Job stdout (truncated): {stdout[:500]}"
                ),
            }

        parsed = self._parse_outputs(model, outputs, records)
        return {
            "status": "success",
            "data": {
                "model": model,
                "app": app_uri,
                "num_sequences": len(records),
                "predictions": parsed,
            },
        }

    @staticmethod
    def _submit_job(app: Any, cli_args: str, workdir: str) -> Any:
        """Run ``app.cli`` with ``workdir`` as the current directory.

        The previous working directory is restored even if submission fails.
        Any exception from ``cli()`` (or from changing directory) propagates
        to the caller.

        NOTE(review): ``os.chdir`` is process-global, so concurrent calls from
        multiple threads could interfere with each other - confirm how the
        framework schedules tools.
        """
        prev_cwd = os.getcwd()
        os.chdir(workdir)
        try:
            try:
                return app.cli(args=cli_args, blocking=True)
            except TypeError:
                # Older/newer signature without 'blocking' kwarg.
                return app.cli(args=cli_args)
        finally:
            os.chdir(prev_cwd)

    @staticmethod
    def _build_cli_args(model: str, fasta_flag: str, fasta_name: str) -> str:
        """Build the CLI argument string for the chosen model."""
        # fasta_name is a bare filename; cli() runs from the temp dir (cwd).
        args = f"{fasta_flag} {fasta_name}"
        if model == "signalp":
            # SignalP-6 needs organism, a text output format, and an output dir
            # (without --output_dir the BioLib app writes no files).
            args += " --organism other --format txt --mode fast --output_dir output"
        elif model == "deeploc":
            # DeepLocPro takes an output dir flag. NOTE: the upstream
            # KU/DeepLocPro app currently crashes during ESM embedding with a
            # device-mismatch RuntimeError; this tool surfaces that as a clean
            # error until the app is fixed.
            args += " --output output"
        return args

    @staticmethod
    def _job_error(app_uri: str, exc: Exception) -> Dict[str, Any]:
        """Build the error dict for a job that failed to start."""
        return {
            "status": "error",
            "error": f"BioLib job for '{app_uri}' failed to start: {exc}",
        }

    def _wait_for_job(self, job: Any, max_wait: int) -> Optional[str]:
        """Block until the job finishes.

        Returns an error string on timeout/failure, else None. Tolerant of API
        differences across pybiolib versions: if ``job.wait`` does not accept
        a ``timeout`` keyword it is called without one, in which case the wait
        is NOT bounded by ``max_wait``.
        """
        waiter = getattr(job, "wait", None)
        if callable(waiter):
            try:
                try:
                    waiter(timeout=max_wait)
                except TypeError:
                    waiter()
            except Exception as exc:
                name = type(exc).__name__
                if "Timeout" in name or "timeout" in str(exc).lower():
                    return (
                        f"BioLib job did not finish within {max_wait}s "
                        "(jobs queue on shared compute; raise max_wait_time)."
                    )
                return f"BioLib job wait failed: {exc}"

        # Verify the job actually succeeded when an exit code is exposed.
        getter = getattr(job, "get_exit_code", None)
        if callable(getter):
            try:
                code = getter()
            except Exception:
                # Deliberate best-effort: an unreadable exit code is not
                # treated as failure; missing outputs are caught later.
                return None
            if code not in (None, 0):
                stdout = self._safe_stdout(job)
                return (
                    f"BioLib job exited with code {code}. "
                    f"stdout (truncated): {stdout[:500]}"
                )
        return None

    @staticmethod
    def _safe_stdout(job: Any) -> str:
        """Return the job's stdout as text, or ``""`` if it cannot be read."""
        getter = getattr(job, "get_stdout", None)
        if not callable(getter):
            return ""
        try:
            out = getter()
            return (
                out.decode("utf-8", "replace") if isinstance(out, bytes) else str(out)
            )
        except Exception:
            return ""

    @staticmethod
    def _collect_outputs(job: Any) -> Dict[str, str]:
        """Return ``{filename: text_content}`` for all readable output files.

        Files with a known binary extension are skipped without being
        downloaded; files that fail to download are skipped silently.
        """
        binary_suffixes = (".png", ".jpg", ".pdf", ".gz")
        outputs: Dict[str, str] = {}
        lister = getattr(job, "list_output_files", None)
        if not callable(lister):
            return outputs
        try:
            files = lister()
        except Exception:
            return outputs
        for f in files or []:
            name = getattr(f, "path", None) or str(f)
            # Skip binary (e.g. plot.png) before fetching, so we do not
            # download data we are going to discard.
            if name.lower().endswith(binary_suffixes):
                continue
            try:
                data = job.get_output_file(name).get_data()
                if isinstance(data, bytes):
                    data = data.decode("utf-8", "replace")
                outputs[name] = data
            except Exception:
                # Best-effort: one unreadable file must not lose the others.
                continue
        return outputs

    # ----------------------------------------------------------- parsers
    def _parse_outputs(
        self,
        model: str,
        outputs: Dict[str, str],
        records: List[Tuple[str, str]],
    ) -> List[Dict[str, Any]]:
        """Dispatch to the parser for ``model``; one result dict per record."""
        if model == "deeptmhmm":
            return self._parse_deeptmhmm(outputs, records)
        if model == "signalp":
            return self._parse_signalp(outputs, records)
        return self._parse_deeploc(outputs, records)

    @staticmethod
    def _find_output(outputs: Dict[str, str], *suffixes: str) -> Optional[str]:
        """Return the content of the first output matching ``suffixes``.

        Suffixes are tried IN ORDER, so earlier suffixes take priority: with
        ``("prediction_results.txt", ".txt")`` an exact results file wins over
        any other ``.txt`` output regardless of dict ordering. Matching is
        case-insensitive on the file name; suffixes are expected lower-case.
        Returns None when nothing matches.
        """
        lowered = [(name.lower(), content) for name, content in outputs.items()]
        for sfx in suffixes:
            for low, content in lowered:
                if low.endswith(sfx):
                    return content
        return None

    def _parse_deeptmhmm(
        self, outputs: Dict[str, str], records: List[Tuple[str, str]]
    ) -> List[Dict[str, Any]]:
        """Parse DeepTMHMM .3line topology + .gff3 region table."""
        results: List[Dict[str, Any]] = []
        three_line = self._find_output(outputs, ".3line") or ""
        gff3 = self._find_output(outputs, ".gff3", ".gff") or ""

        # .3line: blocks of (>name | TYPE) / sequence / topology-string.
        topo_by_name: Dict[str, Dict[str, Optional[str]]] = {}
        block: List[str] = []
        for line in three_line.splitlines():
            if line.startswith(">"):
                self._flush_3line(block, topo_by_name)
                block = [line]
            elif line.strip():
                block.append(line)
        self._flush_3line(block, topo_by_name)

        # .gff3: per-protein region rows -> [{kind, start, end}]
        regions_by_name = self._parse_gff_regions(gff3)

        for name, seq in records:
            info = topo_by_name.get(name, {})
            results.append(
                {
                    "id": name,
                    "sequence_length": len(seq),
                    "classification": info.get("type"),
                    "topology": info.get("topology"),
                    "regions": regions_by_name.get(name, []),
                }
            )
        return results

    @staticmethod
    def _flush_3line(
        block: List[str], out: Dict[str, Dict[str, Optional[str]]]
    ) -> None:
        """Store one complete .3line block (header/sequence/topology) in ``out``.

        Incomplete blocks (fewer than three lines, or no ``>`` header) are
        ignored. ``type`` is None when the header has no ``| TYPE`` part.
        """
        if len(block) < 3 or not block[0].startswith(">"):
            return
        header = block[0][1:].strip()
        # ">name | TYPE"
        parts = [p.strip() for p in header.split("|")]
        name = parts[0].split()[0] if parts[0] else "query"
        ptype = parts[1] if len(parts) > 1 else None
        out[name] = {"type": ptype, "topology": block[2].strip()}

    @staticmethod
    def _parse_gff_regions(gff: str) -> Dict[str, List[Dict[str, Any]]]:
        """Parse a region table into ``{name: [{kind, start, end}]}``.

        Handles two layouts seen across DTU tools:
          * Standard GFF3 (SignalP): seqid, source, type, start, end, ...
          * DeepTMHMM TMRs.gff3:     seqid, kind, start, end
        Layout is detected per-row by where the two integer columns sit.
        Blank lines, ``#`` comments and rows with fewer than four
        tab-separated columns are skipped.
        """
        regions: Dict[str, List[Dict[str, Any]]] = {}
        for line in gff.splitlines():
            line = line.rstrip()
            if not line or line.startswith("#"):
                continue
            cols = [c.strip() for c in line.split("\t")]
            if len(cols) < 4:
                continue
            name = cols[0]
            parsed = None
            # Standard GFF3: kind=col2, start=col3, end=col4.
            if len(cols) >= 5 and cols[3].isdigit() and cols[4].isdigit():
                parsed = (cols[2], int(cols[3]), int(cols[4]))
            # DeepTMHMM custom: kind=col1, start=col2, end=col3.
            elif cols[2].isdigit() and cols[3].isdigit():
                parsed = (cols[1], int(cols[2]), int(cols[3]))
            if parsed:
                kind, start, end = parsed
                regions.setdefault(name, []).append(
                    {"kind": kind, "start": start, "end": end}
                )
        return regions

    def _parse_signalp(
        self, outputs: Dict[str, str], records: List[Tuple[str, str]]
    ) -> List[Dict[str, Any]]:
        """Parse SignalP-6 prediction_results.txt / .gff3."""
        results: List[Dict[str, Any]] = []
        txt = self._find_output(outputs, "prediction_results.txt", ".txt") or ""
        gff = self._find_output(outputs, ".gff3", ".gff") or ""

        # prediction_results.txt rows:
        #   ID  Prediction  OTHER  SP(..) ...  CS Position
        # The last column holds e.g. "CS pos: 22-23. Pr: 0.8406" for SP hits.
        calls: Dict[str, Dict[str, Any]] = {}
        for line in txt.splitlines():
            if not line.strip() or line.startswith("#"):
                continue
            cols = line.split("\t")
            if len(cols) < 2:
                continue
            m = re.search(r"CS pos:\s*(\d+)", cols[-1])
            calls[cols[0].strip()] = {
                "prediction": cols[1].strip(),
                "cleavage_site": int(m.group(1)) if m else None,
            }

        # Fallback: the signal_peptide region end from the GFF3 output.
        cleavage = self._parse_gff_regions(gff)

        for name, seq in records:
            call = calls.get(name, {})
            prediction = call.get("prediction")
            cs = call.get("cleavage_site")
            if cs is None:
                for r in cleavage.get(name, []):
                    if "signal" in r.get("kind", "").lower():
                        cs = r.get("end")
            results.append(
                {
                    "id": name,
                    "sequence_length": len(seq),
                    "prediction": prediction,
                    # None = no call found for this record at all.
                    "has_signal_peptide": (
                        prediction.upper() != "OTHER" if prediction else None
                    ),
                    "cleavage_site": cs,
                }
            )
        return results

    def _parse_deeploc(
        self, outputs: Dict[str, str], records: List[Tuple[str, str]]
    ) -> List[Dict[str, Any]]:
        """Parse DeepLoc(Pro) CSV/TSV results (ID, Localization, ...).

        The first non-blank line is the header; the delimiter is a tab if the
        header contains one, else a comma. A missing or empty table yields a
        result per record with ``localization`` and ``details`` set to None.
        """
        results: List[Dict[str, Any]] = []
        table = self._find_output(outputs, ".csv", ".tsv", ".txt") or ""

        rows: Dict[str, Dict[str, Any]] = {}
        # Filter blanks first: indexing splitlines()[0] on an empty table used
        # to raise IndexError, and a leading blank line hid the header.
        lines = [ln for ln in table.splitlines() if ln.strip()]
        if lines:
            delim = "\t" if "\t" in lines[0] else ","
            header = [c.strip().lower() for c in lines[0].split(delim)]
            for line in lines[1:]:
                cols = [c.strip() for c in line.split(delim)]
                rec = dict(zip(header, cols))
                rid = (
                    rec.get("protein_id")
                    or rec.get("id")
                    or rec.get("name")
                    or (cols[0] if cols else None)
                )
                if rid:
                    rows[rid] = rec

        for name, seq in records:
            rec = rows.get(name, {})
            loc = (
                rec.get("localization")
                or rec.get("prediction")
                or rec.get("location")
            )
            results.append(
                {
                    "id": name,
                    "sequence_length": len(seq),
                    "localization": loc,
                    "details": rec or None,
                }
            )
        return results