# Source code for tooluniverse.encori_tool

"""ENCORI / starBase RNA-interactome tools for ToolUniverse.

ENCORI (the Encyclopedia of RNA Interactomes, formerly starBase) aggregates
CLIP-seq-supported and computationally predicted RNA interactions and exposes
them through a public REST API (no authentication). The single registered
class ``ENCORITool`` serves several distinct ENCORI REST modules:

* ``miRNATarget/``   -> miRNA-target interactions (the original tool)
* ``RBPTarget/``     -> RBP-RNA binding sites from CLIP-seq
* ``ceRNA/``         -> competing-endogenous-RNA (miRNA-sponge) networks
* ``RNARNA/``        -> RNA-RNA duplex interactions (PARIS/LIGR/SPLASH)
* ``degradomeRNA/``  -> miRNA cleavage sites validated by degradome-seq
* ``RBPDisease/``    -> RBP binding correlated with somatic (COSMIC) mutations
* ``RBPMotifScan/``  -> RBP binding-motif enrichment scan

Which module a tool calls is selected purely from its JSON config via
``fields.encori_endpoint``; tools that omit it fall back to the historical
miRNA-target behaviour. This lets all ENCORI tools reuse the one registered
class with no extra registration.

API: https://rnasysu.com/encori/api/  (public, no authentication)
"""

from typing import Any, Dict, List

import requests

from .base_tool import BaseTool
from .tool_registry import register_tool

# Root URL of the ENCORI REST API. A module path such as "RBPTarget/" is
# appended to it to form the request URL.
ENCORI_BASE = "https://rnasysu.com/encori/api/"
# Full URL of the miRNA-target module. NOTE(review): not referenced by the code
# visible in this module; presumably kept for external importers - confirm
# before removing.
ENCORI_URL = ENCORI_BASE + "miRNATarget/"
# Columns flagged 1 when that prediction program supports the interaction.
_PROGRAMS = ["PITA", "RNA22", "miRmap", "microT", "miRanda", "PicTar", "TargetScan"]


def _to_int(value: Any, default: int) -> int:
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


def _to_num(value: Any) -> Any:
    """Best-effort numeric coercion: int -> float -> original string."""
    if value is None or value == "" or value == "NA":
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        pass
    try:
        return float(value)
    except (TypeError, ValueError):
        return value


