# Source code for tooluniverse.mavedb_tool

"""
MaveDB Tool - Multiplexed Assay of Variant Effect Database

MaveDB stores and distributes results from Multiplexed Assays of Variant Effect
(MAVEs), including deep mutational scanning experiments. Score sets contain
functional impact scores for thousands of variants in a single protein/gene.

API: https://api.mavedb.org/api/v1/
Reference: Esposito et al. (2019) Genome Research
"""

import csv
import io

import requests
from typing import Dict, Any
from .base_tool import BaseTool
from .tool_registry import register_tool

MAVEDB_API = "https://api.mavedb.org/api/v1"


@register_tool("MaveDBTool")
class MaveDBTool(BaseTool):
    """Search MaveDB for variant effect score sets and retrieve details.

    The operation executed by :meth:`run` is read from
    ``tool_config["fields"]["operation"]``. Every handler returns a dict whose
    ``"status"`` is either ``"success"`` (payload under ``"data"``) or
    ``"error"`` (human-readable message under ``"error"``); handlers report
    HTTP and network failures through that dict instead of raising.
    """

    # Operation name -> handler method name. Names (not bound methods) are
    # stored so that run() can resolve them lazily: a handler that is not
    # defined on this class then yields an error result instead of an
    # AttributeError that would break every other operation as well.
    _OPERATIONS = {
        "search": "_search",
        "get_score_set": "_get_score_set",
        "get_variant_scores": "_get_variant_scores",
        "search_experiments": "_search_experiments",
    }

    # CSV columns that are either dropped ("accession") or copied explicitly
    # into each variant record, and therefore skipped when collecting the
    # remaining (score) columns.
    _IDENTIFIER_COLUMNS = frozenset(
        ("accession", "hgvs_nt", "hgvs_splice", "hgvs_pro")
    )

    _DEFAULT_LIMIT = 50
    _MAX_LIMIT = 500

    def __init__(self, tool_config):
        """Store the tool configuration and open a JSON-accepting session.

        Args:
            tool_config: Tool configuration mapping. Its optional
                ``"parameter"`` entry is kept on ``self.parameter`` and that
                entry's ``"required"`` list on ``self.required``.
        """
        super().__init__(tool_config)
        self.parameter = tool_config.get("parameter", {})
        self.required = self.parameter.get("required", [])
        self.session = requests.Session()
        self.session.headers.update({"Accept": "application/json"})

    def _get_score_set(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Get detailed information about a specific score set by URN.

        Args:
            params: Must contain a non-empty ``"urn"``.

        Returns:
            On success, ``data`` holds the score set's descriptive fields,
            its target genes, DOI identifiers and primary publication
            identifiers, and ``metadata`` echoes the requested URN. Otherwise
            an error dict.
        """
        urn = params.get("urn", "")
        if not urn:
            return {"status": "error", "error": "urn parameter is required"}

        # Network failures are reported the same way the other handlers do.
        try:
            resp = self.session.get(f"{MAVEDB_API}/score-sets/{urn}", timeout=30)
        except requests.exceptions.RequestException as e:
            return {"status": "error", "error": f"MaveDB API error: {e}"}

        if resp.status_code == 404:
            return {
                "status": "error",
                "error": f"Score set '{urn}' not found in MaveDB",
            }
        if resp.status_code != 200:
            return {
                "status": "error",
                "error": f"MaveDB request failed: HTTP {resp.status_code}",
            }

        try:
            d = resp.json()
        except ValueError:
            # requests raises a ValueError subclass for an undecodable body.
            return {
                "status": "error",
                "error": "MaveDB returned a response that is not valid JSON",
            }
        if not isinstance(d, dict):
            return {
                "status": "error",
                "error": "MaveDB returned an unexpected response format",
            }

        target_genes = []
        for tg in d.get("targetGenes") or []:
            ref = tg.get("targetSequence") or {}
            uniprot = ref.get("uniprot") or {}
            target_genes.append(
                {
                    "name": tg.get("name"),
                    "category": tg.get("category"),
                    "uniprot_id": uniprot.get("identifier"),
                }
            )

        license_info = d.get("license") or {}
        result = {
            "urn": d.get("urn"),
            "title": d.get("title"),
            "short_description": d.get("shortDescription"),
            "abstract": d.get("abstractText"),
            "method": d.get("methodText"),
            "num_variants": d.get("numVariants"),
            "published_date": d.get("publishedDate"),
            "license": license_info.get("shortName"),
            "target_genes": target_genes,
            "doi_identifiers": [
                doi.get("identifier") for doi in d.get("doiIdentifiers") or []
            ],
            "primary_publications": [
                pub.get("identifier")
                for pub in d.get("primaryPublicationIdentifiers") or []
            ],
        }
        return {
            "status": "success",
            "data": result,
            "metadata": {"urn": urn},
        }

    def _get_variant_scores(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Get variant functional scores from a score set (CSV endpoint).

        Args:
            params: Must contain a non-empty ``"urn"``. Optional keys:
                ``"hgvs_pro"`` -- case-insensitive substring that a row's
                ``hgvs_pro`` column must contain; ``"limit"`` -- maximum
                number of variants to return (default 50, clamped to 1..500).

        Returns:
            On success, ``data`` holds the matching variants. Each variant
            carries its three HGVS columns plus every other non-empty,
            non-``"NA"`` column, converted to ``float`` when the text parses
            as one. Otherwise an error dict.
        """
        urn = params.get("urn", "")
        if not urn:
            return {"status": "error", "error": "urn parameter is required"}

        hgvs_filter = str(params.get("hgvs_pro") or "").strip()

        raw_limit = params.get("limit")
        try:
            limit = self._DEFAULT_LIMIT if raw_limit is None else int(raw_limit)
        except (TypeError, ValueError):
            return {"status": "error", "error": "limit must be an integer"}
        # Clamp to a sane window; values below 1 behave like 1 (the loop
        # below always yields at least the first matching row).
        limit = max(1, min(limit, self._MAX_LIMIT))

        try:
            resp = self.session.get(
                f"{MAVEDB_API}/score-sets/{urn}/scores",
                timeout=60,
                headers={"Accept": "text/csv"},
            )
        except requests.exceptions.RequestException as e:
            return {"status": "error", "error": f"MaveDB API error: {e}"}

        if resp.status_code == 404:
            return {
                "status": "error",
                "error": f"Score set '{urn}' not found in MaveDB",
            }
        if resp.status_code != 200:
            return {
                "status": "error",
                "error": f"MaveDB request failed: HTTP {resp.status_code}",
            }

        csv_text = resp.text
        if not csv_text.strip():
            return {
                "status": "error",
                "error": f"No scores available for '{urn}'",
            }

        needle = hgvs_filter.lower()  # hoisted: invariant across rows
        reader = csv.DictReader(io.StringIO(csv_text))
        variants = []
        total_parsed = 0
        for row in reader:
            total_parsed += 1
            if needle and needle not in (row.get("hgvs_pro") or "").lower():
                continue

            variant = {
                "hgvs_nt": row.get("hgvs_nt") or None,
                "hgvs_splice": row.get("hgvs_splice") or None,
                "hgvs_pro": row.get("hgvs_pro") or None,
            }
            for key, value in row.items():
                # DictReader files cells beyond the header under key None
                # (as a list); those have no column name, so skip them.
                if key is None or key in self._IDENTIFIER_COLUMNS:
                    continue
                if not value or value == "NA":
                    continue
                try:
                    variant[key] = float(value)
                except ValueError:
                    variant[key] = value
            variants.append(variant)
            if len(variants) >= limit:
                break

        # total_parsed counts the rows read before stopping, so once the
        # limit is reached it is only a lower bound on the set's size.
        if len(variants) < limit:
            total = total_parsed
        else:
            total = f">{total_parsed} (truncated at {limit})"

        return {
            "status": "success",
            "data": {
                "urn": urn,
                "total_variants_in_set": total,
                "returned": len(variants),
                "hgvs_filter": hgvs_filter or None,
                "variants": variants,
            },
        }

    def _search_experiments(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Search MaveDB experiments by text query.

        Args:
            params: Must contain a non-empty ``"query"``.

        Returns:
            On success, ``data`` holds the query, the number of experiments
            kept and one summary per experiment. Experiments whose URN
            starts with ``"tmp:"`` are left out. Otherwise an error dict.
        """
        query = params.get("query", "")
        if not query:
            return {"status": "error", "error": "query parameter is required"}

        try:
            resp = self.session.post(
                f"{MAVEDB_API}/experiments/search",
                json={"text": query},
                timeout=30,
            )
        except requests.exceptions.RequestException as e:
            return {"status": "error", "error": f"MaveDB API error: {e}"}

        if resp.status_code != 200:
            return {
                "status": "error",
                "error": f"MaveDB search failed: HTTP {resp.status_code}",
            }

        try:
            raw = resp.json()
        except ValueError:
            return {
                "status": "error",
                "error": "MaveDB returned a response that is not valid JSON",
            }
        # A non-list payload is treated as a single result (or none if empty).
        if not isinstance(raw, list):
            raw = [raw] if raw else []

        experiments = []
        for exp in raw:
            if not isinstance(exp, dict):
                continue
            # "urn" may be present but null, hence `or ""` rather than a
            # .get() default.
            urn = exp.get("urn") or ""
            if urn.startswith("tmp:"):
                continue

            pubs = [
                {
                    "identifier": pub.get("identifier"),
                    "db_name": pub.get("dbName"),
                    "title": pub.get("title"),
                }
                for pub in exp.get("primaryPublicationIdentifiers") or []
            ]
            score_set_urns = exp.get("scoreSetUrns") or []
            experiments.append(
                {
                    "urn": urn,
                    "title": exp.get("title"),
                    "short_description": exp.get("shortDescription"),
                    "published_date": exp.get("publishedDate"),
                    "score_set_urns": score_set_urns,
                    "num_score_sets": len(score_set_urns),
                    "publications": pubs,
                }
            )

        return {
            "status": "success",
            "data": {
                "query": query,
                "total_experiments": len(experiments),
                "experiments": experiments,
            },
        }

    def run(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch to the handler for the configured operation.

        Args:
            params: Arguments forwarded unchanged to the handler; ``None`` is
                treated as an empty mapping.

        Returns:
            The handler's result, or an ``"Unknown operation"`` error dict
            when the configured operation has no handler on this object.
        """
        operation = self.tool_config.get("fields", {}).get("operation", "")
        method_name = self._OPERATIONS.get(operation)
        # Resolve lazily so one missing handler cannot break the others.
        handler = getattr(self, method_name, None) if method_name else None
        if handler is None:
            return {"status": "error", "error": f"Unknown operation: {operation}"}
        return handler(params if params is not None else {})