Source code for tooluniverse.mavedb_tool
"""
MaveDB Tool - Multiplexed Assay of Variant Effect Database
MaveDB stores and distributes results from Multiplexed Assays of Variant Effect
(MAVEs), including deep mutational scanning experiments. Score sets contain
functional impact scores for thousands of variants in a single protein/gene.
API: https://api.mavedb.org/api/v1/
Reference: Esposito et al. (2019) Genome Research
"""
import csv
import io
import re
from collections import Counter
import requests
from typing import Dict, Any, List, Optional, Tuple
from .base_tool import BaseTool
from .tool_registry import register_tool
MAVEDB_API = "https://api.mavedb.org/api/v1"
UNIPROT_API = "https://rest.uniprot.org/uniprotkb"
# Standard 20-amino-acid order (matches what every downstream skill expects)
_STANDARD_AAS = "ACDEFGHIKLMNPQRSTVWY"
_THREE_TO_ONE: Dict[str, str] = {
"Ala": "A",
"Arg": "R",
"Asn": "N",
"Asp": "D",
"Cys": "C",
"Gln": "Q",
"Glu": "E",
"Gly": "G",
"His": "H",
"Ile": "I",
"Leu": "L",
"Lys": "K",
"Met": "M",
"Phe": "F",
"Pro": "P",
"Ser": "S",
"Thr": "T",
"Trp": "W",
"Tyr": "Y",
"Val": "V",
}
_HGVS_SINGLE_MISSENSE_RE = re.compile(r"^p\.\(?([A-Z][a-z]{2})(\d+)([A-Z][a-z]{2})\)?$")
[docs]
@register_tool("MaveDBTool")
class MaveDBTool(BaseTool):
"""Search MaveDB for variant effect score sets and retrieve details."""
[docs]
def __init__(self, tool_config):
super().__init__(tool_config)
self.parameter = tool_config.get("parameter", {})
self.required = self.parameter.get("required", [])
self.session = requests.Session()
self.session.headers.update({"Accept": "application/json"})
[docs]
def _search(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Search MaveDB score sets by text query."""
query = params.get("query", "")
limit = params.get("limit", 20)
if not query:
return {"status": "error", "error": "query parameter is required"}
resp = self.session.post(
f"{MAVEDB_API}/score-sets/search",
json={"text": query},
timeout=30,
)
if resp.status_code != 200:
return {
"status": "error",
"error": f"MaveDB search failed: HTTP {resp.status_code}",
}
data = resp.json()
score_sets = data.get("scoreSets", [])[:limit]
results = []
for ss in score_sets:
results.append(
{
"urn": ss.get("urn"),
"title": ss.get("title"),
"short_description": ss.get("shortDescription"),
"num_variants": ss.get("numVariants"),
"published_date": ss.get("publishedDate"),
}
)
return {
"status": "success",
"data": results,
"metadata": {
"total_results": len(score_sets),
"query": query,
},
}
[docs]
def _get_score_set(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Get detailed information about a specific score set by URN."""
urn = params.get("urn", "")
if not urn:
return {"status": "error", "error": "urn parameter is required"}
resp = self.session.get(f"{MAVEDB_API}/score-sets/{urn}", timeout=30)
if resp.status_code == 404:
return {
"status": "error",
"error": f"Score set '{urn}' not found in MaveDB",
}
if resp.status_code != 200:
return {
"status": "error",
"error": f"MaveDB request failed: HTTP {resp.status_code}",
}
d = resp.json()
target_genes = []
for tg in d.get("targetGenes") or []:
gene_info = {
"name": tg.get("name"),
"category": tg.get("category"),
}
ref = tg.get("targetSequence") or {}
gene_info["uniprot_id"] = (
ref.get("uniprot", {}).get("identifier") if ref.get("uniprot") else None
)
target_genes.append(gene_info)
result = {
"urn": d.get("urn"),
"title": d.get("title"),
"short_description": d.get("shortDescription"),
"abstract": d.get("abstractText"),
"method": d.get("methodText"),
"num_variants": d.get("numVariants"),
"published_date": d.get("publishedDate"),
"license": (
d.get("license", {}).get("shortName") if d.get("license") else None
),
"target_genes": target_genes,
"doi_identifiers": [
doi.get("identifier") for doi in d.get("doiIdentifiers") or []
],
"primary_publications": [
pub.get("identifier")
for pub in d.get("primaryPublicationIdentifiers") or []
],
}
return {
"status": "success",
"data": result,
"metadata": {"urn": urn},
}
[docs]
def _get_variant_scores(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Get variant functional scores from a score set (CSV endpoint)."""
urn = params.get("urn", "")
if not urn:
return {"status": "error", "error": "urn parameter is required"}
hgvs_filter = (params.get("hgvs_pro") or "").strip()
# limit semantics: client-side truncation of the CSV returned by the
# /scores endpoint (the endpoint itself returns every variant in one
# call — there is no server-side pagination). limit=None / omitted /
# 0 means "return all", which is what DMS workflows need on whole-
# protein score sets (e.g. KRAS folding ΔΔG has ~2200 variants).
raw_limit = params.get("limit")
limit = None
if raw_limit is not None:
try:
lv = int(raw_limit)
limit = None if lv <= 0 else lv
except (TypeError, ValueError):
limit = None
try:
resp = self.session.get(
f"{MAVEDB_API}/score-sets/{urn}/scores",
timeout=60,
headers={"Accept": "text/csv"},
)
except requests.exceptions.RequestException as e:
return {"status": "error", "error": f"MaveDB API error: {e}"}
if resp.status_code == 404:
return {
"status": "error",
"error": f"Score set '{urn}' not found in MaveDB",
}
if resp.status_code != 200:
return {
"status": "error",
"error": f"MaveDB request failed: HTTP {resp.status_code}",
}
csv_text = resp.text
if not csv_text.strip():
return {
"status": "error",
"error": f"No scores available for '{urn}'",
}
reader = csv.DictReader(io.StringIO(csv_text))
variants = []
total_parsed = 0
for row in reader:
total_parsed += 1
if hgvs_filter:
hgvs_pro_val = row.get("hgvs_pro") or ""
if hgvs_filter.lower() not in hgvs_pro_val.lower():
continue
variant = {
"hgvs_nt": row.get("hgvs_nt") or None,
"hgvs_splice": row.get("hgvs_splice") or None,
"hgvs_pro": row.get("hgvs_pro") or None,
}
for key, value in row.items():
if key in ("accession", "hgvs_nt", "hgvs_splice", "hgvs_pro"):
continue
if value and value != "NA":
try:
variant[key] = float(value)
except ValueError:
variant[key] = value
variants.append(variant)
if limit is not None and len(variants) >= limit:
# Continue counting total_parsed so we can report the true total
# in the response (let the user know more variants exist that
# were truncated client-side).
for _ in reader:
total_parsed += 1
break
truncated = (
limit is not None and len(variants) >= limit and total_parsed > limit
)
return {
"status": "success",
"data": {
"urn": urn,
"total_variants_in_set": total_parsed,
"returned": len(variants),
"truncated": truncated,
"limit_applied": limit,
"hgvs_filter": hgvs_filter or None,
"variants": variants,
},
}
[docs]
def _summarize_vrs_allele(
self, allele: Optional[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
"""Flatten a GA4GH VRS Allele into the few fields callers actually use.
Pulls the SequenceLocation (genomic coordinates when postMapped) plus
the literal state sequence and any hgvs.g expression. Returns None for a
null / non-dict allele so missing maps don't crash downstream code.
"""
if not isinstance(allele, dict):
return None
loc = allele.get("location") or {}
seq_ref = loc.get("sequenceReference") or {}
state = allele.get("state") or {}
expressions = [
e.get("value")
for e in (allele.get("expressions") or [])
if isinstance(e, dict) and e.get("value")
]
return {
"vrs_id": allele.get("id"),
"sequence_reference_label": seq_ref.get("label"),
"refget_accession": seq_ref.get("refgetAccession"),
"start": loc.get("start"),
"end": loc.get("end"),
"state_sequence": state.get("sequence"),
"hgvs_expressions": expressions or None,
}
[docs]
@staticmethod
def _parse_limit(raw_limit: Any) -> Optional[int]:
"""Coerce a user-supplied ``limit`` to a positive int, else None (no cap)."""
if raw_limit is None:
return None
try:
value = int(raw_limit)
except (TypeError, ValueError):
return None
return None if value <= 0 else value
[docs]
@staticmethod
def _apply_limit(items: list, limit: Optional[int]) -> Tuple[list, bool]:
"""Truncate ``items`` to ``limit`` rows, returning (rows, truncated)."""
if limit is not None and len(items) > limit:
return items[:limit], True
return items, False
[docs]
def _get_mapped_variants(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Get genomically-mapped (GA4GH VRS / ClinGen Allele ID) coordinates.
Returns the list of mapped variants for a MaveDB score set. Each entry
carries a postMapped VRS Allele with genomic SequenceLocation (when the
variant maps to the genome) plus the ClinGen canonical allele ID, which
is what links a MAVE functional score to other variant databases.
"""
urn = (params.get("urn") or "").strip()
if not urn:
return {"status": "error", "error": "urn parameter is required"}
limit = self._parse_limit(params.get("limit"))
try:
resp = self.session.get(
f"{MAVEDB_API}/score-sets/{urn}/mapped-variants", timeout=30
)
except requests.exceptions.RequestException as e:
return {"status": "error", "error": f"MaveDB API error: {e}"}
if resp.status_code == 404:
return {
"status": "error",
"error": f"Score set '{urn}' not found in MaveDB",
}
if resp.status_code != 200:
return {
"status": "error",
"error": f"MaveDB request failed: HTTP {resp.status_code}",
}
try:
raw = resp.json()
except ValueError as e:
return {"status": "error", "error": f"Invalid JSON from MaveDB: {e}"}
if not isinstance(raw, list):
raw = [raw] if raw else []
mapped = []
n_with_genomic = 0
n_with_clingen = 0
for entry in raw:
if not isinstance(entry, dict):
continue
post = self._summarize_vrs_allele(entry.get("postMapped"))
pre = self._summarize_vrs_allele(entry.get("preMapped"))
clingen = entry.get("clingenAlleleId")
if post:
n_with_genomic += 1
if clingen:
n_with_clingen += 1
mapped.append(
{
"variant_urn": entry.get("variantUrn"),
"clingen_allele_id": clingen,
"alignment_level": entry.get("alignmentLevel"),
"vrs_version": entry.get("vrsVersion"),
"current": entry.get("current"),
"error_message": entry.get("errorMessage"),
"post_mapped": post,
"pre_mapped": pre,
}
)
total = len(mapped)
mapped, truncated = self._apply_limit(mapped, limit)
return {
"status": "success",
"data": {
"urn": urn,
"total_mapped_variants": total,
"returned": len(mapped),
"truncated": truncated,
"n_with_genomic_location": n_with_genomic,
"n_with_clingen_allele_id": n_with_clingen,
"mapped_variants": mapped,
},
"metadata": {
"source": "MaveDB mapped-variants (GA4GH VRS + ClinGen Allele Registry)",
"note": (
"post_mapped carries genomic SequenceLocation when the "
"variant maps to the genome (alignment_level='genomic'); "
"intronic / unmappable variants have post_mapped=None and an "
"error_message. clingen_allele_id links scores to ClinVar/dbSNP."
),
},
}
[docs]
def _get_clinical_controls(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Get ClinVar clinical-control variants overlapping a score set.
Returns the ClinVar pathogenic/benign/VUS variants that overlap the
score set's mapped variants — the reference set used to calibrate MAVE
functional scores into clinically-interpretable thresholds.
"""
urn = (params.get("urn") or "").strip()
if not urn:
return {"status": "error", "error": "urn parameter is required"}
sig_filter = (params.get("clinical_significance") or "").strip().lower()
limit = self._parse_limit(params.get("limit"))
try:
resp = self.session.get(
f"{MAVEDB_API}/score-sets/{urn}/clinical-controls", timeout=30
)
except requests.exceptions.RequestException as e:
return {"status": "error", "error": f"MaveDB API error: {e}"}
if resp.status_code == 404:
return {
"status": "error",
"error": f"Score set '{urn}' not found in MaveDB",
}
if resp.status_code != 200:
return {
"status": "error",
"error": f"MaveDB request failed: HTTP {resp.status_code}",
}
try:
raw = resp.json()
except ValueError as e:
return {"status": "error", "error": f"Invalid JSON from MaveDB: {e}"}
if not isinstance(raw, list):
raw = [raw] if raw else []
controls = []
sig_counts: Counter = Counter()
for entry in raw:
if not isinstance(entry, dict):
continue
significance = entry.get("clinicalSignificance")
if sig_filter and (significance or "").lower() != sig_filter:
continue
sig_counts[significance or "Unknown"] += 1
mapped_urns = [
mv.get("variantUrn")
for mv in entry.get("mappedVariants") or []
if isinstance(mv, dict) and mv.get("variantUrn")
]
controls.append(
{
"db_name": entry.get("dbName"),
"db_identifier": entry.get("dbIdentifier"),
"db_version": entry.get("dbVersion"),
"gene_symbol": entry.get("geneSymbol"),
"clinical_significance": significance,
"clinical_review_status": entry.get("clinicalReviewStatus"),
"mapped_variant_urns": mapped_urns,
}
)
total = len(controls)
controls, truncated = self._apply_limit(controls, limit)
return {
"status": "success",
"data": {
"urn": urn,
"total_clinical_controls": total,
"returned": len(controls),
"truncated": truncated,
"significance_filter": sig_filter or None,
"significance_breakdown": dict(sig_counts),
"clinical_controls": controls,
},
"metadata": {
"source": "MaveDB clinical-controls (ClinVar overlap)",
"note": (
"ClinVar pathogenic/benign/VUS variants overlapping the "
"score set, used to calibrate functional scores. Each entry's "
"mapped_variant_urns point to the MaveDB variant(s) with the "
"same genomic coordinate."
),
},
}
[docs]
def _get_gnomad_variants(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Get gnomAD population-frequency variants observed within a score set.
Returns gnomAD variants (allele count / number / frequency) that fall
within the score set, each linked to the MaveDB mapped variant via a
postMapped VRS Allele — lets you cross population frequency with the
measured functional score.
"""
urn = (params.get("urn") or "").strip()
if not urn:
return {"status": "error", "error": "urn parameter is required"}
limit = self._parse_limit(params.get("limit"))
try:
resp = self.session.get(
f"{MAVEDB_API}/score-sets/{urn}/gnomad-variants", timeout=30
)
except requests.exceptions.RequestException as e:
return {"status": "error", "error": f"MaveDB API error: {e}"}
if resp.status_code == 404:
return {
"status": "error",
"error": f"Score set '{urn}' not found in MaveDB",
}
if resp.status_code != 200:
return {
"status": "error",
"error": f"MaveDB request failed: HTTP {resp.status_code}",
}
try:
raw = resp.json()
except ValueError as e:
return {"status": "error", "error": f"Invalid JSON from MaveDB: {e}"}
if not isinstance(raw, list):
raw = [raw] if raw else []
variants = []
for entry in raw:
if not isinstance(entry, dict):
continue
mapped_post = []
for mv in entry.get("mappedVariants") or []:
if isinstance(mv, dict):
summary = self._summarize_vrs_allele(mv.get("postMapped"))
if summary:
mapped_post.append(summary)
variants.append(
{
"db_name": entry.get("dbName"),
"db_identifier": entry.get("dbIdentifier"),
"db_version": entry.get("dbVersion"),
"allele_count": entry.get("alleleCount"),
"allele_number": entry.get("alleleNumber"),
"allele_frequency": entry.get("alleleFrequency"),
"mapped_genomic_alleles": mapped_post,
}
)
total = len(variants)
variants, truncated = self._apply_limit(variants, limit)
return {
"status": "success",
"data": {
"urn": urn,
"total_gnomad_variants": total,
"returned": len(variants),
"truncated": truncated,
"gnomad_variants": variants,
},
"metadata": {
"source": "MaveDB gnomAD-variants (population frequency overlap)",
"note": (
"gnomAD variants observed within the score set. Cross "
"allele_frequency with the MAVE functional score: rare "
"high-impact variants are candidate disease alleles."
),
},
}
[docs]
def _search_experiments(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Search MaveDB experiments by text query."""
query = params.get("query", "")
if not query:
return {"status": "error", "error": "query parameter is required"}
try:
resp = self.session.post(
f"{MAVEDB_API}/experiments/search",
json={"text": query},
timeout=30,
)
except requests.exceptions.RequestException as e:
return {"status": "error", "error": f"MaveDB API error: {e}"}
if resp.status_code != 200:
return {
"status": "error",
"error": f"MaveDB search failed: HTTP {resp.status_code}",
}
raw = resp.json()
if not isinstance(raw, list):
raw = [raw] if raw else []
experiments = []
for exp in raw:
urn = exp.get("urn", "")
if urn.startswith("tmp:"):
continue
pubs = [
{
"identifier": pub.get("identifier"),
"db_name": pub.get("dbName"),
"title": pub.get("title"),
}
for pub in exp.get("primaryPublicationIdentifiers") or []
]
score_set_urns = exp.get("scoreSetUrns") or []
experiments.append(
{
"urn": urn,
"title": exp.get("title"),
"short_description": exp.get("shortDescription"),
"published_date": exp.get("publishedDate"),
"score_set_urns": score_set_urns,
"num_score_sets": len(score_set_urns),
"publications": pubs,
}
)
return {
"status": "success",
"data": {
"query": query,
"total_experiments": len(experiments),
"experiments": experiments,
},
}
[docs]
def _get_effect_matrix(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Fetch DMS score set and return ready-to-analyze effect matrix.
Hides the parsing pipeline that every consuming skill used to inline:
1. Fetch all variants (uses _get_variant_scores with no limit)
2. Parse HGVS protein notation → (ref_aa, position, alt_aa)
3. Filter to single missense (drop synonymous/nonsense/indels/multi)
4. Optional: verify position numbering against UniProt canonical
5. Reshape to (20 amino acids × n_positions) matrix in standard
AA order ('ACDEFGHIKLMNPQRSTVWY')
"""
urn = params.get("urn", "")
if not urn:
return {"status": "error", "error": "urn parameter is required"}
uniprot_id: Optional[str] = params.get("uniprot_id")
score_field: Optional[str] = params.get("score_field")
# 1) Pull all variants via the existing helper (it uses the same session)
raw = self._get_variant_scores({"urn": urn, "limit": 0})
if raw.get("status") != "success":
return raw
variants = raw["data"].get("variants", [])
total_in_set = raw["data"].get("total_variants_in_set", len(variants))
# 2-3) parse HGVS + filter to single missense
parsed: List[Tuple[str, int, str, float]] = []
n_dropped = 0
score_field_counter: Counter = Counter()
for row in variants:
hgvs = row.get("hgvs_pro") or ""
m = _HGVS_SINGLE_MISSENSE_RE.match(hgvs)
if not m:
n_dropped += 1
continue
ref3, pos_str, alt3 = m.group(1), m.group(2), m.group(3)
if ref3 not in _THREE_TO_ONE or alt3 not in _THREE_TO_ONE:
n_dropped += 1
continue
# Pick a score field — explicit > 'score' > 'ddG' > 'fitness' > first numeric
score: Optional[float] = None
if score_field and score_field in row:
try:
score = float(row[score_field])
score_field_counter[score_field] += 1
except (TypeError, ValueError):
pass
else:
for cand in ("score", "ddG", "fitness"):
if row.get(cand) is None:
continue
try:
score = float(row[cand])
score_field_counter[cand] += 1
break
except (TypeError, ValueError):
continue
if score is None:
for k, v in row.items():
if k.startswith("hgvs") or k == "accession":
continue
try:
score = float(v)
score_field_counter[k] += 1
break
except (TypeError, ValueError):
continue
if score is None or (isinstance(score, float) and (score != score)):
n_dropped += 1
continue
parsed.append(
(_THREE_TO_ONE[ref3], int(pos_str), _THREE_TO_ONE[alt3], score)
)
if not parsed:
return {
"status": "error",
"error": (
f"No usable single-missense variants found in '{urn}' "
f"({total_in_set} rows, {n_dropped} dropped — check that the "
f"score set uses HGVS protein notation and has numeric scores)."
),
}
score_field_used = (
score_field_counter.most_common(1)[0][0] if score_field_counter else None
)
# 4) optional numbering verification against UniProt canonical
numbering_offset: Optional[int] = None
landmark_check: Optional[Dict[str, Any]] = None
if uniprot_id:
try:
resp = self.session.get(f"{UNIPROT_API}/{uniprot_id}.fasta", timeout=30)
if resp.status_code == 200:
seq_lines = resp.text.strip().splitlines()
canonical_seq = "".join(seq_lines[1:])
# Use the most-common parsed position as the landmark
position_counts = Counter(p[1] for p in parsed)
landmark_pos = position_counts.most_common(1)[0][0]
landmark_ref = next(p[0] for p in parsed if p[1] == landmark_pos)
seq_idx = landmark_pos - 1
if 0 <= seq_idx < len(canonical_seq):
uniprot_aa = canonical_seq[seq_idx]
if uniprot_aa == landmark_ref:
numbering_offset = 0
landmark_check = {
"position": landmark_pos,
"mavedb_ref": landmark_ref,
"uniprot_aa": uniprot_aa,
"match": True,
}
else:
# Try common offsets to detect the shift
offset_detected = None
for offset in range(-10, 11):
target_idx = seq_idx + offset
if (
0 <= target_idx < len(canonical_seq)
and canonical_seq[target_idx] == landmark_ref
):
offset_detected = offset
break
landmark_check = {
"position": landmark_pos,
"mavedb_ref": landmark_ref,
"uniprot_aa": uniprot_aa,
"match": False,
"detected_offset": offset_detected,
"warning": (
f"MaveDB position {landmark_pos} has ref_aa "
f"{landmark_ref} but UniProt {uniprot_id} "
f"position {landmark_pos} is {uniprot_aa}."
+ (
f" Likely offset = {offset_detected}."
if offset_detected is not None
else ""
)
),
}
numbering_offset = offset_detected
else:
landmark_check = {
"warning": f"UniProt {uniprot_id} lookup HTTP {resp.status_code}"
}
except requests.exceptions.RequestException as exc:
landmark_check = {"warning": f"UniProt verification failed: {exc}"}
# 5) reshape into (20 × n_positions)
positions = sorted({p[1] for p in parsed})
pos_index = {p: i for i, p in enumerate(positions)}
aa_index = {a: i for i, a in enumerate(_STANDARD_AAS)}
matrix: List[List[Optional[float]]] = [
[None] * len(positions) for _ in range(20)
]
for ref_aa, pos, alt_aa, score in parsed:
if alt_aa not in aa_index:
continue
matrix[aa_index[alt_aa]][pos_index[pos]] = score
return {
"status": "success",
"data": {
"urn": urn,
"matrix": matrix,
"positions": positions,
"amino_acid_order": _STANDARD_AAS,
"shape": [20, len(positions)],
"n_parsed_single_missense": len(parsed),
"n_dropped": n_dropped,
"n_total_rows": total_in_set,
"score_field_used": score_field_used,
"numbering_check": landmark_check,
"numbering_offset": numbering_offset,
},
"metadata": {
"source": "MaveDB",
"matrix_layout": (
"rows = 20 amino acids in 'ACDEFGHIKLMNPQRSTVWY' order; "
"columns = positions (1-based canonical, see positions field); "
"cells = score (None for unmeasured / WT-diagonal)"
),
"note": (
"WT diagonal cells are None (the MaveDB API doesn't "
"include 'X→X' rows). Downstream skills should treat None "
"as missing data."
),
},
}
[docs]
def run(self, params: Dict[str, Any]) -> Dict[str, Any]:
operation = self.tool_config.get("fields", {}).get("operation", "")
dispatch = {
"search": self._search,
"get_score_set": self._get_score_set,
"get_variant_scores": self._get_variant_scores,
"search_experiments": self._search_experiments,
"get_effect_matrix": self._get_effect_matrix,
"get_mapped_variants": self._get_mapped_variants,
"get_clinical_controls": self._get_clinical_controls,
"get_gnomad_variants": self._get_gnomad_variants,
}
handler = dispatch.get(operation)
if handler:
return handler(params)
return {"status": "error", "error": f"Unknown operation: {operation}"}