# Source code for tooluniverse.survival_tool

"""
Survival Analysis Tool

Implements Kaplan-Meier estimator, log-rank test, and Cox proportional hazards
regression for survival data analysis.

No external API calls. Uses numpy and scipy for computation.
References: Kaplan & Meier (1958), Mantel (1966), Cox (1972).
"""

import math
from typing import Dict, Any, List
from .base_tool import BaseTool
from .tool_registry import register_tool

try:
    import numpy as np

    HAS_NUMPY = True
except ImportError:
    HAS_NUMPY = False

try:
    from scipy import stats

    HAS_SCIPY = True
except ImportError:
    HAS_SCIPY = False


@register_tool("SurvivalTool")
class SurvivalTool(BaseTool):
    """
    Local survival analysis tools.

    Implements:
    - Kaplan-Meier survival estimator
    - Log-rank test (Mantel-Cox)
    - Cox proportional hazards regression (basic)

    No external API required. Uses numpy/scipy.
    """

    def __init__(self, tool_config: Dict[str, Any]):
        """Store the tool configuration and cache the parameter schema."""
        super().__init__(tool_config)
        self.parameter = tool_config.get("parameter", {})
        self.required = self.parameter.get("required", [])

    def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Execute survival analysis.

        Dispatches on arguments["operation"] to one of the three analysis
        handlers. Returns a {"status": ..., ...} dict; never raises (handler
        exceptions are converted to an error status).
        """
        if not HAS_NUMPY:
            return {
                "status": "error",
                "error": "numpy is required. Install with: pip install numpy scipy",
            }
        operation = arguments.get("operation")
        if not operation:
            return {"status": "error", "error": "Missing required parameter: operation"}
        operation_handlers = {
            "kaplan_meier": self._kaplan_meier,
            "log_rank_test": self._log_rank_test,
            "cox_regression": self._cox_regression,
        }
        handler = operation_handlers.get(operation)
        if not handler:
            return {
                "status": "error",
                "error": f"Unknown operation: {operation}",
                "available_operations": list(operation_handlers.keys()),
            }
        try:
            return handler(arguments)
        except Exception as e:
            return {"status": "error", "error": f"Analysis failed: {str(e)}"}

    def _km_estimator(self, durations: np.ndarray, events: np.ndarray):
        """Compute Kaplan-Meier product-limit survival estimate with Greenwood 95% CI.

        Returns an 8-tuple
        (times, survival, survival_raw, at_risk, events_at_t, n_censored,
        ci_lower, ci_upper), where ``survival`` is rounded to 4 decimals for
        display and ``survival_raw`` keeps full precision (used by callers for
        accurate median detection). All returned lists have the same length.
        Unless an event occurs at t=0, index 0 is the baseline row
        (t=0, S=1, all subjects at risk, zero events/censored).

        95% confidence intervals use the log-log (complementary log-log)
        transformation, the default in R survfit() and recommended by
        Collett (2015):

            Greenwood sum G = sum_j( d_j / (n_j * (n_j - d_j)) )
            U = log(-log(S(t)))            (complementary log-log)
            SE_U = sqrt(G) / |log(S(t))|   (delta method)
            CI = [S(t)^exp(+1.96*SE_U), S(t)^exp(-1.96*SE_U)]

        Result is naturally in (0, 1) without clamping.
        Edge cases: S=1 -> CI=[1,1]; S=0 -> CI=[0,0]; |log(S)| < 1e-10 ->
        plain linear fallback to avoid numerical blow-up.

        References: Collett (2015) Modelling Survival Data in Medical
        Research, §2.3; Kalbfleisch & Prentice (2002) §1.3.
        """
        event_times = np.sort(np.unique(durations[events == 1]))
        # Include a baseline row (t=0, S=1.0) only when no event occurs at t=0.
        # If the first event time is 0, adding a separate t=0 baseline row would
        # produce duplicate timestamps [0.0, 0.0, ...], which is non-standard and
        # breaks downstream code that assumes unique time points (plots, indexers).
        # Standard KM implementations (R survfit, lifelines) use a single row per
        # unique event time with no duplicate at t=0.
        n_total = int(len(durations))
        has_t0_event = len(event_times) > 0 and float(event_times[0]) == 0.0
        if has_t0_event:
            km_times: List[float] = []
            km_survival: List[float] = []
            # unrounded, for accurate median detection
            km_survival_raw: List[float] = []
            km_at_risk: List[int] = []
            km_events: List[int] = []
            km_censored: List[int] = []
            km_ci_lower: List[float] = []
            km_ci_upper: List[float] = []
        else:
            km_times = [0.0]
            km_survival = [1.0]
            km_survival_raw = [1.0]
            km_at_risk = [n_total]
            km_events = [0]
            km_censored = [0]
            km_ci_lower = [1.0]
            km_ci_upper = [1.0]
        s = 1.0
        greenwood_sum = 0.0  # cumulative: sum_j d_j / (n_j * (n_j - d_j))
        for t_i in event_times:
            n_risk = int(np.sum(durations >= t_i))
            d_i = int(np.sum((durations == t_i) & (events == 1)))
            c_i = int(np.sum((durations == t_i) & (events == 0)))
            # Product-limit update: S(t) *= (1 - d_i / n_risk)
            s *= 1.0 - d_i / n_risk
            # Greenwood variance (Greenwood 1926): accumulate only when n_risk > d_i
            # (if n_risk == d_i, S(t) = 0 and the CI collapses to [0, 0])
            if n_risk > d_i and s > 0:
                greenwood_sum += d_i / (n_risk * (n_risk - d_i))
                log_s = np.log(s)  # always <= 0 since 0 < s <= 1
                if abs(log_s) < 1e-10:
                    # S very close to 1: SE_U = sqrt(G)/|log(S)| is numerically
                    # unstable; fall back to plain linear Greenwood CI
                    se = s * np.sqrt(greenwood_sum)
                    ci_lower = float(max(0.0, s - 1.96 * se))
                    ci_upper = float(min(1.0, s + 1.96 * se))
                else:
                    # Log-log (complementary log-log) CI — Collett (2015), R survfit
                    # default. U = log(-log(S)), SE_U = sqrt(G)/|log(S)|,
                    # CI = S^exp(±1.96*SE_U); because log(-log(·)) is a decreasing
                    # function, +z gives the lower bound.
                    se_u = np.sqrt(greenwood_sum) / abs(log_s)
                    ci_lower = float(s ** np.exp(+1.96 * se_u))
                    ci_upper = float(s ** np.exp(-1.96 * se_u))
            else:
                ci_lower = 0.0
                ci_upper = 0.0
            km_times.append(float(t_i))
            km_survival.append(round(s, 4))
            km_survival_raw.append(s)  # raw unrounded value used for median detection
            km_at_risk.append(n_risk)
            km_events.append(d_i)
            km_censored.append(c_i)
            km_ci_lower.append(round(ci_lower, 4))
            km_ci_upper.append(round(ci_upper, 4))
        return (
            km_times,
            km_survival,
            km_survival_raw,
            km_at_risk,
            km_events,
            km_censored,
            km_ci_lower,
            km_ci_upper,
        )

    def _kaplan_meier(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """
        Compute Kaplan-Meier survival estimate.

        durations: list of observed times
        event_observed: list of 1 (event occurred) or 0 (censored)
        group_labels: optional list of group labels for stratified analysis
        """
        durations = arguments.get("durations", [])
        event_observed = arguments.get("event_observed", [])
        if not durations or not event_observed:
            return {
                "status": "error",
                "error": "durations and event_observed are required",
            }
        if len(durations) != len(event_observed):
            return {
                "status": "error",
                "error": "durations and event_observed must have the same length",
            }
        group_labels = arguments.get("group_labels")
        try:
            dur = np.array([float(x) for x in durations])
            evs = np.array([int(x) for x in event_observed])
        except (ValueError, TypeError) as e:
            return {"status": "error", "error": f"Invalid values: {e}"}
        # NaN comparison always returns False: np.any(NaN < 0) == False, so NaN
        # bypasses the negative-duration guard and corrupts all downstream
        # at-risk counts and survival probabilities. Check isfinite first.
        if np.any(~np.isfinite(dur)):
            return {
                "status": "error",
                "error": (
                    "durations contain non-finite values (NaN or inf). "
                    "All survival times must be finite real numbers. "
                    "Remove or correct the affected observations."
                ),
            }
        if np.any(dur < 0):
            return {"status": "error", "error": "All durations must be non-negative"}
        if not np.all(np.isin(evs, [0, 1])):
            return {
                "status": "error",
                "error": "event_observed values must be 0 (censored) or 1 (event occurred)",
            }
        if group_labels is None:
            # Single group
            (
                km_times,
                km_survival,
                km_survival_raw,
                km_at_risk,
                km_events,
                km_censored,
                km_ci_lower,
                km_ci_upper,
            ) = self._km_estimator(dur, evs)
            # Use raw (unrounded) survival probabilities for the median check.
            # Using the rounded display values causes false median detection
            # when the true S is just above 0.5 but rounds down to 0.5
            # (e.g., S=0.50005 -> round(0.50005, 4)=0.5 <= 0.5 triggers false median).
            km_surv_arr = np.array(km_survival_raw)
            below = np.where(km_surv_arr <= 0.5)[0]
            median_survival = float(km_times[below[0]]) if len(below) > 0 else None
            n_events_total = int(np.sum(evs == 1))
            n_censored_total = int(np.sum(evs == 0))
            result_data = {
                "method": "Kaplan-Meier",
                "n_subjects": len(dur),
                "n_events": n_events_total,
                "n_censored": n_censored_total,
                "median_survival_time": median_survival,
                "survival_table": {
                    "times": km_times,
                    "survival_probability": km_survival,
                    "ci_lower_95": km_ci_lower,
                    "ci_upper_95": km_ci_upper,
                    "at_risk": km_at_risk,
                    "events": km_events,
                    "censored": km_censored,
                },
                "ci_method": "Greenwood log-log (Collett 2015 / R survfit default)",
                "follow_up_time": float(np.max(dur)),
            }
            # The note explaining the KM 'censored' column convention was only
            # emitted in the all-censored case. In the typical mixed case (events
            # + subjects censored between event times), sum(table['censored']) = 0
            # while n_censored > 0 — a silent discrepancy with no explanation.
            # Emit the note whenever any censored subjects exist.
            if n_censored_total > 0:
                _km_cens_note = (
                    "The survival table 'censored' column records only subjects "
                    "censored AT observed event times (standard Kaplan-Meier "
                    "convention); subjects censored between event times do not appear "
                    "in this column. sum(table['censored']) may be less than n_censored. "
                    "Use n_censored for the total censored count."
                )
                if n_events_total == 0:
                    _km_cens_note = (
                        f"No events were observed (all {n_censored_total} subjects "
                        "censored). The survival function S(t) = 1.0 throughout "
                        "follow-up. " + _km_cens_note
                    )
                result_data["km_censored_convention_note"] = _km_cens_note
            return {"status": "success", "data": result_data}
        else:
            # Stratified analysis
            if len(group_labels) != len(dur):
                return {
                    "status": "error",
                    "error": "group_labels must have the same length as durations",
                }
            groups = {}
            labels_arr = np.array(group_labels)
            for label in np.unique(labels_arr):
                mask = labels_arr == label
                g_dur = dur[mask]
                g_evs = evs[mask]
                (
                    km_times,
                    km_survival,
                    km_survival_raw,
                    km_at_risk,
                    km_events,
                    km_censored,
                    km_ci_lower,
                    km_ci_upper,
                ) = self._km_estimator(g_dur, g_evs)
                # Use raw survival for median (same as single-group).
                km_surv_arr = np.array(km_survival_raw)
                below = np.where(km_surv_arr <= 0.5)[0]
                median_survival = float(km_times[below[0]]) if len(below) > 0 else None
                groups[str(label)] = {
                    "n_subjects": int(np.sum(mask)),
                    "n_events": int(np.sum(g_evs)),
                    "n_censored": int(np.sum(mask)) - int(np.sum(g_evs)),
                    "median_survival_time": median_survival,
                    "survival_table": {
                        "times": km_times,
                        "survival_probability": km_survival,
                        "ci_lower_95": km_ci_lower,
                        "ci_upper_95": km_ci_upper,
                        "at_risk": km_at_risk,
                        "events": km_events,
                        "censored": km_censored,
                    },
                }
            # Emit the same km_censored_convention_note as the single-group path
            # when any group has censored subjects. The survival table 'censored'
            # column records only subjects censored AT observed event times;
            # subjects censored between event times contribute 0, creating a
            # silent discrepancy between n_censored and sum(table['censored']).
            _strat_result: dict = {
                "method": "Kaplan-Meier (stratified)",
                "ci_method": "Greenwood log-log (Collett 2015 / R survfit default)",
                "n_groups": len(groups),
                "groups": groups,
            }
            _total_censored = sum(g["n_censored"] for g in groups.values())
            if _total_censored > 0:
                _strat_result["km_censored_convention_note"] = (
                    "The survival table 'censored' column records only subjects censored "
                    "AT observed event times. Subjects censored between event times "
                    "contribute 0 to that column. sum(table['censored']) may be less "
                    "than n_censored. Use n_censored for the total censored count."
                )
            return {"status": "success", "data": _strat_result}

    def _log_rank_test(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """
        Perform Mantel-Cox log-rank test between two groups.

        durations_a, events_a: Group A survival data
        durations_b, events_b: Group B survival data
        """
        if not HAS_SCIPY:
            return {"status": "error", "error": "scipy is required for log-rank test"}
        for field in ("durations_a", "events_a", "durations_b", "events_b"):
            if not arguments.get(field):
                return {"status": "error", "error": f"{field} is required"}
        try:
            da = np.array([float(x) for x in arguments["durations_a"]])
            ea = np.array([int(x) for x in arguments["events_a"]])
            db = np.array([float(x) for x in arguments["durations_b"]])
            eb = np.array([int(x) for x in arguments["events_b"]])
        except (ValueError, TypeError) as e:
            return {"status": "error", "error": f"Invalid values: {e}"}
        if len(da) != len(ea) or len(db) != len(eb):
            return {
                "status": "error",
                "error": "durations and events must have matching lengths within each group",
            }
        # NaN comparison always returns False, so NaN bypasses all < 0 guards and
        # corrupts at-risk counts, observed/expected events, and the chi2 statistic.
        for arr, name in [
            (da, "durations_a"),
            (db, "durations_b"),
        ]:
            if np.any(~np.isfinite(arr)):
                return {
                    "status": "error",
                    "error": (
                        f"{name} contains non-finite values (NaN or inf). "
                        "All survival times must be finite real numbers."
                    ),
                }
        # Negative survival times are not physically meaningful.
        for arr, name in [(da, "durations_a"), (db, "durations_b")]:
            if np.any(arr < 0):
                return {
                    "status": "error",
                    "error": f"All {name} must be non-negative. Negative survival times are not valid.",
                }
        for arr, name in [(ea, "events_a"), (eb, "events_b")]:
            if not np.all(np.isin(arr, [0, 1])):
                return {
                    "status": "error",
                    "error": f"{name} values must be 0 (censored) or 1 (event occurred)",
                }
        all_times = np.unique(np.concatenate([da[ea == 1], db[eb == 1]]))
        O1_total = O2_total = E1_total = E2_total = 0.0
        var_total = 0.0
        for t in all_times:
            n1 = np.sum(da >= t)
            n2 = np.sum(db >= t)
            o1 = np.sum((da == t) & (ea == 1))
            o2 = np.sum((db == t) & (eb == 1))
            n = n1 + n2
            o = o1 + o2
            if n < 2:
                continue
            e1 = n1 * o / n
            e2 = n2 * o / n
            O1_total += o1
            E1_total += e1
            O2_total += o2
            E2_total += e2
            # Hypergeometric variance
            if n > 1 and o > 0:
                var_total += (n1 * n2 * o * (n - o)) / (n**2 * (n - 1))
        if var_total <= 0:
            # Zero variance occurs when all events share the same time point and the
            # split is proportional (O==E for every stratum). In this degenerate case
            # chi2 = 0 and p = 1.0 is the correct, well-defined answer: there is no
            # evidence of a difference between the two groups.
            # Use relative tolerance: floating-point sums over many events accumulate
            # rounding errors proportional to total_events × machine epsilon (~2.2e-16),
            # so an absolute threshold of 1e-10 is too tight for studies with thousands
            # of events. Relative tolerance 1e-8 is safe up to ~10^7 events.
            total_events = O1_total + O2_total
            rel_tol = 1e-8 * max(total_events, 1.0)
            if (
                abs(O1_total - E1_total) <= rel_tol
                and abs(O2_total - E2_total) <= rel_tol
            ):
                chi2_stat = 0.0
                p_value = 1.0
            else:
                return {
                    "status": "error",
                    "error": "Cannot compute log-rank test: zero variance with non-zero observed-expected difference (degenerate data).",
                }
        else:
            chi2_stat = (O1_total - E1_total) ** 2 / var_total
            p_value = 1 - stats.chi2.cdf(chi2_stat, df=1)
        # n_events_in_statistic counts only events that contributed to the chi2.
        # Events at time points where n=n1+n2 < 2 are excluded by the loop guard
        # (hypergeometric variance is 0 when total at-risk < 2). In such cases,
        # n_events_in_statistic < n_events, and the distinction matters for
        # interpretation.
        n_events_in_stat_a = int(round(O1_total))
        n_events_in_stat_b = int(round(O2_total))
        n_events_excluded = (int(np.sum(ea)) - n_events_in_stat_a) + (
            int(np.sum(eb)) - n_events_in_stat_b
        )
        result_data = {
            "method": "Log-Rank Test (Mantel-Cox)",
            "group_a": {
                "n_subjects": len(da),
                "n_events": int(np.sum(ea)),
                "n_events_in_statistic": n_events_in_stat_a,
                "observed": round(float(O1_total), 2),
                "expected": round(float(E1_total), 2),
            },
            "group_b": {
                "n_subjects": len(db),
                "n_events": int(np.sum(eb)),
                "n_events_in_statistic": n_events_in_stat_b,
                "observed": round(float(O2_total), 2),
                "expected": round(float(E2_total), 2),
            },
            "chi2_statistic": round(float(chi2_stat), 4),
            "p_value": round(float(p_value), 6),
            "degrees_of_freedom": 1,
            "interpretation": (
                "Statistically significant difference in survival (p < 0.05)"
                if p_value < 0.05
                else "No statistically significant difference in survival (p >= 0.05)"
            ),
        }
        if n_events_excluded > 0:
            result_data["events_excluded_note"] = (
                f"{n_events_excluded} event(s) were excluded from the chi2 statistic "
                "because at the time of the event, fewer than 2 subjects were at risk "
                "across both groups (hypergeometric variance = 0 at those time points). "
                "n_events_in_statistic reflects only events that contributed to the test."
            )
        return {"status": "success", "data": result_data}

    def _cox_regression(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """
        Fit Cox proportional hazards model.

        durations: list of observed times
        event_observed: list of 0/1 event indicators
        covariates: dict of {covariate_name: [values]}
        """
        if not HAS_SCIPY:
            return {"status": "error", "error": "scipy is required for Cox regression"}
        durations = arguments.get("durations", [])
        event_observed = arguments.get("event_observed", [])
        covariates = arguments.get("covariates", {})
        if not durations or not event_observed:
            return {
                "status": "error",
                "error": "durations and event_observed are required",
            }
        if not covariates:
            return {
                "status": "error",
                "error": "covariates dict is required (at least one covariate)",
            }
        try:
            dur = np.array([float(x) for x in durations])
            evs = np.array([int(x) for x in event_observed])
        except (ValueError, TypeError) as e:
            return {"status": "error", "error": f"Invalid duration/event values: {e}"}
        # NaN comparison always returns False, bypassing the < 0 guard below.
        if np.any(~np.isfinite(dur)):
            return {
                "status": "error",
                "error": (
                    "durations contain non-finite values (NaN or inf). "
                    "All survival times must be finite real numbers. "
                    "Remove or correct the affected observations."
                ),
            }
        if np.any(dur < 0):
            return {
                "status": "error",
                "error": "All durations must be non-negative. Negative survival times are not valid.",
            }
        if not np.all(np.isin(evs, [0, 1])):
            return {
                "status": "error",
                "error": "event_observed values must be 0 (censored) or 1 (event occurred)",
            }
        n = len(dur)
        cov_names = list(covariates.keys())
        cov_matrix = []
        for name in cov_names:
            vals = covariates[name]
            if len(vals) != n:
                return {
                    "status": "error",
                    "error": f"Covariate '{name}' has {len(vals)} values but expected {n}",
                }
            try:
                cov_vals = [float(v) for v in vals]
            except (ValueError, TypeError) as e:
                return {
                    "status": "error",
                    "error": f"Invalid values in covariate '{name}': {e}",
                }
            # NaN in a covariate silently propagates through matrix operations,
            # producing NaN gradient and Hessian entries, causing optimization
            # failure with the misleading message "did not converge". Detect and
            # reject upfront.
            if any(not math.isfinite(v) for v in cov_vals):
                return {
                    "status": "error",
                    "error": (
                        f"Covariate '{name}' contains non-finite values (NaN or inf). "
                        "All covariate values must be finite real numbers. "
                        "Impute or remove the affected observations before fitting."
                    ),
                }
            cov_matrix.append(cov_vals)
        X = np.array(cov_matrix).T  # shape (n, p)
        n_events = int(np.sum(evs == 1))
        p = len(cov_names)
        if n_events <= p:
            return {
                "status": "error",
                "error": (
                    f"Cox regression requires the number of events ({n_events}) to "
                    f"exceed the number of covariates ({p}). The partial likelihood "
                    "is not identifiable when events ≤ covariates. Reduce the number "
                    "of covariates or collect more event data."
                ),
            }
        X_mean = X.mean(axis=0)
        X_std = X.std(axis=0)
        # Detect zero-variance covariates (all subjects share the same value).
        # Such covariates carry no information for Cox regression: the partial
        # likelihood is flat, the Hessian is zero, and the resulting standard
        # error is 0/0 = NaN. Return an informative error immediately instead
        # of propagating NaN (which is also invalid JSON per RFC 8259).
        zero_var_names = [cov_names[i] for i, s in enumerate(X_std) if s == 0]
        if zero_var_names:
            return {
                "status": "error",
                "error": (
                    f"Covariate(s) have zero variance (all subjects share the same value): "
                    f"{zero_var_names}. Cox regression requires variation in each covariate "
                    "to estimate a hazard ratio. Remove the constant covariate(s) before fitting."
                ),
            }
        # Detect pairwise collinearity: identical or perfectly correlated covariates
        # silently split their combined coefficient between the two columns, producing
        # halved (wrong) hazard ratios with no error or warning in the current code.
        for i in range(p):
            for j in range(i + 1, p):
                if np.allclose(X[:, i], X[:, j]) or np.allclose(X[:, i], -X[:, j]):
                    return {
                        "status": "error",
                        "error": (
                            f"Covariates '{cov_names[i]}' and '{cov_names[j]}' are "
                            "perfectly collinear (identical or perfectly anti-correlated). "
                            "Cox regression cannot distinguish their effects — remove one "
                            "of the duplicated covariates before fitting."
                        ),
                    }
        # Low EPV (events-per-variable) warning: standard practice requires ≥10 events
        # per covariate. Below this threshold, coefficients may be unstable or overfitted.
        low_epv_warning = None
        if n_events < 10 * p:
            low_epv_warning = (
                f"Low events-per-variable (EPV = {n_events} events / {p} covariate(s) "
                f"= {n_events / p:.1f}). Standard practice recommends at least 10 events "
                "per covariate for reliable Cox regression estimates. With low EPV, "
                "coefficients may be unstable or overfitted. Consider reducing the "
                "number of covariates or collecting more event data."
            )
        X_std[X_std == 0] = 1
        X_std_norm = (X - X_mean) / X_std
        # Newton-Raphson optimization for partial likelihood
        beta = np.zeros(X_std_norm.shape[1])

        def partial_log_likelihood(b):
            """Cox partial log-likelihood (negated for minimization)."""
            eta = X_std_norm @ b
            log_lik = 0.0
            for i in range(n):
                if evs[i] == 1:
                    at_risk = dur >= dur[i]
                    log_lik += eta[i] - np.log(np.sum(np.exp(eta[at_risk])))
            return -log_lik  # minimize negative log-likelihood

        from scipy.optimize import minimize

        result = minimize(
            partial_log_likelihood,
            beta,
            method="L-BFGS-B",
            options={"maxiter": 1000},
        )
        if not result.success:
            return {
                "status": "error",
                "error": "Cox regression optimization did not converge. Try with fewer covariates or more data.",
            }
        beta_fitted = result.x
        beta_original = beta_fitted / X_std
        # Approximate standard errors via Hessian (numerical)
        try:
            from scipy.optimize import approx_fprime

            hess_approx = np.zeros((len(beta_fitted), len(beta_fitted)))
            eps = 1e-5
            for i in range(len(beta_fitted)):

                def grad_i(b):
                    grad = approx_fprime(b, partial_log_likelihood, eps)
                    return grad[i]

                hess_approx[i] = approx_fprime(beta_fitted, grad_i, eps)
            cov_matrix_beta = np.linalg.pinv(hess_approx)
            se = np.sqrt(np.abs(np.diag(cov_matrix_beta)))
        except Exception:
            se = np.full_like(beta_fitted, np.nan)
        # Compute z-scores; guard against division producing NaN if se==0
        # (should not occur after the zero-variance check above, but be safe).
        with np.errstate(invalid="ignore", divide="ignore"):
            z_scores = beta_original / (se / X_std)
            p_values = 2 * (1 - stats.norm.cdf(np.abs(z_scores)))
        # hazard_ratios computed per-covariate below (with overflow guard)
        coef_results = []
        for i, name in enumerate(cov_names):
            p_val = float(p_values[i])
            # Guard against exact 0.0 p-value and round-to-zero pathologies.
            # p_val == 0.0 can arise from norm.cdf underflow (|z| > ~8.2)
            # on large datasets with a strong but finite covariate — NOT separation.
            # The previous code set p_val_out = None silently, with no explanation;
            # the user could not distinguish "insufficient data" from "p << 1e-15".
            # round(1e-6, 4) = 0.0 slipped through as p_value = 0.0 in
            # the output, violating the code's own invariant that exact 0 is not valid.
            p_underflow_note = None
            if np.isnan(p_val):
                p_val_out = None
            elif p_val == 0.0:
                # Exact zero from norm.cdf precision limit (|z| > ~8.2).
                # Not a separation artifact by itself — the covariate is extremely
                # significant. Report None with a note so callers understand.
                p_val_out = None
                p_underflow_note = (
                    "p-value underflow: the Wald test z-score exceeds IEEE-754 "
                    "double precision (|z| > ~8.2; p < ~6e-16). The covariate is "
                    "extremely statistically significant. Treat as p < 1e-15."
                )
            else:
                _rounded = round(p_val, 4)
                if _rounded == 0.0:
                    # round-to-zero: 0 < p_val < 5e-5; reporting 0.0 is misleading.
                    p_val_out = None
                    p_underflow_note = (
                        f"p-value rounds to zero at 4-decimal precision "
                        f"(raw value: {p_val:.2e}). "
                        "The result is highly significant. Treat as p < 0.0001."
                    )
                else:
                    p_val_out = _rounded
            se_scaled = float(se[i]) / float(X_std[i])
            # Guard CI computation against overflow (complete separation produces a
            # near-singular Hessian → huge se_scaled → exp overflows to inf).
            # Replace non-finite values with None and add a warning.
            with np.errstate(over="ignore"):
                raw_hr = float(np.exp(float(beta_original[i])))
                raw_ci_lo = float(np.exp(float(beta_original[i]) - 1.96 * se_scaled))
                raw_ci_hi = float(np.exp(float(beta_original[i]) + 1.96 * se_scaled))
            separation_warning = None
            if (
                not np.isfinite(raw_hr)
                or not np.isfinite(raw_ci_lo)
                or not np.isfinite(raw_ci_hi)
            ):
                separation_warning = (
                    "Complete or quasi-complete separation detected: the covariate "
                    f"'{name}' perfectly (or nearly perfectly) predicts the outcome. "
                    "The hazard ratio and CI cannot be estimated reliably. Consider "
                    "Firth's penalized regression or exact logistic regression."
                )
            elif (
                np.isfinite(raw_hr)
                and np.isfinite(raw_ci_lo)
                and np.isfinite(raw_ci_hi)
            ):
                # Detect quasi-separation when CI is finite but astronomically wide
                # (e.g., CI lower = 3e-6, CI upper = 1.8e+122): the existing isfinite
                # guard does not fire, yet the estimate is meaningless.
                # Trigger on: HR very small/large, or CI width ratio > 1e6.
                ci_width_ratio = raw_ci_hi / max(raw_ci_lo, 1e-300)
                if raw_hr < 1e-4 or raw_hr > 1e4 or ci_width_ratio > 1e6:
                    separation_warning = (
                        f"Possible quasi-complete separation: the hazard ratio "
                        f"({raw_hr:.4g}) or its 95% CI ({raw_ci_lo:.4g}, {raw_ci_hi:.4g}) "
                        f"is extreme (CI width ratio: {ci_width_ratio:.2g}). "
                        "The coefficient is likely driven by complete separation in the data. "
                        "Consider Firth's penalized regression or adding more event data."
                    )
            # Preserve full precision for HR: round(3.3e-6, 4) = 0.0, which is
            # factually wrong and creates a contradiction when the CI shows a
            # large ratio (indicating a very small HR, not zero).
            # Apply the same sys.float_info.min guard as ci_lo_out.
            # When beta is very large negative (separation), np.exp(beta) underflows
            # to 0.0 in IEEE-754. np.isfinite(0.0) = True, so the old guard allowed
            # hr_out = 0.0 while ci_lo_out = None — internally inconsistent.
            # sys.float_info.min ≈ 2.2e-308 is the smallest normal IEEE-754 double;
            # anything below that (including exact 0.0 and subnormals) is numerical
            # noise.
            import sys as _sys

            hr_out = (
                float(raw_hr)
                if np.isfinite(raw_hr) and raw_hr >= _sys.float_info.min
                else None
            )
            # ci_lo can underflow to exactly 0.0 in IEEE 754 when
            # beta − 1.96*se_scaled is very large and negative (complete separation).
            # np.isfinite(0.0) is True, so the old guard silently returned 0.0 as a
            # "valid" lower bound while ci_hi was returned as None (inf overflow).
            # The asymmetric (0.0, None) tuple is internally inconsistent: both bounds
            # arise from the same numerical failure. Treat underflow-to-zero (or to
            # a subnormal float like 5e-324) the same way as overflow-to-inf → None.
            ci_lo_out = (
                float(raw_ci_lo)
                if np.isfinite(raw_ci_lo) and raw_ci_lo >= _sys.float_info.min
                else None
            )
            ci_hi_out = float(raw_ci_hi) if np.isfinite(raw_ci_hi) else None
            # Coefficient is rounded to 4 decimal places, but hr_out and CI bounds
            # were reported at full IEEE-754 precision (~16 dp). A user who reads
            # coefficient=0.4832 and computes exp(0.4832) gets a value that does not
            # match the returned hr_out (they differ by ~3e-5). Round HR and CI to
            # 4 dp for display consistency. The rounding is only applied if the
            # value is not None (separation or overflow cases already return None
            # and need no change).
            if hr_out is not None:
                hr_out = round(hr_out, 4)
            if ci_lo_out is not None:
                ci_lo_out = round(ci_lo_out, 4)
            if ci_hi_out is not None:
                ci_hi_out = round(ci_hi_out, 4)
            # When separation is detected, the Wald p-value is computed from the
            # same near-singular Hessian that makes the CI unreliable. Under
            # quasi-separation, z = beta/se → 0 (se is huge), so p → 1.0 — a
            # falsely reassuring "non-significant" result. Set p_value and
            # significant to None. Separation explanation supersedes underflow note.
            if separation_warning is not None:
                p_val_out = None
                p_underflow_note = None  # separation is the operative explanation
                hr_out = None  # Ensure HR is also None under separation
                # CI bounds are computed from the same near-singular Hessian
                # as HR and are equally unreliable under separation. Return None for
                # both so callers cannot silently use extreme CI values like
                # (3e-197, 2e+184).
                ci_lo_out = None
                ci_hi_out = None
            # Determine significant field:
            # - underflow (p_val == 0 or rounds to 0) AND no separation → True (highly sig)
            # - separation → None (unknown)
            # - normal finite p_val_out → bool comparison
            # Use p_val_out (the rounded display value) for the significance
            # comparison. If raw p_val=0.04998, p_val_out=round(0.04998,4)=0.05.
            # Using raw p_val < 0.05 would give significant=True while p_value=0.05,
            # contradicting the universal convention that p=0.05 is not significant.
            if p_val_out is not None:
                _significant = bool(p_val_out < 0.05)
            elif p_underflow_note is not None:
                # Not separation; underflow → definitely significant
                _significant = True
            else:
                _significant = None
            entry = {
                "covariate": name,
                "coefficient": round(float(beta_original[i]), 4),
                "hazard_ratio": hr_out,
                "hazard_ratio_95ci": (ci_lo_out, ci_hi_out),
                "p_value": p_val_out,
                "significant": _significant,
            }
            if p_underflow_note:
                entry["p_value_note"] = p_underflow_note
            if separation_warning:
                entry["separation_warning"] = separation_warning
            coef_results.append(entry)
        result_data = {
            "method": "Cox Proportional Hazards",
            "n_subjects": n,
            "n_events": int(np.sum(evs == 1)),
            "coefficients": coef_results,
            "log_likelihood": round(float(-result.fun), 4),
            "convergence": result.success,
            "note": "HR > 1 indicates increased hazard (worse survival); HR < 1 indicates decreased hazard (better survival).",
        }
        if low_epv_warning:
            result_data["low_epv_warning"] = low_epv_warning
        return {"status": "success", "data": result_data}