# Source code for tooluniverse.evo2_variant_effect_tool

"""NVIDIA Evo 2 zero-shot variant-effect scoring (hosted NIM forward endpoint).

Evo 2 (Arc Institute; Brixi et al., 2025) is a genome foundation model with a
1 Mb context. NVIDIA hosts it as a NIM; the ``generate`` endpoint (autoregressive
sequence generation) is already wrapped as ``NvidiaNIM_evo2``. This tool adds the
genomics-relevant operation that endpoint does not cover: **zero-shot
variant-effect scoring** via the model's ``forward`` endpoint.

Method (NVIDIA's documented zero-shot recipe, e.g. the BRCA1 example): build a
DNA window for the reference and the alternate allele, run a forward pass on
each to obtain the model's logits, reduce them to an autoregressive sequence
log-likelihood, and report the delta::

    delta_loglik = loglik(alt) - loglik(ref)

A **negative** delta means the variant makes the sequence less likely under the
genome model — a candidate deleterious/disruptive change; near-zero means
tolerated.

The hosted ``/forward`` endpoint (a StripedHyena model, served in two sizes —
``arc/evo2-40b`` default and ``arc/evo2-7b``, selectable via the ``model`` arg)
returns the requested layer tensors as a base64-encoded NumPy ``.npz``. The final
logits are the ``unembed`` layer (npz key ``unembed.output``, shape
``[batch, seq_len, 512]`` over Evo 2's byte-level vocabulary). This tool decodes
that, computes the likelihood (byte-level tokens, token = ``ord(base)``), and
takes the delta. ``run()`` is key-gated (``NVIDIA_API_KEY``) and never raises.

Note: the forward/scoring path requires a live key to validate end-to-end; the
likelihood reduction is unit-tested independently against synthetic logits.

API: https://docs.nvidia.com/nim/bionemo/evo2/latest/endpoints.html
"""

import base64
import io
import json
import os
import zipfile
from typing import Any, Dict, Optional, Tuple

import numpy as np
import requests

from .base_tool import BaseTool
from .tool_registry import register_tool

# Root of the hosted Arc Institute endpoints. The model slug and the
# "/forward" suffix are appended per request.
_ARC_BASE = "https://health.api.nvidia.com/v1/biology/arc"

# Hosted Evo 2 sizes this tool will call; the default is used whenever the
# caller does not name one of them.
_DEFAULT_MODEL = "evo2-40b"
_VALID_MODELS = {_DEFAULT_MODEL, "evo2-7b"}

# Characters accepted in an input sequence after upper-casing.
_VALID_BASES = {"A", "C", "G", "T", "N"}


