tooluniverse.popgen_tool 源代码
"""
Population Genetics Calculator Tool
Computes Hardy-Weinberg equilibrium tests, Fst (Weir-Cockerham),
inbreeding coefficients, and haplotype diversity estimates.
No external API calls. Pure Python with stdlib math only.
"""
import math
from typing import Any, Dict
from .base_tool import BaseTool
from .tool_registry import register_tool
# ---------------------------------------------------------------------------
# Chi-square p-value (regularized incomplete gamma function)
# ---------------------------------------------------------------------------
def _chi2_pvalue(x: float, df: int = 1) -> float:
"""Approximate p-value for chi-square distribution."""
if x <= 0:
return 1.0
if df == 1:
return math.erfc(math.sqrt(x / 2.0))
return _upper_inc_gamma_reg(df / 2.0, x / 2.0)
def _upper_inc_gamma_reg(a: float, x: float) -> float:
"""Q(a, x) = 1 - P(a, x)."""
if x <= 0:
return 1.0
if x < a + 1.0:
return 1.0 - _gamma_series(a, x)
return _gamma_cf(a, x)
def _gamma_series(a: float, x: float, max_iter: int = 200, eps: float = 1e-9) -> float:
"""Series representation of regularized lower incomplete gamma P(a,x)."""
lg = math.lgamma(a)
ap = a
term = 1.0 / a
total = term
for _ in range(max_iter):
ap += 1.0
term *= x / ap
total += term
if abs(term) < abs(total) * eps:
break
return total * math.exp(-x + a * math.log(x) - lg)
def _gamma_cf(a: float, x: float, max_iter: int = 200, eps: float = 1e-9) -> float:
"""Continued fraction representation of Q(a, x)."""
lg = math.lgamma(a)
fpmin = 1e-30
b = x + 1.0 - a
c = 1.0 / fpmin
d = 1.0 / b
h = d
for i in range(1, max_iter + 1):
an = -i * (i - a)
b += 2.0
d = an * d + b
if abs(d) < fpmin:
d = fpmin
c = b + an / c
if abs(c) < fpmin:
c = fpmin
d = 1.0 / d
delta = d * c
h *= delta
if abs(delta - 1.0) < eps:
break
return math.exp(-x + a * math.log(x) - lg) * h
# ---------------------------------------------------------------------------
# Pedigree base F values
# ---------------------------------------------------------------------------
PEDIGREE_BASE_F = {
"self": 0.5,
"full-sib": 0.25,
"half-sib": 0.125,
"first-cousin": 0.0625,
"double-first-cousin": 0.125,
"uncle-niece": 0.125,
"aunt-nephew": 0.125,
"half-first-cousin": 0.03125,
"second-cousin": 0.015625,
}
[文档]
@register_tool("PopGenTool")
class PopGenTool(BaseTool):
"""Population genetics calculator: HWE, Fst, inbreeding, haplotypes."""
[文档]
def __init__(self, tool_config: Dict[str, Any]):
super().__init__(tool_config)
self.parameter = tool_config.get("parameter", {})
self.required = self.parameter.get("required", [])
[文档]
def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
operation = arguments.get("operation")
if not operation:
return {"status": "error", "error": "Missing required parameter: operation"}
handlers = {
"hwe_test": self._hwe_test,
"fst": self._fst,
"inbreeding": self._inbreeding,
"haplotype_count": self._haplotype_count,
}
handler = handlers.get(operation)
if not handler:
return {
"status": "error",
"error": f"Unknown operation: {operation}",
"available_operations": list(handlers.keys()),
}
try:
return handler(arguments)
except Exception as e:
return {"status": "error", "error": f"Calculation failed: {str(e)}"}
[文档]
def _hwe_test(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
obs_AA = arguments.get("obs_AA", 0)
obs_Aa = arguments.get("obs_Aa", 0)
obs_aa = arguments.get("obs_aa", 0)
n = obs_AA + obs_Aa + obs_aa
if n == 0:
return {"status": "error", "error": "Total genotype count is zero."}
p = (2 * obs_AA + obs_Aa) / (2 * n)
q = 1.0 - p
exp_AA = n * p**2
exp_Aa = n * 2 * p * q
exp_aa = n * q**2
chi2 = 0.0
for obs, exp in [(obs_AA, exp_AA), (obs_Aa, exp_Aa), (obs_aa, exp_aa)]:
if exp > 0:
chi2 += (obs - exp) ** 2 / exp
p_value = _chi2_pvalue(chi2, df=1)
if p_value > 0.05:
interp = "Locus is in Hardy-Weinberg equilibrium (p > 0.05)."
else:
direction = "excess" if obs_Aa > exp_Aa else "deficit"
interp = (
f"Significant deviation from HWE (chi2={chi2:.2f}, p={p_value:.4f}). "
f"Heterozygote {direction} observed."
)
return {
"status": "success",
"data": {
"obs_AA": obs_AA,
"obs_Aa": obs_Aa,
"obs_aa": obs_aa,
"exp_AA": round(exp_AA, 2),
"exp_Aa": round(exp_Aa, 2),
"exp_aa": round(exp_aa, 2),
"allele_freq_A": round(p, 4),
"allele_freq_a": round(q, 4),
"chi2": round(chi2, 4),
"p_value": round(p_value, 6),
"df": 1,
"in_HWE": p_value > 0.05,
"interpretation": interp,
},
}
[文档]
def _fst(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
p1 = arguments.get("p1")
p2 = arguments.get("p2")
n1 = arguments.get("n1")
n2 = arguments.get("n2")
for name, val in [("p1", p1), ("p2", p2), ("n1", n1), ("n2", n2)]:
if val is None:
return {
"status": "error",
"error": f"Missing required parameter: {name}",
}
for val, name in [(p1, "p1"), (p2, "p2")]:
if not 0.0 <= val <= 1.0:
return {
"status": "error",
"error": f"{name}={val} is out of range [0, 1].",
}
if n1 <= 0 or n2 <= 0:
return {
"status": "error",
"error": "Sample sizes must be positive integers.",
}
p_bar = (n1 * p1 + n2 * p2) / (n1 + n2)
if p_bar == 0.0 or p_bar == 1.0:
return {
"status": "success",
"data": {
"p1": p1,
"p2": p2,
"n1": n1,
"n2": n2,
"p_bar": round(p_bar, 4),
"Fst": 0.0,
"interpretation": "Allele is fixed or absent in both populations; Fst is 0.",
},
}
msp = (n1 * (p1 - p_bar) ** 2 + n2 * (p2 - p_bar) ** 2) / 1.0
msg = (n1 * p1 * (1 - p1) + n2 * p2 * (1 - p2)) / (n1 + n2 - 2)
denom = msp + msg
fst = max(0.0, (msp - msg) / denom) if denom > 0 else 0.0
if fst < 0.05:
interp = f"Fst={fst:.4f}: Little genetic differentiation."
elif fst < 0.15:
interp = f"Fst={fst:.4f}: Moderate genetic differentiation."
elif fst < 0.25:
interp = f"Fst={fst:.4f}: Great genetic differentiation."
else:
interp = f"Fst={fst:.4f}: Very great genetic differentiation."
return {
"status": "success",
"data": {
"p1": p1,
"p2": p2,
"n1": n1,
"n2": n2,
"p_bar": round(p_bar, 4),
"Fst": round(fst, 4),
"interpretation": interp,
},
}
[文档]
def _inbreeding(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
pedigree = arguments.get("pedigree")
generations = arguments.get("generations", 1)
if not pedigree:
return {"status": "error", "error": "Missing required parameter: pedigree"}
key = pedigree.lower().strip()
if key not in PEDIGREE_BASE_F:
return {
"status": "error",
"error": f"Unknown pedigree type '{pedigree}'. Available: {', '.join(sorted(PEDIGREE_BASE_F))}",
}
if generations < 1:
return {"status": "error", "error": "generations must be >= 1."}
base_f = PEDIGREE_BASE_F[key]
f_per_gen = []
f_prev = 0.0
for _ in range(generations):
f_g = base_f + (1 - base_f) * f_prev
f_per_gen.append(round(f_g, 6))
f_prev = f_g
cumulative_f = f_per_gen[-1]
return {
"status": "success",
"data": {
"pedigree": key,
"base_F_per_mating": base_f,
"generations": generations,
"F_per_generation": f_per_gen,
"cumulative_F": round(cumulative_f, 6),
"heterozygosity_retained": round(1.0 - cumulative_f, 6),
"interpretation": (
f"After {generations} generation(s) of {key} mating, "
f"F = {cumulative_f:.4f}. "
f"{(1 - cumulative_f) * 100:.1f}% of heterozygosity retained."
),
},
}
[文档]
def _haplotype_count(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
n_snps = arguments.get("n_snps")
generations = arguments.get("generations", 1)
recomb_rate = arguments.get("recomb_rate", 0.5)
if n_snps is None or n_snps < 1:
return {"status": "error", "error": "n_snps must be >= 1."}
if generations < 0:
return {"status": "error", "error": "generations must be >= 0."}
if not 0.0 <= recomb_rate <= 1.0:
return {"status": "error", "error": "recomb_rate must be between 0 and 1."}
max_haplotypes = 2**n_snps
n_intervals = max(n_snps - 1, 0)
expected_crossovers_per_gen = n_intervals * recomb_rate
p_no_recomb_one_gen = (1.0 - recomb_rate) ** n_intervals
p_at_least_one_recomb = 1.0 - p_no_recomb_one_gen**generations
if generations == 0:
estimated_haplotypes = 2
else:
ld_decay = (1.0 - recomb_rate) ** generations
diversity_fraction = 1.0 - ld_decay
estimated_haplotypes = max(
2, int(round(2 + (max_haplotypes - 2) * diversity_fraction))
)
estimated_haplotypes = min(estimated_haplotypes, max_haplotypes)
return {
"status": "success",
"data": {
"n_snps": n_snps,
"n_intervals": n_intervals,
"recomb_rate_per_interval": recomb_rate,
"generations": generations,
"max_theoretical_haplotypes": max_haplotypes,
"expected_crossovers_per_gamete_per_gen": round(
expected_crossovers_per_gen, 4
),
"p_at_least_one_recombination": round(p_at_least_one_recomb, 6),
"estimated_distinct_haplotypes": estimated_haplotypes,
"interpretation": (
f"After {generations} generation(s) with r={recomb_rate}: "
f"~{estimated_haplotypes} of {max_haplotypes} possible haplotypes "
f"({100.0 * estimated_haplotypes / max_haplotypes:.1f}% of max diversity)."
),
},
}