Source code for tooluniverse.phykit_tool

"""PhyKIT phylogenetics analysis tool.

Wraps PhyKIT command-line functions (treeness, saturation, dvmc,
long_branch_score, total_tree_length, parsimony_informative) for
single files or batch processing on directories.
"""

import os
import re
import subprocess
from pathlib import Path
from typing import Any, Dict, List

from .base_tool import BaseTool
from .tool_registry import register_tool


# Some user-facing metric names don't match phykit's CLI subcommand names.
# `parsimony_informative` is the canonical scientific name but phykit exposes
# it as `parsimony_informative_sites` (alias `pis`). Calling the wrong name
# returns the help banner with non-zero exit, silently producing zero values.
PHYKIT_CLI_ALIAS = {
    "parsimony_informative": "parsimony_informative_sites",
}


def _run_phykit(function: str, filepath: str, extra_args: list = None) -> str | None:
    """Run a single phykit command."""
    cli = PHYKIT_CLI_ALIAS.get(function, function)
    cmd = ["phykit", cli, filepath]
    if extra_args:
        cmd.extend(extra_args)
    try:
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        if r.returncode == 0:
            return r.stdout.strip()
        return None
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return None


[docs] @register_tool("PhyKITTool") class PhyKITTool(BaseTool): """Run PhyKIT phylogenetics functions on tree/alignment files.""" SUPPORTED_FUNCTIONS = [ "treeness", "saturation", "dvmc", "long_branch_score", "total_tree_length", "parsimony_informative", ]
[docs] def __init__(self, tool_config: Dict[str, Any], **kwargs): super().__init__(tool_config)
[docs] def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]: operation = arguments.get("operation", "batch") function = arguments.get("function", "") if function not in self.SUPPORTED_FUNCTIONS: return { "status": "error", "error": f"Unsupported function: {function}. Use one of {self.SUPPORTED_FUNCTIONS}", } if operation == "single": return self._run_single(arguments) elif operation == "batch": return self._run_batch(arguments) elif operation == "gap_percentage": return self._gap_percentage(arguments) else: return {"status": "error", "error": f"Unknown operation: {operation}"}
[docs] def _run_single(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Run PhyKIT on a single file.""" function = arguments["function"] filepath = arguments.get("file", "") if not filepath or not os.path.exists(filepath): return {"status": "error", "error": f"File not found: {filepath}"} extra = [] if function == "saturation": tree_file = arguments.get("tree_file", "") if tree_file: extra = ["-t", tree_file] output = _run_phykit(function, filepath, extra) if output is None: return { "status": "error", "error": f"PhyKIT {function} failed on {filepath}", } return { "status": "success", "data": {"function": function, "file": filepath, "output": output}, }
[docs] def _run_batch(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Run PhyKIT on all files in a directory.""" function = arguments["function"] directory = arguments.get("directory", "") ext = arguments.get("extension", ".treefile") tree_dir = arguments.get("tree_directory", "") tree_ext = arguments.get("tree_extension", ".treefile") per_tree_stat = arguments.get("per_tree_stat", "mean") if not directory or not os.path.isdir(directory): return {"status": "error", "error": f"Directory not found: {directory}"} files = sorted(Path(directory).glob(f"*{ext}")) if not files: return { "status": "error", "error": f"No files matching *{ext} in {directory}", } values: List[float] = [] processed = 0 errors = 0 for f in files: extra = [] if function == "saturation" and tree_dir: tree_path = str(Path(tree_dir) / f"{f.stem}{tree_ext}") if not os.path.exists(tree_path): continue extra = ["-t", tree_path] output = _run_phykit(function, str(f), extra) if output is None: errors += 1 continue processed += 1 if function == "long_branch_score": lines = [ln for ln in output.split("\n") if ln.strip()] taxon_values = [] for line in lines: parts = line.split("\t") try: taxon_values.append(float(parts[-1])) except (ValueError, IndexError): pass if not taxon_values: continue if per_tree_stat == "mean": values.append(sum(taxon_values) / len(taxon_values)) elif per_tree_stat == "median": taxon_values.sort() n = len(taxon_values) values.append( taxon_values[n // 2] if n % 2 else (taxon_values[n // 2 - 1] + taxon_values[n // 2]) / 2 ) else: values.extend(taxon_values) else: try: parts = output.split("\n")[0].split("\t") # Saturation outputs slope<TAB>1-slope; use 1-slope (col 1) if function == "saturation" and len(parts) >= 2: val = float(parts[1]) elif function == "parsimony_informative" and len(parts) >= 3: # phykit pis output: n_pi <TAB> n_total <TAB> percent. # The "%PIS" column (col 3) is what scientific # questions ask for, not the raw count. val = float(parts[2]) else: val = float(parts[0]) values.append(val) except (ValueError, IndexError): errors += 1 if not values: return { "status": "error", "error": f"No values computed from {processed} files ({errors} errors)", } values.sort() n = len(values) mean_val = sum(values) / n median_val = ( values[n // 2] if n % 2 else (values[n // 2 - 1] + values[n // 2]) / 2 ) return { "status": "success", "data": { "function": function, "n_files": processed, "n_errors": errors, "n_values": n, "mean": round(mean_val, 6), "median": round(median_val, 6), "min": round(min(values), 6), "max": round(max(values), 6), "values": [round(v, 6) for v in values] if n <= 50 else None, }, }
[docs] def _gap_percentage(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Compute gap percentage across all alignments in a directory.""" directory = arguments.get("directory", "") ext = arguments.get("extension", ".fa") if not directory or not os.path.isdir(directory): return {"status": "error", "error": f"Directory not found: {directory}"} total_gaps = 0 total_positions = 0 files_processed = 0 for f in sorted(Path(directory).glob(f"*{ext}")): with open(f) as fh: seqs: List[str] = [] current: List[str] = [] for line in fh: line = line.strip() if line.startswith(">"): if current: seqs.append("".join(current)) current = [] else: current.append(line) if current: seqs.append("".join(current)) for seq in seqs: total_positions += len(seq) total_gaps += seq.count("-") files_processed += 1 if total_positions == 0: return {"status": "error", "error": "No alignment data found"} pct = total_gaps / total_positions * 100 return { "status": "success", "data": { "total_gaps": total_gaps, "total_positions": total_positions, "gap_percentage": round(pct, 2), "files_processed": files_processed, }, }