@register_tool("Evo2VariantEffectTool")
class Evo2VariantEffectTool(BaseTool):
    """Score a variant with Evo 2's forward-pass delta log-likelihood (hosted NIM).

    ``run()`` resolves a reference/alternate sequence pair from the arguments,
    requests the ``unembed`` logits for each from the hosted ``/forward``
    endpoint, reduces each to an autoregressive log-likelihood and reports
    ``loglik(alt) - loglik(ref)``.

    Every failure path returns an error dict built by ``_err`` instead of
    raising.
    """

    def __init__(self, tool_config: Optional[Dict[str, Any]] = None):
        """Read ``base_url`` and ``timeout`` from ``tool_config["fields"]``.

        Missing, ``None`` or otherwise falsy values fall back to the defaults
        (``_ARC_BASE`` and 120 seconds).
        """
        super().__init__(tool_config)
        self.tool_config = tool_config or {}
        fields = self.tool_config.get("fields", {}) or {}
        # Base path up to (but not including) the model slug; the model is chosen
        # per call so one tool can score with either hosted Evo 2 size.
        # ``or`` (rather than a .get default) also covers an explicit None.
        self.arc_base = str(fields.get("base_url") or _ARC_BASE).rstrip("/")
        # A 0/None timeout falls back to the default so it never reaches requests.
        self.timeout = int(fields.get("timeout") or 120)

    @staticmethod
    def _resolve_model(model: Any) -> str:
        """Pick a valid hosted Evo 2 model, defaulting to evo2-40b.

        The slug is matched case-insensitively and an ``arc/`` prefix is
        accepted, so ``"arc/evo2-7b"`` selects ``evo2-7b`` instead of silently
        falling back to the default. Anything unrecognised yields the default.
        """
        candidate = str(model or _DEFAULT_MODEL).strip().lower()
        if candidate.startswith("arc/"):
            candidate = candidate[len("arc/"):]
        return candidate if candidate in _VALID_MODELS else _DEFAULT_MODEL

    # ------------------------------------------------------------------ run
    def run(self, arguments: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Score one variant and return a result dict.

        Args:
            arguments: Mapping with either ``ref_sequence`` + ``alt_sequence``
                or ``sequence`` + ``position`` + ``alternate`` (optionally
                ``reference``), plus an optional ``model``.

        Returns:
            ``{"status": "success", "data": ..., "metadata": ...}`` on success,
            otherwise the error dict produced by ``_err``.
        """
        args = arguments or {}
        # Keep the "returns an error dict, does not raise" contract even when
        # the caller passes something that is not mapping-like.
        if not callable(getattr(args, "get", None)):
            return self._err("arguments must be a mapping of parameter names.")

        api_key = os.environ.get("NVIDIA_API_KEY")
        if not api_key:
            return self._err(
                "NVIDIA_API_KEY not set (free key at https://build.nvidia.com)."
            )

        ref_seq, alt_seq, info = self._resolve_sequences(args)
        if info is not None:  # error dict
            return info

        model = self._resolve_model(args.get("model"))

        # One forward pass per allele; either may come back as an error dict.
        ll_ref = self._sequence_log_likelihood(ref_seq, api_key, model)
        if isinstance(ll_ref, dict):
            return ll_ref
        ll_alt = self._sequence_log_likelihood(alt_seq, api_key, model)
        if isinstance(ll_alt, dict):
            return ll_alt

        delta = ll_alt - ll_ref
        # NaN/inf compares False against 0 and would otherwise be reported as
        # "tolerated"; surface it as an error instead.
        if not np.isfinite(delta):
            return self._err(
                "Evo 2 logits produced a non-finite log-likelihood; "
                "delta is undefined."
            )

        return {
            "status": "success",
            "data": {
                "delta_loglik": delta,
                "ref_loglik": ll_ref,
                "alt_loglik": ll_alt,
                "direction": (
                    "variant disfavored vs reference (candidate deleterious)"
                    if delta < 0
                    else "variant tolerated or favored (likely neutral)"
                ),
            },
            "metadata": {
                "model": f"Evo 2 (arc/{model})",
                "method": "forward-pass delta log-likelihood (zero-shot)",
                "source": "NVIDIA NIM (hosted; requires NVIDIA_API_KEY)",
                "note": (
                    "Negative delta = variant less likely under the genome model. "
                    "Not a calibrated pathogenicity probability; rank or calibrate "
                    "against a reference set."
                ),
            },
        }

    # -------------------------------------------------------------- inputs
    def _resolve_sequences(
        self, args: Dict[str, Any]
    ) -> Tuple[str, str, Optional[Dict[str, Any]]]:
        """Return (ref_seq, alt_seq, error_or_None).

        Two input styles:
          * ref_sequence + alt_sequence (explicit windows), or
          * sequence + position + alternate (point substitution at 1-based pos).

        On failure both sequences are ``""`` and the third element is the
        error dict to return to the caller.
        """

        def fail(message: str) -> Tuple[str, str, Dict[str, Any]]:
            return "", "", self._err(message)

        raw_ref = args.get("ref_sequence")
        raw_alt = args.get("alt_sequence")
        ref = self._clean(raw_ref)
        alt = self._clean(raw_alt)
        if ref and alt:
            if len(ref) != len(alt):
                return fail(
                    "ref_sequence and alt_sequence must have the same length."
                )
            return ref, alt, None

        raw_seq = args.get("sequence")
        seq = self._clean(raw_seq)
        if seq and args.get("position") is not None and args.get("alternate"):
            try:
                pos = int(args["position"])
            except (TypeError, ValueError):
                return fail("position must be a 1-based integer.")
            if not 1 <= pos <= len(seq):
                return fail(
                    f"position {pos} out of range for sequence length {len(seq)}."
                )
            allele = self._clean(args.get("alternate"))
            if len(allele) != 1:
                return fail("alternate must be a single base for this mode.")
            declared = args.get("reference")
            # Optional sanity check: the caller's reference base must match
            # what is actually at that position in the window.
            if declared and self._clean(declared) != seq[pos - 1]:
                return fail(
                    f"reference {declared!r} does not match base {seq[pos - 1]!r} "
                    f"at position {pos}."
                )
            alt_seq = seq[: pos - 1] + allele + seq[pos:]
            return seq, alt_seq, None

        # Nothing usable. If the caller did supply sequences but _clean()
        # rejected them, say so rather than implying they were missing.
        if raw_ref and raw_alt:
            return fail(
                "ref_sequence and alt_sequence must be DNA strings containing "
                "only A, C, G, T or N."
            )
        if raw_seq and not seq:
            return fail(
                "sequence must be a DNA string containing only A, C, G, T or N."
            )
        return fail(
            "Provide either ref_sequence + alt_sequence, or sequence + position + "
            "alternate."
        )

    @staticmethod
    def _clean(value: Any) -> str:
        """Strip whitespace + uppercase; return '' if it is not a DNA string."""
        s = "".join(str(value or "").split()).upper()
        return s if s and set(s) <= _VALID_BASES else ""

    # ------------------------------------------------------------- scoring
    def _sequence_log_likelihood(self, seq: str, api_key: str, model: str):
        """Forward pass -> autoregressive log-likelihood (or an error dict)."""
        logits = self._forward(seq, api_key, model)
        if isinstance(logits, dict):
            return logits
        try:
            return self._autoregressive_loglik(seq, logits)
        except Exception as exc:  # defensive: malformed logits shape
            return self._err(f"Could not compute likelihood from Evo 2 logits: {exc}")

    @staticmethod
    def _autoregressive_loglik(seq: str, logits: np.ndarray) -> float:
        """Sum of log P(next base) under the model. logits[i] predicts base i+1.

        Evo 2 is byte-level: the vocabulary index of a base is ``ord(base)``.

        Args:
            seq: The scored sequence (ASCII).
            logits: ``[L, vocab]`` or ``[batch, L, vocab]`` array; only batch
                0 is used. Array-likes are accepted.

        Returns:
            The summed log-probability of bases ``1..n-1`` where
            ``n = min(len(seq), L)``, or ``0.0`` when ``n < 2``.

        Raises:
            ValueError: if the logits are not rank 2 after batch selection.
        """
        arr = np.asarray(logits)
        # NIM returns (batch, seq_len, vocab); take batch 0 -> [L, vocab].
        if arr.ndim == 3:
            arr = arr[0]
        n = min(len(seq), arr.shape[0])
        if n < 2:
            return 0.0
        if arr.ndim != 2:
            raise ValueError(
                f"expected logits of shape [L, vocab], got {arr.shape}"
            )
        # Positions 0..n-2 predict bases 1..n-1. Reduce in float64: the caller
        # subtracts two large sums, so low-precision accumulation would swamp
        # the small delta it is looking for.
        arr = arr[: n - 1].astype(np.float64, copy=False)
        # Stable log-sum-exp per position.
        m = arr.max(axis=1, keepdims=True)
        log_z = m[:, 0] + np.log(np.exp(arr - m).sum(axis=1))
        next_tokens = np.frombuffer(seq[1:n].encode("ascii"), dtype=np.uint8)
        chosen = arr[np.arange(n - 1), next_tokens]
        return float(np.sum(chosen - log_z))

    def _forward(self, seq: str, api_key: str, model: str):
        """POST to the Evo 2 forward endpoint and return the logits array (or error).

        Transport failures, non-200 responses and unparsable payloads all come
        back as an ``_err`` dict rather than an exception.
        """
        url = f"{self.arc_base}/{model}/forward"
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }
        # The hosted Evo 2 NIMs (both evo2-40b and evo2-7b) are StripedHyena
        # models: the final logits layer is 'unembed' (returns 'unembed.output',
        # shape (batch, L, 512)). ('output_layer' is the BioNeMo/Megatron name and
        # 422s on this endpoint.)
        payload = {"sequence": seq, "output_layers": ["unembed"]}
        try:
            resp = requests.post(
                url, headers=headers, json=payload, timeout=self.timeout
            )
        except requests.exceptions.Timeout:
            return self._err(f"Evo 2 request timed out after {self.timeout}s.")
        except requests.exceptions.RequestException as exc:
            return self._err(f"Evo 2 request failed: {exc}")
        if resp.status_code != 200:
            return self._err(f"Evo 2 HTTP {resp.status_code}: {resp.text[:200]}")
        try:
            decoded = self._decode_response(resp)
            blob = base64.b64decode(decoded["data"])
            # Close the archive once the array has been read; pickled payloads
            # are refused explicitly since the bytes come from the network.
            with np.load(io.BytesIO(blob), allow_pickle=False) as arrays:
                if "unembed.output" not in arrays.files:
                    raise ValueError(
                        f"'unembed.output' missing from response "
                        f"(got {list(arrays.files)})"
                    )
                return arrays["unembed.output"]
        except Exception as exc:
            return self._err(f"Could not parse Evo 2 response: {exc}")

    @staticmethod
    def _decode_response(resp) -> Dict[str, Any]:
        """The NVCF gateway returns inline JSON, or a zip for large payloads."""
        if "zip" in resp.headers.get("content-type", "").lower():
            with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
                # The JSON document is taken from the first archive member.
                return json.loads(zf.read(zf.namelist()[0]))
        return resp.json()

    @staticmethod
    def _err(message: str) -> Dict[str, Any]:
        """Build the uniform error dict returned by every failure path."""
        return {"status": "error", "error": message, "source": "Evo2VariantEffectTool"}