# Source code for tooluniverse.pathoplexus_tool

"""
Pathoplexus (LAPIS) tools for ToolUniverse — open pathogen genomic surveillance.

Pathoplexus is an open, community pathogen-sequence database (launched 2024) served
by LAPIS (Lineage API for Sequences). These tools query aggregated sequence counts
and characteristic mutations per organism — genomic-epidemiology surveillance for
emerging pathogens. Distinct from Nextstrain (phylogenetic builds) and BV-BRC.

API: https://lapis.pathoplexus.org/{organism}/sample/...  (public, no authentication)
Known organisms: west-nile, ebola-zaire, ebola-sudan, cchf, mpox (more may be added).
"""

from typing import Any, Dict

import requests

from .base_tool import BaseTool
from .tool_registry import register_tool

# Root URL of the public Pathoplexus LAPIS API. Request URLs are built as
# {LAPIS_BASE}/{organism}/sample/{endpoint}.
LAPIS_BASE = "https://lapis.pathoplexus.org"


def _organism(arguments: Dict[str, Any]) -> str:
    return (arguments.get("organism") or "").strip().lower()


class _PathoplexusBase(BaseTool):
    """Shared HTTP plumbing for the Pathoplexus (LAPIS) tools.

    Attributes:
        timeout: Per-request timeout passed to ``requests.get``; read from
            ``tool_config["fields"]["timeout"]`` and defaulting to 30.
    """

    def __init__(self, tool_config: Dict[str, Any]):
        super().__init__(tool_config)
        # ``fields`` may be absent or explicitly null in the tool config.
        fields = tool_config.get("fields") or {}
        self.timeout = fields.get("timeout", 30)

    def _request(
        self, organism: str, endpoint: str, params: Dict[str, Any], accept: str
    ) -> "requests.Response":
        """Send a GET to ``{LAPIS_BASE}/{organism}/sample/{endpoint}``.

        Args:
            organism: Organism slug used as the first path segment.
            endpoint: Endpoint name placed after ``/sample/``.
            params: Query parameters.
            accept: Value of the ``Accept`` request header.

        Returns:
            The response object, after ``raise_for_status()`` has passed.

        Raises:
            requests.exceptions.RequestException: On connection errors,
                timeouts, or a non-success HTTP status.
        """
        from urllib.parse import quote

        # The slug comes from caller-supplied arguments; percent-encode it so
        # characters such as "/", "?" or "#" cannot alter the request path.
        # Plain slugs like "west-nile" are left unchanged by quote().
        slug = quote(organism, safe="")
        resp = requests.get(
            f"{LAPIS_BASE}/{slug}/sample/{endpoint}",
            params=params,
            headers={"Accept": accept},
            timeout=self.timeout,
        )
        resp.raise_for_status()
        return resp

    def _get(
        self, organism: str, endpoint: str, params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Fetch ``endpoint`` for ``organism`` and return the decoded JSON body.

        Raises:
            requests.exceptions.RequestException: See ``_request``.
            ValueError: If the response body is not valid JSON.
        """
        return self._request(organism, endpoint, params, "application/json").json()

    def _get_text(
        self, organism: str, endpoint: str, params: Dict[str, Any], accept: str
    ) -> str:
        """Fetch ``endpoint`` for ``organism`` and return the raw response text.

        Raises:
            requests.exceptions.RequestException: See ``_request``.
        """
        return self._request(organism, endpoint, params, accept).text

    @staticmethod
    def _common_filters(arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Build the query filters shared by all Pathoplexus tools.

        Maps the ``country`` argument to the ``geoLocCountry`` query parameter
        and ``lineage`` to ``lineage``. Missing, falsy, or whitespace-only
        values are omitted; other values are converted to ``str`` and stripped.

        Args:
            arguments: Tool-call arguments.

        Returns:
            A new dict containing only the filters that were supplied.
        """
        params: Dict[str, Any] = {}
        for arg_name, query_name in (
            ("country", "geoLocCountry"),
            ("lineage", "lineage"),
        ):
            # str() keeps a truthy non-string value (e.g. an int) from
            # crashing on .strip(); falsy values are still skipped.
            value = str(arguments.get(arg_name) or "").strip()
            if value:
                params[query_name] = value
        return params


@register_tool("PathoplexusCountTool")
class PathoplexusCountTool(_PathoplexusBase):
    """Pathoplexus organism queries.

    The ``fields.mode`` config selects the LAPIS endpoint:

    - ``"aggregated"`` (default): aggregated sequence counts (original behavior)
    - ``"details"``: per-sequence metadata table (accession, country, date, ...)
    - ``"fasta"``: download unaligned/aligned nucleotide or amino-acid FASTA

    All modes are served by this one registered class so no new registration is
    needed; the original aggregated behavior is unchanged when ``mode`` is absent.
    """

    # String spellings of the ``aligned`` flag that are treated as false.
    _FALSE_STRINGS = ("", "0", "false", "no", "n", "off", "none", "null")

    def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch to the handler selected by ``fields.mode`` in the tool config.

        Args:
            arguments: Tool-call arguments, forwarded unchanged to the handler.

        Returns:
            A dict with ``"status"`` set to ``"success"`` or ``"error"``.
        """
        mode = (self.tool_config.get("fields", {}) or {}).get("mode", "aggregated")
        if mode == "details":
            return self._run_details(arguments)
        if mode == "fasta":
            return self._run_fasta(arguments)
        # Any other value, including an absent mode, uses the aggregated query.
        return self._run_aggregated(arguments)

    @staticmethod
    def _missing_organism() -> Dict[str, Any]:
        """Return the error result used when no organism slug was supplied."""
        return {
            "status": "error",
            "error": "'organism' is required (e.g. 'west-nile', 'mpox', 'ebola-zaire', 'cchf')",
        }

    @staticmethod
    def _bounded_int(value: Any, default: int, low: int, high: int) -> int:
        """Parse ``value`` as an int clamped to ``[low, high]``.

        Falsy or unparseable values (including infinities) yield ``default``.
        """
        try:
            number = int(value or default)
        except (TypeError, ValueError, OverflowError):
            return default
        return max(low, min(number, high))

    @staticmethod
    def _field_names(value: Any) -> list:
        """Split a comma-separated string, or a list/tuple, into clean names.

        Empty entries are dropped; any other input type yields ``[]``.
        """
        if isinstance(value, str):
            parts = value.split(",")
        elif isinstance(value, (list, tuple)):
            parts = [str(part) for part in value]
        else:
            return []
        return [part.strip() for part in parts if part.strip()]

    def _error_response(self, exc: Exception, hint: str) -> Dict[str, Any]:
        """Translate a request/decoding exception into an error result.

        Args:
            exc: The exception raised by ``_get`` or ``_get_text``.
            hint: What the caller should double-check on an HTTP error, e.g.
                ``"filter fields"``.

        Returns:
            A dict with ``"status": "error"`` and a human-readable message.
        """
        if isinstance(exc, requests.exceptions.Timeout):
            return {
                "status": "error",
                "error": f"Pathoplexus request timed out after {self.timeout}s",
            }
        if isinstance(exc, requests.exceptions.HTTPError):
            return {
                "status": "error",
                "error": f"Pathoplexus HTTP {exc.response.status_code} — check the organism slug and {hint}",
            }
        # Newer requests releases raise requests.exceptions.JSONDecodeError,
        # which is itself a RequestException, so it must be recognized before
        # the generic branch; older releases raise a plain ValueError.
        json_error = getattr(requests.exceptions, "JSONDecodeError", None)
        is_json_error = json_error is not None and isinstance(exc, json_error)
        if is_json_error or not isinstance(exc, requests.exceptions.RequestException):
            return {
                "status": "error",
                "error": "Pathoplexus returned a non-JSON response",
            }
        return {"status": "error", "error": f"Pathoplexus request failed: {exc}"}

    @staticmethod
    def _rows(payload: Any) -> list:
        """Return the ``"data"`` list of a JSON payload, or ``[]`` if absent."""
        rows = payload.get("data") if isinstance(payload, dict) else None
        return rows if isinstance(rows, list) else []

    def _run_aggregated(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Query the ``aggregated`` endpoint for sequence counts.

        Recognized arguments: ``organism`` (required), ``country``,
        ``lineage``, and ``group_by`` (a comma-separated string or a list of
        field names, sent as the ``fields`` query parameter).

        Returns:
            On success, the rows under ``"data"`` plus a ``"metadata"`` dict;
            otherwise an error result.
        """
        organism = _organism(arguments)
        if not organism:
            return self._missing_organism()

        filters = self._common_filters(arguments)
        params = dict(filters)

        raw_group = arguments.get("group_by")
        if isinstance(raw_group, str):
            group_by = raw_group.strip()
        else:
            # A list/tuple of names is joined; anything else counts as unset.
            group_by = ",".join(self._field_names(raw_group))
        if group_by:
            params["fields"] = group_by

        # NOTE: the LAPIS /aggregated endpoint rejects limit/offset (its output is
        # unordered), so we do not paginate it; grouped output is bounded by the
        # number of distinct field values.
        try:
            payload = self._get(organism, "aggregated", params)
        except (requests.exceptions.RequestException, ValueError) as exc:
            return self._error_response(exc, "filter fields")

        rows = self._rows(payload)
        return {
            "status": "success",
            "data": rows,
            "metadata": {
                "organism": organism,
                "grouped_by": group_by or None,
                "filters": filters or None,
                "returned": len(rows),
                "source": "Pathoplexus (LAPIS)",
            },
        }

    def _run_details(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Query the ``details`` endpoint for per-sequence metadata rows.

        Recognized arguments: ``organism`` (required), ``country``,
        ``lineage``, ``fields`` (comma-separated string or list of names),
        ``limit`` (default 50, clamped to 1..1000) and ``offset`` (sent only
        when positive).

        Returns:
            On success, the rows under ``"data"`` plus a ``"metadata"`` dict;
            otherwise an error result.
        """
        organism = _organism(arguments)
        if not organism:
            return self._missing_organism()

        filters = self._common_filters(arguments)
        params = dict(filters)

        names = self._field_names(arguments.get("fields"))
        if names:
            params["fields"] = names

        params["limit"] = self._bounded_int(arguments.get("limit"), 50, 1, 1000)

        try:
            offset = int(arguments.get("offset") or 0)
        except (TypeError, ValueError, OverflowError):
            offset = 0
        if offset > 0:
            params["offset"] = offset

        try:
            payload = self._get(organism, "details", params)
        except (requests.exceptions.RequestException, ValueError) as exc:
            return self._error_response(exc, "field names")

        rows = self._rows(payload)
        return {
            "status": "success",
            "data": rows,
            "metadata": {
                "organism": organism,
                "filters": filters or None,
                "returned": len(rows),
                "limit": params["limit"],
                "source": "Pathoplexus (LAPIS)",
            },
        }

    def _run_fasta(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Download sequences as FASTA text.

        Recognized arguments: ``organism`` (required), ``country``,
        ``lineage``, ``limit`` (default 1, clamped to 1..100),
        ``sequence_type`` (values starting with ``"amino"``, or ``"aa"`` /
        ``"protein"``, select the amino-acid endpoints; anything else selects
        nucleotide) and ``aligned`` (boolean, or a string such as
        ``"true"``/``"false"``).

        Returns:
            On success, ``{"fasta": <text>, "num_records": <int>}`` under
            ``"data"`` plus a ``"metadata"`` dict; otherwise an error result.
        """
        organism = _organism(arguments)
        if not organism:
            return self._missing_organism()

        filters = self._common_filters(arguments)
        params = dict(filters)
        params["dataFormat"] = "FASTA"
        params["limit"] = self._bounded_int(arguments.get("limit"), 1, 1, 100)

        seq_type = str(arguments.get("sequence_type") or "nucleotide").strip().lower()

        raw_aligned = arguments.get("aligned")
        if isinstance(raw_aligned, str):
            # bool("false") is True, so string flags are compared explicitly.
            aligned = raw_aligned.strip().lower() not in self._FALSE_STRINGS
        else:
            aligned = bool(raw_aligned)

        if seq_type.startswith("amino") or seq_type in ("aa", "protein"):
            endpoint = (
                "alignedAminoAcidSequences"
                if aligned
                else "unalignedAminoAcidSequences"
            )
        else:
            endpoint = (
                "alignedNucleotideSequences"
                if aligned
                else "unalignedNucleotideSequences"
            )

        try:
            text = self._get_text(organism, endpoint, params, "text/x-fasta")
        except requests.exceptions.RequestException as exc:
            return self._error_response(exc, "filter fields")

        if not text or not text.lstrip().startswith(">"):
            return {
                "status": "error",
                "error": "Pathoplexus returned no FASTA records for this query",
            }

        # Count header lines rather than every ">" character, so a ">" inside
        # a record description does not inflate the record count.
        n_records = sum(
            1 for line in text.splitlines() if line.lstrip().startswith(">")
        )
        return {
            "status": "success",
            "data": {"fasta": text, "num_records": n_records},
            "metadata": {
                "organism": organism,
                "endpoint": endpoint,
                "filters": filters or None,
                "returned": n_records,
                "source": "Pathoplexus (LAPIS)",
            },
        }
@register_tool("PathoplexusMutationsTool")
class PathoplexusMutationsTool(_PathoplexusBase):
    """Characteristic amino-acid or nucleotide mutations for a Pathoplexus organism."""

    # Proportion threshold used when ``min_proportion`` is missing or invalid.
    _DEFAULT_MIN_PROPORTION = 0.8

    @classmethod
    def _min_proportion(cls, value: Any) -> float:
        """Parse ``min_proportion`` into a float within ``[0.0, 1.0]``.

        ``None``, blank strings, unparseable values and NaN fall back to the
        default. An explicit ``0`` is kept as ``0.0`` rather than being
        treated as "not provided". Out-of-range numbers are clamped.
        """
        if value is None or (isinstance(value, str) and not value.strip()):
            return cls._DEFAULT_MIN_PROPORTION
        try:
            proportion = float(value)
        except (TypeError, ValueError):
            return cls._DEFAULT_MIN_PROPORTION
        if proportion != proportion:  # NaN is the only value unequal to itself
            return cls._DEFAULT_MIN_PROPORTION
        return min(1.0, max(0.0, proportion))

    def _error_response(self, exc: Exception) -> Dict[str, Any]:
        """Translate a request/decoding exception into an error result."""
        if isinstance(exc, requests.exceptions.Timeout):
            return {
                "status": "error",
                "error": f"Pathoplexus request timed out after {self.timeout}s",
            }
        if isinstance(exc, requests.exceptions.HTTPError):
            return {
                "status": "error",
                "error": f"Pathoplexus HTTP {exc.response.status_code} — check the organism slug and filter fields",
            }
        # Newer requests releases raise requests.exceptions.JSONDecodeError,
        # which is itself a RequestException, so it must be recognized before
        # the generic branch; older releases raise a plain ValueError.
        json_error = getattr(requests.exceptions, "JSONDecodeError", None)
        is_json_error = json_error is not None and isinstance(exc, json_error)
        if is_json_error or not isinstance(exc, requests.exceptions.RequestException):
            return {
                "status": "error",
                "error": "Pathoplexus returned a non-JSON response",
            }
        return {"status": "error", "error": f"Pathoplexus request failed: {exc}"}

    def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Query the mutations endpoint for an organism.

        Recognized arguments: ``organism`` (required), ``country``,
        ``lineage``, ``min_proportion`` (default 0.8, clamped to 0..1),
        ``limit`` (default 50, clamped to 1..500) and ``mutation_type``
        (values starting with ``"nucl"`` select ``nucleotideMutations``;
        anything else selects ``aminoAcidMutations``).

        Returns:
            On success, a list of mutation dicts under ``"data"`` plus a
            ``"metadata"`` dict; otherwise a dict with ``"status": "error"``.
        """
        organism = _organism(arguments)
        if not organism:
            return {
                "status": "error",
                "error": "'organism' is required (e.g. 'west-nile', 'mpox', 'ebola-zaire', 'cchf')",
            }

        filters = self._common_filters(arguments)
        params = dict(filters)
        params["minProportion"] = self._min_proportion(arguments.get("min_proportion"))

        try:
            limit = int(arguments.get("limit") or 50)
        except (TypeError, ValueError, OverflowError):
            limit = 50
        params["limit"] = max(1, min(limit, 500))

        mtype = str(arguments.get("mutation_type") or "aminoAcid").strip()
        endpoint = (
            "nucleotideMutations"
            if mtype.lower().startswith("nucl")
            else "aminoAcidMutations"
        )

        try:
            payload = self._get(organism, endpoint, params)
        except (requests.exceptions.RequestException, ValueError) as exc:
            return self._error_response(exc)

        rows = payload.get("data") if isinstance(payload, dict) else None
        if not isinstance(rows, list):
            rows = []

        # Re-key each API row; non-dict entries are skipped.
        muts = [
            {
                "mutation": row.get("mutation"),
                "gene": row.get("sequenceName"),
                "position": row.get("position"),
                "from": row.get("mutationFrom"),
                "to": row.get("mutationTo"),
                "proportion": row.get("proportion"),
                "count": row.get("count"),
                "coverage": row.get("coverage"),
            }
            for row in rows
            if isinstance(row, dict)
        ]
        return {
            "status": "success",
            "data": muts,
            "metadata": {
                "organism": organism,
                "mutation_type": endpoint,
                "min_proportion": params["minProportion"],
                "filters": filters or None,
                "returned": len(muts),
                "source": "Pathoplexus (LAPIS)",
            },
        }