@register_tool("ENCORITool")
class ENCORITool(BaseTool):
    """ENCORI / starBase RNA-interactome lookups (config-selected module).

    The ENCORI module queried by an instance is taken from
    ``tool_config["fields"]["encori_endpoint"]``. When that value is absent
    or empty the tool performs miRNA-target lookups.

    Every handler returns ``{"status": "success", "data": [...],
    "metadata": {...}}`` or ``{"status": "error", "error": "<message>"}``.
    """

    # ENCORI module path -> name of the method that serves it.
    _ENDPOINT_HANDLERS: Dict[str, str] = {
        "miRNATarget/": "_run_mirna_target",
        "RBPTarget/": "_run_rbp_target",
        "ceRNA/": "_run_cerna",
        "RNARNA/": "_run_rna_rna",
        "degradomeRNA/": "_run_degradome",
        "RBPDisease/": "_run_rbp_disease",
        "RBPMotifScan/": "_run_motif_scan",
    }

    def __init__(self, tool_config: Dict[str, Any]):
        """Store the request timeout and the configured ENCORI module.

        Args:
            tool_config: Tool configuration. ``timeout`` (seconds, default 30)
                and ``fields.encori_endpoint`` are read from it.
        """
        super().__init__(tool_config)
        self.timeout = tool_config.get("timeout", 30)
        fields = tool_config.get("fields") or {}
        # Which ENCORI REST module this tool instance targets. When unset, the
        # tool keeps its original miRNA-target behaviour.
        self.encori_endpoint = fields.get("encori_endpoint")

    # ------------------------------------------------------------------
    # Argument helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _arg_text(arguments: Dict[str, Any], *keys: str) -> str:
        """Return the first non-empty argument among *keys* as stripped text.

        Values are passed through ``str()`` so that a non-string argument
        cannot raise ``AttributeError`` on ``.strip()``. Returns ``""`` when
        no key holds a usable value.
        """
        for key in keys:
            value = arguments.get(key)
            if not value:
                continue
            text = str(value).strip()
            if text:
                return text
        return ""

    @staticmethod
    def _arg_limit(arguments: Dict[str, Any], default: int) -> int:
        """Return the ``limit`` argument clamped to the range 1..500."""
        return max(1, min(_to_int(arguments.get("limit", default), default), 500))

    @staticmethod
    def _arg_or_default(value: Any, default: Any) -> Any:
        """Return *default* when *value* is ``None`` or ``""``, else *value*.

        Unlike ``value or default`` this keeps a legitimate ``0``.
        """
        if value is None or value == "":
            return default
        return value

    # ------------------------------------------------------------------
    # Shared HTTP + TSV parsing
    # ------------------------------------------------------------------
    def _fetch_tsv(self, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """GET an ENCORI module and return parsed text or an error dict.

        ENCORI returns tab-separated text: comment lines start with '#', then
        a header line, then data rows. On a bad parameter the body is a single
        plain-language line (e.g. 'The "RNA" parameter haven't been set
        correctly!') instead of tabular data, so detect that explicitly.

        Args:
            endpoint: Module path appended to ``ENCORI_BASE``, e.g. ``"ceRNA/"``.
            params: Query-string parameters for the request.

        Returns:
            ``{"header": [column names], "lines": [header line, data lines...]}``
            on success (blank and '#' lines removed), or ``{"error": message}``
            on a transport failure, a non-200 status, an empty body, or a
            single non-tabular line.
        """
        url = ENCORI_BASE + endpoint
        try:
            resp = requests.get(url, params=params, timeout=self.timeout)
        except requests.exceptions.Timeout:
            return {"error": f"ENCORI API timed out after {self.timeout}s"}
        except requests.exceptions.RequestException as e:
            return {"error": f"ENCORI API request failed: {e}"}
        if resp.status_code != 200:
            return {"error": f"ENCORI API returned HTTP {resp.status_code}"}

        lines = [ln for ln in resp.text.splitlines() if ln and not ln.startswith("#")]
        if not lines:
            return {"error": "ENCORI API returned an empty response."}

        header = lines[0].split("\t")
        # A single non-tabular line is ENCORI's way of reporting a bad request.
        if len(header) < 2:
            return {
                "error": "ENCORI rejected the query: " + lines[0].strip(),
            }
        return {"header": header, "lines": lines}

    @staticmethod
    def _row_dicts(parsed: Dict[str, Any]) -> List[Dict[str, str]]:
        """Turn the data lines of a ``_fetch_tsv`` result into column dicts.

        Lines with fewer fields than the header are skipped; surplus trailing
        fields are ignored.
        """
        header = parsed["header"]
        out: List[Dict[str, str]] = []
        for ln in parsed["lines"][1:]:
            f = ln.split("\t")
            if len(f) < len(header):
                continue
            out.append(dict(zip(header, f)))
        return out

    # ------------------------------------------------------------------
    # Dispatch
    # ------------------------------------------------------------------
    def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Route the call to the handler for the configured ENCORI module.

        An unset or empty ``encori_endpoint`` selects the miRNA-target
        handler. A configured endpoint is matched with or without its trailing
        slash. An unrecognised endpoint yields an error result rather than
        being answered by the miRNA-target module.

        Args:
            arguments: Tool arguments; ``None`` is treated as an empty dict.

        Returns:
            The handler's result dict, or an error dict for an unknown endpoint.
        """
        arguments = arguments or {}
        if not self.encori_endpoint:
            return self._run_mirna_target(arguments)

        key = str(self.encori_endpoint).strip().strip("/") + "/"
        handler_name = self._ENDPOINT_HANDLERS.get(key)
        if handler_name is None:
            return {
                "status": "error",
                "error": f"Unsupported ENCORI endpoint {self.encori_endpoint!r}; "
                "expected one of: " + ", ".join(sorted(self._ENDPOINT_HANDLERS)),
            }
        return getattr(self, handler_name)(arguments)

    # ------------------------------------------------------------------
    # 1. miRNA -> target / gene -> miRNA (original behaviour)
    # ------------------------------------------------------------------
    def _run_mirna_target(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Query ``miRNATarget/`` by ``mirna`` and/or ``gene``.

        Optional arguments: ``clip_min``, ``program_min`` (both default 1),
        ``limit`` (default 50, max 500), ``assembly`` (default ``hg38``).
        Rows are ordered by CLIP experiment count, then by the number of
        supporting prediction programs, both descending.
        """
        mirna = self._arg_text(arguments, "mirna")
        gene = self._arg_text(arguments, "gene", "gene_symbol")
        if not mirna and not gene:
            return {
                "status": "error",
                "error": "Provide 'mirna' (e.g. 'hsa-miR-21-5p') to get its targets, "
                "or 'gene' (e.g. 'TP53') to get the miRNAs that target it.",
            }

        clip_min = _to_int(arguments.get("clip_min", 1), 1)
        program_min = _to_int(arguments.get("program_min", 1), 1)
        limit = self._arg_limit(arguments, 50)

        params = {
            # `or` (not a .get default) so an explicit None/"" still yields a
            # value; requests omits None-valued params from the query string.
            "assembly": arguments.get("assembly") or "hg38",
            "geneType": "mRNA",
            "miRNA": mirna or "all",
            "target": gene or "all",
            "clipExpNum": clip_min,
            "degraExpNum": 0,
            "pancancerNum": 0,
            "programNum": program_min,
            "program": "None",
            "cellType": "all",
        }
        parsed = self._fetch_tsv("miRNATarget/", params)
        if "error" in parsed:
            return {"status": "error", "error": parsed["error"]}

        rows = []
        for d in self._row_dicts(parsed):
            programs = [p for p in _PROGRAMS if d.get(p) == "1"]
            rows.append(
                {
                    "mirna": d.get("miRNAname"),
                    "gene": d.get("geneName"),
                    "gene_id": d.get("geneID"),
                    "clip_experiments": _to_int(d.get("clipExpNum"), 0),
                    "predicted_by": programs,
                    "n_programs": len(programs),
                    "pan_cancer_num": _to_int(d["pancancerNum"], 0)
                    if "pancancerNum" in d
                    else None,
                }
            )
        rows.sort(key=lambda r: (r["clip_experiments"], r["n_programs"]), reverse=True)

        return {
            "status": "success",
            "data": rows[:limit],
            "metadata": {
                "source": "ENCORI (starBase)",
                "query": mirna or gene,
                "direction": "miRNA->targets" if mirna else "gene->miRNAs",
                "total": len(rows),
                "returned": min(len(rows), limit),
                "interpretation": (
                    "clip_experiments = number of CLIP-seq experiments supporting the "
                    "site (experimental evidence; higher = stronger). predicted_by lists "
                    "the algorithms predicting it. CLIP-supported targets outrank "
                    "prediction-only ones."
                ),
            },
        }

    # ------------------------------------------------------------------
    # 2. RBP -> RNA targets / gene -> RBPs (RBPTarget/)
    # ------------------------------------------------------------------
    def _run_rbp_target(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Query ``RBPTarget/`` by ``rbp`` and/or ``gene``.

        Optional arguments: ``clip_min`` (default 1), ``pancancer_min``
        (default 0), ``gene_type`` (default ``mRNA``), ``cell_type`` (default
        ``all``), ``assembly`` (default ``hg38``), ``limit`` (default 100,
        max 500). Rows are ordered by total CLIP experiments, then total CLIP
        sites, both descending.
        """
        rbp = self._arg_text(arguments, "rbp", "RBP")
        gene = self._arg_text(arguments, "gene", "gene_symbol")
        if not rbp and not gene:
            return {
                "status": "error",
                "error": "Provide 'rbp' (e.g. 'PTBP1') to get the RNAs it binds, "
                "or 'gene' (e.g. 'TP53') to get the RBPs that bind it.",
            }

        clip_min = _to_int(arguments.get("clip_min", 1), 1)
        limit = self._arg_limit(arguments, 100)

        params = {
            "assembly": arguments.get("assembly") or "hg38",
            "geneType": arguments.get("gene_type") or "mRNA",
            "RBP": rbp or "all",
            "target": gene or "all",
            "clipExpNum": clip_min,
            "pancancerNum": _to_int(arguments.get("pancancer_min", 0), 0),
            "cellType": arguments.get("cell_type") or "all",
        }
        parsed = self._fetch_tsv("RBPTarget/", params)
        if "error" in parsed:
            return {"status": "error", "error": parsed["error"]}

        rows = []
        for d in self._row_dicts(parsed):
            rows.append(
                {
                    "rbp": d.get("RBP"),
                    "gene": d.get("geneName"),
                    "gene_id": d.get("geneID"),
                    "gene_type": d.get("geneType"),
                    "cluster_num": _to_int(d.get("clusterNum"), 0),
                    "total_clip_experiments": _to_int(d.get("totalClipExpNum"), 0),
                    "total_clip_sites": _to_int(d.get("totalClipSiteNum"), 0),
                    "clip_experiments": _to_int(d.get("clipExpNum"), 0),
                    "chromosome": d.get("chromosome"),
                    "strand": d.get("strand"),
                    "narrow_start": _to_num(d.get("narrowStart")),
                    "narrow_end": _to_num(d.get("narrowEnd")),
                    "pancancer_num": _to_int(d.get("pancancerNum"), 0),
                    "cell_tissue": d.get("cellline/tissue"),
                }
            )
        rows.sort(
            key=lambda r: (r["total_clip_experiments"], r["total_clip_sites"]),
            reverse=True,
        )

        return {
            "status": "success",
            "data": rows[:limit],
            "metadata": {
                "source": "ENCORI (starBase) RBPTarget",
                "query": rbp or gene,
                "direction": "RBP->targets" if rbp else "gene->RBPs",
                "total": len(rows),
                "returned": min(len(rows), limit),
                "interpretation": (
                    "Each row is a CLIP-seq-supported binding cluster. "
                    "total_clip_experiments / total_clip_sites quantify how many "
                    "experiments and sites support the RBP-RNA binding."
                ),
            },
        }

    # ------------------------------------------------------------------
    # 3. ceRNA / miRNA-sponge network (ceRNA/)
    # ------------------------------------------------------------------
    def _run_cerna(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Query ``ceRNA/`` for the partners of ``gene``.

        Optional arguments: ``shared_mirna_min`` (default 5), ``pval`` and
        ``fdr`` (default 0.01 each), ``gene_type`` (default ``mRNA``),
        ``assembly`` (default ``hg38``), ``limit`` (default 100, max 500).
        Rows are ordered by shared miRNA family count, descending.
        """
        gene = self._arg_text(arguments, "gene", "ceRNA", "gene_symbol")
        if not gene:
            return {
                "status": "error",
                "error": "Provide 'gene' (e.g. 'PTEN') to get its ceRNA / miRNA-sponge partners.",
            }

        limit = self._arg_limit(arguments, 100)

        params = {
            "assembly": arguments.get("assembly") or "hg38",
            "geneType": arguments.get("gene_type") or "mRNA",
            "ceRNA": gene,
            "miRNAnum": _to_int(arguments.get("shared_mirna_min", 5), 5),
            # An explicit None would be dropped from the query string by
            # requests, so substitute the default; a literal 0 is preserved.
            "pval": self._arg_or_default(arguments.get("pval"), 0.01),
            "fdr": self._arg_or_default(arguments.get("fdr"), 0.01),
        }
        parsed = self._fetch_tsv("ceRNA/", params)
        if "error" in parsed:
            return {"status": "error", "error": parsed["error"]}

        rows = []
        for d in self._row_dicts(parsed):
            rows.append(
                {
                    "gene": d.get("geneName"),
                    "gene_id": d.get("geneID"),
                    "partner": d.get("ceRNAname"),
                    "partner_id": d.get("ceRNAid"),
                    "partner_gene_type": d.get("ceRNAgeneType"),
                    "shared_mirna_families": _to_int(d.get("hitMiRNAFamilyNum"), 0),
                    "pval": _to_num(d.get("pval")),
                    "fdr": _to_num(d.get("fdr")),
                }
            )
        rows.sort(key=lambda r: r["shared_mirna_families"], reverse=True)

        return {
            "status": "success",
            "data": rows[:limit],
            "metadata": {
                "source": "ENCORI (starBase) ceRNA",
                "query": gene,
                "total": len(rows),
                "returned": min(len(rows), limit),
                "interpretation": (
                    "Each partner shares miRNA-binding families with the query gene, "
                    "making it a candidate competing-endogenous RNA (miRNA sponge). "
                    "shared_mirna_families = number of shared miRNA families; lower "
                    "pval/fdr = stronger ceRNA evidence."
                ),
            },
        }

    # ------------------------------------------------------------------
    # 4. RNA-RNA duplex interactions (RNARNA/)
    # ------------------------------------------------------------------
    def _run_rna_rna(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Query ``RNARNA/`` for the duplex partners of ``rna``.

        Optional arguments: ``gene_type`` (default ``lncRNA``),
        ``interaction_min`` and ``exp_min`` (default 1 each), ``cell_type``
        (default ``all``), ``assembly`` (default ``hg38``), ``limit`` (default
        100, max 500). Rows are ordered by total experiments, then total
        reads, both descending.
        """
        rna = self._arg_text(arguments, "rna", "RNA", "gene")
        if not rna:
            return {
                "status": "error",
                "error": "Provide 'rna' (e.g. 'MALAT1') to get its RNA-RNA duplex partners.",
            }

        limit = self._arg_limit(arguments, 100)

        params = {
            "assembly": arguments.get("assembly") or "hg38",
            # MALAT1 etc. are lncRNAs, so default geneType to lncRNA; ENCORI
            # silently returns an error body if geneType != the RNA's biotype.
            "geneType": arguments.get("gene_type") or "lncRNA",
            "RNA": rna,
            "interNum": _to_int(arguments.get("interaction_min", 1), 1),
            "expNum": _to_int(arguments.get("exp_min", 1), 1),
            "cellType": arguments.get("cell_type") or "all",
        }
        parsed = self._fetch_tsv("RNARNA/", params)
        if "error" in parsed:
            return {"status": "error", "error": parsed["error"]}

        rows = []
        for d in self._row_dicts(parsed):
            rows.append(
                {
                    "rna": d.get("geneName"),
                    "rna_id": d.get("geneID"),
                    "rna_type": d.get("geneType"),
                    "partner": d.get("pairGeneName"),
                    "partner_id": d.get("pairGeneID"),
                    "partner_type": d.get("pairGeneType"),
                    "interaction_num": _to_int(d.get("interactionNum"), 0),
                    "total_experiments": _to_int(d.get("totalExpNum"), 0),
                    "total_reads": _to_int(d.get("totalReadsNum"), 0),
                    "free_energy": _to_num(d.get("FreeEnergy")),
                    "align_score": _to_num(d.get("AlignScore(Smith-Waterman)")),
                    "cell_tissue": d.get("CellLine/Tissue"),
                }
            )
        rows.sort(
            key=lambda r: (r["total_experiments"], r["total_reads"]), reverse=True
        )

        return {
            "status": "success",
            "data": rows[:limit],
            "metadata": {
                "source": "ENCORI (starBase) RNARNA",
                "query": rna,
                "total": len(rows),
                "returned": min(len(rows), limit),
                "interpretation": (
                    "Each partner forms a base-pairing duplex with the query RNA, "
                    "detected by crosslinking (PARIS/LIGR/SPLASH). total_experiments / "
                    "total_reads quantify support; free_energy is the predicted "
                    "hybridisation energy (more negative = more stable)."
                ),
            },
        }

    # ------------------------------------------------------------------
    # 5. Degradome-seq miRNA cleavage (degradomeRNA/)
    # ------------------------------------------------------------------
    def _run_degradome(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Query ``degradomeRNA/`` by ``mirna`` and/or ``gene``.

        Optional arguments: ``degradome_exp_min`` and ``clip_min`` (default 1
        each), ``gene_type`` (default ``mRNA``), ``cell_type`` (default
        ``all``), ``assembly`` (default ``hg19``), ``limit`` (default 100,
        max 500). Rows are ordered by degradome experiments, then cleave
        events, both descending.
        """
        mirna = self._arg_text(arguments, "mirna")
        gene = self._arg_text(arguments, "gene", "gene_symbol")
        if not mirna and not gene:
            return {
                "status": "error",
                "error": "Provide 'gene' (e.g. 'TP53') to get degradome-validated miRNA "
                "cleavage of it, or 'mirna' for a specific miRNA's cleavage targets.",
            }

        limit = self._arg_limit(arguments, 100)

        params = {
            # hg38 degradome data is not built; hg19 is the supported assembly.
            "assembly": arguments.get("assembly") or "hg19",
            "geneType": arguments.get("gene_type") or "mRNA",
            "miRNA": mirna or "all",
            "target": gene or "all",
            "degraExpNum": _to_int(arguments.get("degradome_exp_min", 1), 1),
            "clipExpNum": _to_int(arguments.get("clip_min", 1), 1),
            "cellType": arguments.get("cell_type") or "all",
        }
        parsed = self._fetch_tsv("degradomeRNA/", params)
        if "error" in parsed:
            return {"status": "error", "error": parsed["error"]}

        rows = []
        for d in self._row_dicts(parsed):
            rows.append(
                {
                    "mirna": d.get("miRNAname"),
                    "mirna_id": d.get("miRNAid"),
                    "gene": d.get("geneName"),
                    "gene_type": d.get("geneType"),
                    "cleave_event_num": _to_int(d.get("cleaveEventNum"), 0),
                    "degradome_experiments": _to_int(d.get("degraExpNum"), 0),
                    "degradome_sites": _to_int(d.get("degraSiteNum"), 0),
                    "total_reads": _to_int(d.get("totalReads"), 0),
                    "category": d.get("category"),
                }
            )
        rows.sort(
            key=lambda r: (r["degradome_experiments"], r["cleave_event_num"]),
            reverse=True,
        )

        return {
            "status": "success",
            "data": rows[:limit],
            "metadata": {
                "source": "ENCORI (starBase) degradomeRNA",
                "query": mirna or gene,
                "assembly": params["assembly"],
                "total": len(rows),
                "returned": min(len(rows), limit),
                "interpretation": (
                    "Each row is a degradome-seq (PARE)-validated slicer cleavage of "
                    "the target by the miRNA. cleave_event_num / degradome_experiments "
                    "quantify support; category (I-IV) ranks cleavage-signal confidence "
                    "(I strongest). hg19 is the only assembly with degradome data."
                ),
            },
        }

    # ------------------------------------------------------------------
    # 6. RBP-disease (COSMIC) associations (RBPDisease/)
    # ------------------------------------------------------------------
    def _run_rbp_disease(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Query ``RBPDisease/`` by any of ``rbp``, ``gene``, ``tissue``, ``disease``.

        Optional arguments: ``assembly`` (default ``hg38``), ``limit``
        (default 100, max 500). Rows are ordered by COSMIC count, then sample
        count, both descending.
        """
        rbp = self._arg_text(arguments, "rbp", "RBP")
        gene = self._arg_text(arguments, "gene", "gene_symbol")
        tissue = self._arg_text(arguments, "tissue")
        disease = self._arg_text(arguments, "disease")
        if not rbp and not gene and not tissue and not disease:
            return {
                "status": "error",
                "error": "Provide at least one of 'gene' (target, e.g. 'MYC'), 'rbp', "
                "'tissue' (e.g. 'breast'), or 'disease' (e.g. 'carcinoma').",
            }

        limit = self._arg_limit(arguments, 100)

        params = {
            "assembly": arguments.get("assembly") or "hg38",
            "RBP": rbp or "all",
            "target": gene or "all",
            "tissue": tissue or "all",
            "disease": disease or "all",
        }
        parsed = self._fetch_tsv("RBPDisease/", params)
        if "error" in parsed:
            return {"status": "error", "error": parsed["error"]}

        rows = []
        for d in self._row_dicts(parsed):
            rows.append(
                {
                    "rbp": d.get("RBP"),
                    "gene": d.get("geneName"),
                    "gene_id": d.get("geneID"),
                    "tissue": d.get("tissue"),
                    "disease_num": _to_int(d.get("diseaseNum"), 0),
                    "diseases": d.get("diseases"),
                    "disease_cosmic_id": d.get("diseaseCosmicID"),
                    "cosmic_num": _to_int(d.get("cosmicNum"), 0),
                    "sample_num": _to_int(d.get("sampleNum"), 0),
                    "mut_type_num": _to_int(d.get("mutTypeNum"), 0),
                    "clip_experiments": _to_int(d.get("clipExpNum"), 0),
                    "clip_sites": _to_int(d.get("clipSiteNum"), 0),
                }
            )
        rows.sort(key=lambda r: (r["cosmic_num"], r["sample_num"]), reverse=True)

        return {
            "status": "success",
            "data": rows[:limit],
            "metadata": {
                "source": "ENCORI (starBase) RBPDisease",
                "query": gene or rbp or disease or tissue,
                "total": len(rows),
                "returned": min(len(rows), limit),
                "interpretation": (
                    "Each row links an RBP binding site on a gene to somatic (COSMIC) "
                    "mutations in a tissue/disease. cosmic_num / sample_num quantify the "
                    "mutational burden overlapping the binding site."
                ),
            },
        }

    # ------------------------------------------------------------------
    # 7. RBP binding-motif enrichment scan (RBPMotifScan/)
    # ------------------------------------------------------------------
    def _run_motif_scan(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Query ``RBPMotifScan/`` by ``motif`` and/or ``rbp``.

        Optional arguments: ``rank_limit`` (default 10, clamped to 1..100),
        ``length`` (default ``short``), ``assembly`` (default ``hg38``),
        ``limit`` (default 100, max 500). Rows are ordered by target peak
        count, descending.
        """
        motif = self._arg_text(arguments, "motif")
        rbp = self._arg_text(arguments, "rbp", "RBP")
        if not motif and not rbp:
            return {
                "status": "error",
                "error": "Provide 'motif' (e.g. 'UGCAUG') to find RBPs/datasets whose "
                "CLIP peaks are enriched for it, or 'rbp' to list that RBP's motifs.",
            }

        rank_limit = max(1, min(_to_int(arguments.get("rank_limit", 10), 10), 100))
        limit = self._arg_limit(arguments, 100)

        params: Dict[str, Any] = {
            "assembly": arguments.get("assembly") or "hg38",
            "length": arguments.get("length") or "short",
            "rankLimit": rank_limit,
        }
        # Only the filters actually supplied are sent to ENCORI.
        if motif:
            params["motif"] = motif
        if rbp:
            params["RBP"] = rbp

        parsed = self._fetch_tsv("RBPMotifScan/", params)
        if "error" in parsed:
            return {"status": "error", "error": parsed["error"]}

        rows = []
        for d in self._row_dicts(parsed):
            rows.append(
                {
                    "rbp": d.get("RBP"),
                    "dataset_id": d.get("DatasetID"),
                    "motif_rank": _to_int(d.get("MotifRank"), 0),
                    "identified_motif": d.get("IdentifiedMotif"),
                    "query_motif": d.get("QueryMotif"),
                    "target_peak_num": _to_int(d.get("targetPeakNum"), 0),
                    "target_percentage": _to_num(d.get("TargetPercentage(%)")),
                    "pvalue": _to_num(d.get("p-value")),
                    "pvalue_ln": _to_num(d.get("p-value(ln)")),
                    "motif_matrix": d.get("MotifMatrix"),
                    "region": d.get("Region"),
                    "cell_tissue": d.get("CellLine/Tissue"),
                    "main_accession": d.get("MainAccession"),
                }
            )
        rows.sort(key=lambda r: r["target_peak_num"], reverse=True)

        return {
            "status": "success",
            "data": rows[:limit],
            "metadata": {
                "source": "ENCORI (starBase) RBPMotifScan",
                "query": motif or rbp,
                "total": len(rows),
                "returned": min(len(rows), limit),
                "interpretation": (
                    "Each row is a sequence motif enriched in an RBP's CLIP peaks. "
                    "target_peak_num = number of peaks containing the motif; lower "
                    "pvalue = stronger enrichment. motif_matrix links to the HOMER "
                    "position-weight matrix."
                ),
            },
        }