Source code for tooluniverse.deseq2_tool

"""R DESeq2 differential expression analysis tool.

Runs DESeq2 via Rscript for accurate differential expression analysis.
Handles count matrix + metadata → DEG list with proper design formulas,
dispersion estimation, and LFC shrinkage.
"""

import json
import os
import subprocess
import tempfile
from pathlib import Path
from typing import Any, Dict

from .base_tool import BaseTool
from .tool_registry import register_tool


[docs] @register_tool("DESeq2Tool") class DESeq2Tool(BaseTool): """Run R DESeq2 differential expression analysis."""
[docs] def __init__(self, tool_config: Dict[str, Any], **kwargs): super().__init__(tool_config)
[docs] def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]: operation = arguments.get("operation", "deseq2") if operation == "deseq2": return self._run_deseq2(arguments) elif operation == "enrichgo": return self._run_enrichgo(arguments) else: return {"status": "error", "error": f"Unknown operation: {operation}"}
[docs] @staticmethod def _run_rscript(r_script: str, timeout: int = 300) -> subprocess.CompletedProcess: r"""Execute an R script by writing it to a temp file and running it. We deliberately run ``Rscript <file>`` rather than ``Rscript -e <string>``. The ``-e`` form collapses one backslash level before the R parser sees the code, so a regex literal such as the Ensembl-version strip ``sub("\\..*", ...)`` (two backslashes in the generated R source) is mangled down to ``sub("\..*", ...)`` and R aborts with "'\.' is an unrecognized escape in character string". A script file is parsed verbatim and is immune to this. The temp file is always removed, including on timeout. """ tmp = tempfile.NamedTemporaryFile( mode="w", suffix=".R", delete=False, encoding="utf-8" ) try: tmp.write(r_script) tmp.close() return subprocess.run( ["Rscript", tmp.name], capture_output=True, text=True, timeout=timeout, ) finally: try: os.unlink(tmp.name) except OSError: pass
[docs] def _run_deseq2(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Run DESeq2 differential expression analysis.""" counts_file = arguments.get("counts_file", "") metadata_file = arguments.get("metadata_file", "") design = arguments.get("design", "~ condition") contrast = arguments.get("contrast", "") ref_level = arguments.get("ref_level", "") alpha = arguments.get("alpha", 0.05) lfc_threshold = arguments.get("lfc_threshold", 0) lfc_shrinkage = arguments.get("lfc_shrinkage", False) refit_cooks = arguments.get("refit_cooks", False) if not counts_file or not os.path.exists(counts_file): return {"status": "error", "error": f"Counts file not found: {counts_file}"} if not metadata_file or not os.path.exists(metadata_file): return { "status": "error", "error": f"Metadata file not found: {metadata_file}", } # Build R script r_script = f""" suppressMessages({{ library(DESeq2) }}) counts <- read.csv("{counts_file}", row.names=1, check.names=FALSE) meta <- read.csv("{metadata_file}") # Try to match sample IDs if ("AzentaName" %in% colnames(meta)) {{ rownames(meta) <- meta$AzentaName }} else if ("sample" %in% colnames(meta)) {{ rownames(meta) <- meta$sample }} else {{ rownames(meta) <- meta[,1] }} common <- intersect(colnames(counts), rownames(meta)) if (length(common) == 0) {{ # Try matching without the first column for (col in colnames(meta)) {{ if (length(intersect(as.character(meta[[col]]), colnames(counts))) > 2) {{ rownames(meta) <- as.character(meta[[col]]) common <- intersect(colnames(counts), rownames(meta)) break }} }} }} counts <- counts[, common] meta <- meta[common, ] # Factorize design variables for (v in all.vars(as.formula("{design}"))) {{ if (v %in% colnames(meta)) {{ meta[[v]] <- factor(meta[[v]]) }} }} """ if ref_level: parts = ref_level.split(",") if len(parts) == 2: r_script += f'\nmeta${parts[0].strip()} <- relevel(meta${parts[0].strip()}, ref="{parts[1].strip()}")\n' r_script += f""" dds <- DESeqDataSetFromMatrix(round(as.matrix(counts)), meta, {design}) dds <- DESeq(dds{", quiet=TRUE" if not refit_cooks else ""}) """ if refit_cooks: r_script += "dds <- DESeq(dds, quiet=TRUE)\n" if contrast: parts = [p.strip().strip('"').strip("'") for p in contrast.split(",")] if len(parts) == 3: r_script += f'res <- results(dds, contrast=c("{parts[0]}", "{parts[1]}", "{parts[2]}"), alpha={alpha})\n' else: r_script += f"res <- results(dds, alpha={alpha})\n" else: r_script += f"res <- results(dds, alpha={alpha})\n" if lfc_shrinkage: r_script += """ res_shrunk <- lfcShrink(dds, coef=resultsNames(dds)[2], type="apeglm", quiet=TRUE) res$log2FoldChange_shrunk <- res_shrunk$log2FoldChange """ r_script += f""" # Summary sig <- res[!is.na(res$padj) & res$padj < {alpha}, ] cat("RESULT_JSON_START\\n") result <- list( total_genes = nrow(res), sig_genes = nrow(sig), sig_up = sum(sig$log2FoldChange > 0, na.rm=TRUE), sig_down = sum(sig$log2FoldChange < 0, na.rm=TRUE), dispersion_below_1e5 = sum(mcols(dds)$dispGeneEst < 1e-05, na.rm=TRUE) ) """ if lfc_threshold > 0: r_script += f""" sig_lfc <- res[!is.na(res$padj) & res$padj < {alpha} & abs(res$log2FoldChange) > {lfc_threshold}, ] result$sig_with_lfc_filter <- nrow(sig_lfc) """ r_script += """ cat(jsonlite::toJSON(result, auto_unbox=TRUE)) cat("\\nRESULT_JSON_END\\n") # Save full results out_file <- file.path(tempdir(), "deseq2_results.csv") write.csv(as.data.frame(res), out_file) cat("RESULTS_FILE:", out_file, "\\n") """ try: r = self._run_rscript(r_script) if r.returncode != 0: return { "status": "error", "error": f"R DESeq2 failed: {r.stderr[:500]}", } # Parse JSON from output output = r.stdout json_match = output.split("RESULT_JSON_START\n") if len(json_match) > 1: json_str = json_match[1].split("\nRESULT_JSON_END")[0] result_data = json.loads(json_str) else: result_data = {"raw_output": output[:1000]} # Get results file path results_file = "" for line in output.split("\n"): if line.startswith("RESULTS_FILE:"): results_file = line.split(":", 1)[1].strip() result_data["results_file"] = results_file return {"status": "success", "data": result_data} except subprocess.TimeoutExpired: return {"status": "error", "error": "DESeq2 timed out after 300s"} except Exception as e: return {"status": "error", "error": str(e)[:500]}
[docs] def _run_enrichgo(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Run clusterProfiler enrichGO + simplify.""" gene_list_file = arguments.get("gene_list_file", "") background_file = arguments.get("background_file", "") ont = arguments.get("ontology", "BP") simplify_cutoff = arguments.get("simplify_cutoff", 0.7) id_type = arguments.get("id_type", "ENSEMBL") organism = arguments.get("organism", "org.Hs.eg.db") if not gene_list_file or not os.path.exists(gene_list_file): return { "status": "error", "error": f"Gene list file not found: {gene_list_file}", } r_script = f""" suppressMessages({{ library(clusterProfiler) library({organism}) }}) genes <- readLines("{gene_list_file}") genes <- sub("\\\\..*", "", genes) # Strip Ensembl version gene_entrez <- bitr(genes, fromType="{id_type}", toType="ENTREZID", OrgDb={organism}) """ if background_file and os.path.exists(background_file): r_script += f""" bg <- readLines("{background_file}") bg <- sub("\\\\..*", "", bg) bg_entrez <- bitr(bg, fromType="{id_type}", toType="ENTREZID", OrgDb={organism}) universe_ids <- bg_entrez$ENTREZID """ else: r_script += "universe_ids <- NULL\n" r_script += f""" ego <- enrichGO( gene = gene_entrez$ENTREZID, universe = universe_ids, OrgDb = {organism}, ont = "{ont}", pAdjustMethod = "BH", pvalueCutoff = 0.05, qvalueCutoff = 0.2, readable = TRUE ) cat("enrichGO terms:", nrow(ego), "\\n") ego2 <- clusterProfiler::simplify(ego, cutoff={simplify_cutoff}, by="p.adjust", select_fun=min) cat("Simplified terms:", nrow(ego2), "\\n") df <- as.data.frame(ego2) cat("RESULT_JSON_START\\n") # Top 20 terms top <- head(df[, c("ID","Description","pvalue","p.adjust","qvalue","Count")], 20) cat(jsonlite::toJSON(top, auto_unbox=TRUE)) cat("\\nRESULT_JSON_END\\n") # Save full results out_file <- file.path(tempdir(), "enrichgo_simplified.csv") write.csv(df, out_file, row.names=FALSE) cat("RESULTS_FILE:", out_file, "\\n") """ try: r = self._run_rscript(r_script) if r.returncode != 0: return { "status": "error", "error": f"enrichGO failed: {r.stderr[:500]}", } output = r.stdout json_match = output.split("RESULT_JSON_START\n") if len(json_match) > 1: json_str = json_match[1].split("\nRESULT_JSON_END")[0] terms = json.loads(json_str) else: terms = [] results_file = "" for line in output.split("\n"): if line.startswith("RESULTS_FILE:"): results_file = line.split(":", 1)[1].strip() return { "status": "success", "data": { "n_terms_before_simplify": int( next( ( ln.split(":")[1].strip() for ln in output.split("\n") if "enrichGO terms:" in ln ), "0", ) ), "n_terms_after_simplify": int( next( ( ln2.split(":")[1].strip() for ln2 in output.split("\n") if "Simplified terms:" in ln2 ), "0", ) ), "top_terms": terms, "results_file": results_file, }, } except subprocess.TimeoutExpired: return {"status": "error", "error": "enrichGO timed out after 300s"} except Exception as e: return {"status": "error", "error": str(e)[:500]}