tooluniverse.sequence_analyze_tool 源代码

"""
Sequence Analysis Tool

Residue counting (with live UniProt fetch), GC content, reverse complement,
and basic sequence statistics for DNA/RNA/protein sequences.

Uses UniProt REST API for sequence fetching. No other external dependencies.
"""

import urllib.error
import urllib.request
from typing import Any, Dict
from .base_tool import BaseTool
from .tool_registry import register_tool


COMPLEMENT = str.maketrans("ATCGatcgNn", "TAGCtagcNn")
DNA_BASES = frozenset("ATCGNatcgn")
RNA_BASES = frozenset("AUCGNaucgn")

_AA_MASS = {
    "A": 71.03711,
    "R": 156.10111,
    "N": 114.04293,
    "D": 115.02694,
    "C": 103.00919,
    "E": 129.04259,
    "Q": 128.05858,
    "G": 57.02146,
    "H": 137.05891,
    "I": 113.08406,
    "L": 113.08406,
    "K": 128.09496,
    "M": 131.04049,
    "F": 147.06841,
    "P": 97.05276,
    "S": 87.03203,
    "T": 101.04768,
    "W": 186.07931,
    "Y": 163.06333,
    "V": 99.06841,
}
_WATER_MASS = 18.01056


def _is_dna(seq: str) -> bool:
    return all(c in DNA_BASES for c in seq) and "U" not in seq.upper()


def _is_rna(seq: str) -> bool:
    return all(c in RNA_BASES for c in seq)


def _fetch_uniprot(accession: str) -> str:
    """Fetch protein sequence from UniProt REST API."""
    url = f"https://rest.uniprot.org/uniprotkb/{accession}.fasta"
    try:
        with urllib.request.urlopen(url, timeout=15) as resp:
            fasta = resp.read().decode("utf-8")
    except urllib.error.HTTPError as e:
        raise RuntimeError(
            f"HTTP {e.code} fetching UniProt {accession}: {e.reason}"
        ) from e
    except urllib.error.URLError as e:
        raise RuntimeError(
            f"Network error fetching UniProt {accession}: {e.reason}"
        ) from e

    lines = fasta.strip().splitlines()
    if not lines or not lines[0].startswith(">"):
        raise RuntimeError(f"Unexpected FASTA format for {accession}.")

    seq = "".join(lines[1:]).upper().replace(" ", "")
    if not seq:
        raise RuntimeError(f"Empty sequence returned for {accession}.")
    return seq


[文档] @register_tool("SequenceAnalyzeTool") class SequenceAnalyzeTool(BaseTool): """Sequence analysis: residue counting, GC content, reverse complement, stats."""
[文档] def __init__(self, tool_config: Dict[str, Any]): super().__init__(tool_config) self.parameter = tool_config.get("parameter", {}) self.required = self.parameter.get("required", [])
[文档] def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]: operation = arguments.get("operation") if not operation: return {"status": "error", "error": "Missing required parameter: operation"} handlers = { "count_residues": self._count_residues, "gc_content": self._gc_content, "reverse_complement": self._reverse_complement, "stats": self._stats, } handler = handlers.get(operation) if not handler: return { "status": "error", "error": f"Unknown operation: {operation}", "available_operations": list(handlers.keys()), } try: return handler(arguments) except Exception as e: return {"status": "error", "error": f"Analysis failed: {str(e)}"}
[文档] def _resolve_sequence(self, arguments: Dict[str, Any]) -> str: """Get sequence from arguments, fetching from UniProt if needed.""" seq = arguments.get("sequence") uniprot_id = arguments.get("uniprot_id") if seq: return seq.strip() if uniprot_id: return _fetch_uniprot(uniprot_id.strip()) raise ValueError("Provide either 'sequence' or 'uniprot_id'.")
[文档] def _count_residues(self, arguments: Dict[str, Any]) -> Dict[str, Any]: seq = self._resolve_sequence(arguments) residue = arguments.get("residue") if not seq: return {"status": "error", "error": "Empty sequence."} if not residue or len(residue) != 1: return {"status": "error", "error": "residue must be a single character."} seq_upper = seq.upper() residue_upper = residue.upper() count = seq_upper.count(residue_upper) fraction = count / len(seq_upper) positions = [i + 1 for i, c in enumerate(seq_upper) if c == residue_upper] data = { "sequence_length": len(seq_upper), "residue": residue_upper, "count": count, "fraction": round(fraction, 4), "percent": round(fraction * 100, 2), "positions_1based": positions[:50], "total_positions": len(positions), } if arguments.get("uniprot_id"): data["uniprot_id"] = arguments["uniprot_id"] data["source"] = "UniProt REST API" return {"status": "success", "data": data}
[文档] def _gc_content(self, arguments: Dict[str, Any]) -> Dict[str, Any]: seq = self._resolve_sequence(arguments).upper() if not seq: return {"status": "error", "error": "Empty sequence."} counts = {b: seq.count(b) for b in "ATCGUN"} gc = counts["G"] + counts["C"] n = counts["N"] total = len(seq) if total - n == 0: return {"status": "error", "error": "Sequence contains only N bases."} gc_frac = gc / (total - n) return { "status": "success", "data": { "length": total, "gc_count": gc, "gc_fraction": round(gc_frac, 4), "gc_percent": round(gc_frac * 100, 2), "composition": {b: counts[b] for b in "ATCGUN" if counts[b] > 0}, }, }
[文档] def _reverse_complement(self, arguments: Dict[str, Any]) -> Dict[str, Any]: seq = self._resolve_sequence(arguments).strip() if not seq: return {"status": "error", "error": "Empty sequence."} if not _is_dna(seq): return { "status": "error", "error": "Sequence contains non-DNA characters. Only A/T/C/G/N supported.", } rc = seq.translate(COMPLEMENT)[::-1] return { "status": "success", "data": { "original": seq.upper(), "reverse_complement": rc.upper(), "length": len(seq), }, }
[文档] def _stats(self, arguments: Dict[str, Any]) -> Dict[str, Any]: seq = self._resolve_sequence(arguments).strip().upper() if not seq: return {"status": "error", "error": "Empty sequence."} length = len(seq) composition = {c: seq.count(c) for c in sorted(set(seq))} if _is_dna(seq): seq_type = "DNA" gc = composition.get("G", 0) + composition.get("C", 0) n = composition.get("N", 0) gc_pct = gc / (length - n) * 100 if length - n > 0 else 0 extra = {"gc_percent": round(gc_pct, 2)} elif _is_rna(seq): seq_type = "RNA" gc = composition.get("G", 0) + composition.get("C", 0) n = composition.get("N", 0) gc_pct = gc / (length - n) * 100 if length - n > 0 else 0 extra = {"gc_percent": round(gc_pct, 2)} else: seq_type = "Protein" mw = _WATER_MASS + sum(_AA_MASS.get(aa, 111.1) for aa in seq) extra = {"estimated_mw_da": round(mw, 2)} data = { "sequence_type": seq_type, "length": length, "composition": composition, **extra, } if arguments.get("uniprot_id"): data["uniprot_id"] = arguments["uniprot_id"] return {"status": "success", "data": data}