# Source code for tooluniverse.survival_tool
"""
Survival Analysis Tool
Implements Kaplan-Meier estimator, log-rank test, and Cox proportional hazards
regression for survival data analysis.
No external API calls. Uses numpy and scipy for computation.
References: Kaplan & Meier (1958), Mantel (1966), Cox (1972).
"""
from typing import Dict, Any, List
from .base_tool import BaseTool
from .tool_registry import register_tool
# Optional scientific dependencies: the tool degrades gracefully (returns a
# structured error from run()) instead of crashing at import time when
# numpy/scipy are not installed.
try:
    import numpy as np

    HAS_NUMPY = True
except ImportError:
    HAS_NUMPY = False

try:
    from scipy import stats

    HAS_SCIPY = True
except ImportError:
    HAS_SCIPY = False
@register_tool("SurvivalTool")
class SurvivalTool(BaseTool):
    """
    Local survival analysis tools.

    Implements:
    - Kaplan-Meier survival estimator
    - Log-rank test (Mantel-Cox)
    - Cox proportional hazards regression (basic)

    No external API required. Uses numpy/scipy.
    """

    def __init__(self, tool_config: Dict[str, Any]):
        """Store the tool's parameter schema from its config.

        Args:
            tool_config: Tool configuration dict; the optional "parameter"
                entry holds the JSON-schema-like parameter description,
                whose "required" list is cached for convenience.
        """
        super().__init__(tool_config)
        self.parameter = tool_config.get("parameter", {})
        self.required = self.parameter.get("required", [])

    def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Execute survival analysis.

        Dispatches on arguments["operation"], one of:
        "kaplan_meier", "log_rank_test", "cox_regression".

        Returns:
            A dict with "status" ("success"/"error") and either "data"
            or "error". Handler exceptions are caught and reported as
            error results rather than propagated.
        """
        if not HAS_NUMPY:
            return {
                "status": "error",
                "error": "numpy is required. Install with: pip install numpy scipy",
            }
        operation = arguments.get("operation")
        if not operation:
            return {"status": "error", "error": "Missing required parameter: operation"}
        operation_handlers = {
            "kaplan_meier": self._kaplan_meier,
            "log_rank_test": self._log_rank_test,
            "cox_regression": self._cox_regression,
        }
        handler = operation_handlers.get(operation)
        if not handler:
            return {
                "status": "error",
                "error": f"Unknown operation: {operation}",
                "available_operations": list(operation_handlers.keys()),
            }
        try:
            return handler(arguments)
        except Exception as e:
            return {"status": "error", "error": f"Analysis failed: {str(e)}"}

    # NOTE: string annotations so that importing this module does not raise
    # NameError when numpy is unavailable (np would be undefined at class
    # definition time otherwise, defeating the HAS_NUMPY guard in run()).
    def _km_estimator(self, durations: "np.ndarray", events: "np.ndarray"):
        """Compute Kaplan-Meier product-limit survival estimate.

        Args:
            durations: observed times (float array).
            events: 1 if the event occurred at that time, 0 if censored.

        Returns:
            (times, survival, at_risk, events_at_t, n_censored). The times
            and survival lists carry a leading (0.0, 1.0) anchor point, so
            they are one element longer than the per-event-time lists.
        """
        # Steps occur only at distinct event (non-censored) times.
        event_times = np.sort(np.unique(durations[events == 1]))
        km_times = [0.0]
        km_survival = [1.0]
        km_at_risk: List[int] = []
        km_events: List[int] = []
        km_censored: List[int] = []
        s = 1.0
        for t_i in event_times:
            # Risk set: subjects still under observation just before t_i.
            n_risk = int(np.sum(durations >= t_i))
            d_i = int(np.sum((durations == t_i) & (events == 1)))
            c_i = int(np.sum((durations == t_i) & (events == 0)))
            # Product-limit update: S(t) *= (1 - d_i / n_risk)
            s *= 1.0 - d_i / n_risk
            km_times.append(float(t_i))
            km_survival.append(round(s, 4))
            km_at_risk.append(n_risk)
            km_events.append(d_i)
            km_censored.append(c_i)
        return km_times, km_survival, km_at_risk, km_events, km_censored

    def _kaplan_meier(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """
        Compute Kaplan-Meier survival estimate.

        durations: list of observed times
        event_observed: list of 1 (event occurred) or 0 (censored)
        group_labels: optional list of group labels for stratified analysis
        """
        durations = arguments.get("durations", [])
        event_observed = arguments.get("event_observed", [])
        if not durations or not event_observed:
            return {
                "status": "error",
                "error": "durations and event_observed are required",
            }
        if len(durations) != len(event_observed):
            return {
                "status": "error",
                "error": "durations and event_observed must have the same length",
            }
        group_labels = arguments.get("group_labels")
        try:
            dur = np.array([float(x) for x in durations])
            evs = np.array([int(x) for x in event_observed])
        except (ValueError, TypeError) as e:
            return {"status": "error", "error": f"Invalid values: {e}"}
        if np.any(dur < 0):
            return {"status": "error", "error": "All durations must be non-negative"}
        if group_labels is None:
            # Single group
            km_times, km_survival, km_at_risk, km_events, km_censored = (
                self._km_estimator(dur, evs)
            )
            # Median survival: first time the KM curve drops to <= 0.5;
            # None when the curve never reaches 0.5 (median not estimable).
            km_surv_arr = np.array(km_survival)
            below = np.where(km_surv_arr <= 0.5)[0]
            median_survival = float(km_times[below[0]]) if len(below) > 0 else None
            return {
                "status": "success",
                "data": {
                    "method": "Kaplan-Meier",
                    "n_subjects": len(dur),
                    "n_events": int(np.sum(evs)),
                    "n_censored": int(np.sum(1 - evs)),
                    "median_survival_time": median_survival,
                    "survival_table": {
                        "times": km_times,
                        "survival_probability": km_survival,
                        "at_risk": km_at_risk,
                        "events": km_events,
                        "censored": km_censored,
                    },
                    "follow_up_time": float(np.max(dur)),
                },
            }
        else:
            # Stratified analysis
            if len(group_labels) != len(dur):
                return {
                    "status": "error",
                    "error": "group_labels must have the same length as durations",
                }
            groups = {}
            labels_arr = np.array(group_labels)
            for label in np.unique(labels_arr):
                mask = labels_arr == label
                g_dur = dur[mask]
                g_evs = evs[mask]
                km_times, km_survival, km_at_risk, km_events, km_censored = (
                    self._km_estimator(g_dur, g_evs)
                )
                km_surv_arr = np.array(km_survival)
                below = np.where(km_surv_arr <= 0.5)[0]
                median_survival = float(km_times[below[0]]) if len(below) > 0 else None
                groups[str(label)] = {
                    "n_subjects": int(np.sum(mask)),
                    "n_events": int(np.sum(g_evs)),
                    "median_survival_time": median_survival,
                    "survival_table": {
                        "times": km_times,
                        "survival_probability": km_survival,
                    },
                }
            return {
                "status": "success",
                "data": {
                    "method": "Kaplan-Meier (stratified)",
                    "n_groups": len(groups),
                    "groups": groups,
                },
            }

    def _log_rank_test(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """
        Perform Mantel-Cox log-rank test between two groups.

        durations_a, events_a: Group A survival data
        durations_b, events_b: Group B survival data
        """
        if not HAS_SCIPY:
            return {"status": "error", "error": "scipy is required for log-rank test"}
        for field in ("durations_a", "events_a", "durations_b", "events_b"):
            if not arguments.get(field):
                return {"status": "error", "error": f"{field} is required"}
        try:
            da = np.array([float(x) for x in arguments["durations_a"]])
            ea = np.array([int(x) for x in arguments["events_a"]])
            db = np.array([float(x) for x in arguments["durations_b"]])
            eb = np.array([int(x) for x in arguments["events_b"]])
        except (ValueError, TypeError) as e:
            return {"status": "error", "error": f"Invalid values: {e}"}
        if len(da) != len(ea) or len(db) != len(eb):
            return {
                "status": "error",
                "error": "durations and events must have matching lengths within each group",
            }
        # Pool the distinct event times across both groups; at each time
        # compare observed vs expected events under the null hypothesis of
        # equal hazard (expected events proportional to group risk-set size).
        all_times = np.unique(np.concatenate([da[ea == 1], db[eb == 1]]))
        O1_total = O2_total = E1_total = E2_total = 0.0
        var_total = 0.0
        for t in all_times:
            n1 = np.sum(da >= t)
            n2 = np.sum(db >= t)
            o1 = np.sum((da == t) & (ea == 1))
            o2 = np.sum((db == t) & (eb == 1))
            n = n1 + n2
            o = o1 + o2
            if n < 2:
                # Fewer than 2 subjects at risk: no between-group information.
                continue
            e1 = n1 * o / n
            e2 = n2 * o / n
            O1_total += o1
            E1_total += e1
            O2_total += o2
            E2_total += e2
            # Hypergeometric variance
            if n > 1 and o > 0:
                var_total += (n1 * n2 * o * (n - o)) / (n**2 * (n - 1))
        if var_total <= 0:
            return {
                "status": "error",
                "error": "Cannot compute log-rank test (no variance in event times)",
            }
        chi2_stat = (O1_total - E1_total) ** 2 / var_total
        # sf() is the numerically stable upper-tail probability
        # (equivalent to 1 - cdf, but accurate for small p-values).
        p_value = stats.chi2.sf(chi2_stat, df=1)
        return {
            "status": "success",
            "data": {
                "method": "Log-Rank Test (Mantel-Cox)",
                "group_a": {
                    "n_subjects": len(da),
                    "n_events": int(np.sum(ea)),
                    "observed": round(float(O1_total), 2),
                    "expected": round(float(E1_total), 2),
                },
                "group_b": {
                    "n_subjects": len(db),
                    "n_events": int(np.sum(eb)),
                    "observed": round(float(O2_total), 2),
                    "expected": round(float(E2_total), 2),
                },
                "chi2_statistic": round(float(chi2_stat), 4),
                "p_value": round(float(p_value), 6),
                "degrees_of_freedom": 1,
                "interpretation": (
                    "Statistically significant difference in survival (p < 0.05)"
                    if p_value < 0.05
                    else "No statistically significant difference in survival (p >= 0.05)"
                ),
            },
        }

    def _cox_regression(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """
        Fit Cox proportional hazards model.

        durations: list of observed times
        event_observed: list of 0/1 event indicators
        covariates: dict of {covariate_name: [values]}
        """
        if not HAS_SCIPY:
            return {"status": "error", "error": "scipy is required for Cox regression"}
        durations = arguments.get("durations", [])
        event_observed = arguments.get("event_observed", [])
        covariates = arguments.get("covariates", {})
        if not durations or not event_observed:
            return {
                "status": "error",
                "error": "durations and event_observed are required",
            }
        if not covariates:
            return {
                "status": "error",
                "error": "covariates dict is required (at least one covariate)",
            }
        try:
            dur = np.array([float(x) for x in durations])
            evs = np.array([int(x) for x in event_observed])
        except (ValueError, TypeError) as e:
            return {"status": "error", "error": f"Invalid duration/event values: {e}"}
        # Explicit length check; mismatched inputs would otherwise surface
        # as an opaque indexing error inside the likelihood.
        if len(dur) != len(evs):
            return {
                "status": "error",
                "error": "durations and event_observed must have the same length",
            }
        n = len(dur)
        cov_names = list(covariates.keys())
        cov_matrix = []
        for name in cov_names:
            vals = covariates[name]
            if len(vals) != n:
                return {
                    "status": "error",
                    "error": f"Covariate '{name}' has {len(vals)} values but expected {n}",
                }
            try:
                cov_matrix.append([float(v) for v in vals])
            except (ValueError, TypeError) as e:
                return {
                    "status": "error",
                    "error": f"Invalid values in covariate '{name}': {e}",
                }
        X = np.array(cov_matrix).T  # shape (n, p)
        # Standardize covariates for optimizer stability; constant columns
        # get std=1 so they pass through unchanged (coefficient ~ 0).
        X_mean = X.mean(axis=0)
        X_std = X.std(axis=0)
        X_std[X_std == 0] = 1
        X_std_norm = (X - X_mean) / X_std
        beta = np.zeros(X_std_norm.shape[1])

        def partial_log_likelihood(b):
            """Negative Cox partial log-likelihood (Breslow risk sets)."""
            eta = X_std_norm @ b
            log_lik = 0.0
            for i in range(n):
                if evs[i] == 1:
                    at_risk = dur >= dur[i]
                    log_lik += eta[i] - np.log(np.sum(np.exp(eta[at_risk])))
            return -log_lik  # minimize negative log-likelihood

        from scipy.optimize import minimize

        result = minimize(
            partial_log_likelihood,
            beta,
            method="L-BFGS-B",
            options={"maxiter": 1000},
        )
        if not result.success:
            return {
                "status": "error",
                "error": "Cox regression optimization did not converge. Try with fewer covariates or more data.",
            }
        beta_fitted = result.x
        # Back-transform coefficients to the original covariate scale.
        beta_original = beta_fitted / X_std
        # Approximate standard errors via Hessian (numerical)
        try:
            from scipy.optimize import approx_fprime

            hess_approx = np.zeros((len(beta_fitted), len(beta_fitted)))
            eps = 1e-5
            for i in range(len(beta_fitted)):

                def grad_i(b):
                    grad = approx_fprime(b, partial_log_likelihood, eps)
                    return grad[i]

                hess_approx[i] = approx_fprime(beta_fitted, grad_i, eps)
            # pinv tolerates a singular/ill-conditioned Hessian.
            cov_matrix_beta = np.linalg.pinv(hess_approx)
            se = np.sqrt(np.abs(np.diag(cov_matrix_beta)))
        except Exception:
            se = np.full_like(beta_fitted, np.nan)
        # se is on the standardized scale; se/X_std matches beta_original's
        # scale, so z = beta_fitted / se (scale factors cancel).
        z_scores = beta_original / (se / X_std)
        # Stable two-sided normal tail probability (equivalent to
        # 2 * (1 - cdf(|z|)) but accurate for large |z|).
        p_values = 2 * stats.norm.sf(np.abs(z_scores))
        hazard_ratios = np.exp(beta_original)
        coef_results = []
        for i, name in enumerate(cov_names):
            coef_results.append(
                {
                    "covariate": name,
                    "coefficient": round(float(beta_original[i]), 4),
                    "hazard_ratio": round(float(hazard_ratios[i]), 4),
                    "hazard_ratio_95ci": (
                        round(
                            float(np.exp(beta_original[i] - 1.96 * se[i] / X_std[i])), 4
                        ),
                        round(
                            float(np.exp(beta_original[i] + 1.96 * se[i] / X_std[i])), 4
                        ),
                    ),
                    "p_value": round(float(p_values[i]), 4),
                    "significant": bool(p_values[i] < 0.05),
                }
            )
        return {
            "status": "success",
            "data": {
                "method": "Cox Proportional Hazards",
                "n_subjects": n,
                "n_events": int(np.sum(evs)),
                "coefficients": coef_results,
                "log_likelihood": round(float(-result.fun), 4),
                "convergence": result.success,
                "note": "HR > 1 indicates increased hazard (worse survival); HR < 1 indicates decreased hazard (better survival).",
            },
        }