Source code for missmecha.analysis

from typing import Union

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from IPython.display import display
from scipy.stats import chi2, ttest_ind
from sklearn.metrics import accuracy_score, mean_absolute_error, mean_squared_error

def compute_missing_rate(data, print_summary=True, plot=False):
    """
    Compute and summarize missingness statistics for each column.

    This function calculates the number and percentage of missing values
    for each column in a dataset, and optionally provides a summary table
    and barplot.

    Parameters
    ----------
    data : pandas.DataFrame or numpy.ndarray
        The dataset to analyze for missingness.
        If ndarray, it will be converted to a DataFrame.
    print_summary : bool, default=True
        If True, prints the overall missing rate and top variables by missing rate.
    plot : bool, default=False
        If True, displays a barplot of missing rates per column.

    Returns
    -------
    result : dict
        A dictionary with:
        - 'report' : pandas.DataFrame with per-column missing statistics.
        - 'overall_missing_rate' : float, overall percentage of missing entries.

    Examples
    --------
    >>> from missmecha.analysis import compute_missing_rate
    >>> df = pd.read_csv("data.csv")
    >>> stats = compute_missing_rate(df, print_summary=True, plot=True)
    """
    if isinstance(data, np.ndarray):
        data = pd.DataFrame(data, columns=[f"col{i}" for i in range(data.shape[1])])

    total_rows, total_cells = data.shape[0], data.size
    n_missing = data.isnull().sum()
    missing_rate_pct = (n_missing / total_rows * 100).round(2)
    n_unique = data.nunique(dropna=True)
    dtype = data.dtypes.astype(str)

    report = pd.DataFrame({
        "n_missing": n_missing,
        "missing_rate (%)": missing_rate_pct,
        "n_unique": n_unique,
        "dtype": dtype,
        "n_total": total_rows
    }).sort_values("missing_rate (%)", ascending=False)
    report.index.name = "column"

    overall_rate = round((n_missing.sum() / total_cells) * 100, 2)

    if print_summary:
        print(f"Overall missing rate: {overall_rate:.2f}%")
        print(f"{n_missing.sum()} / {total_cells} total values are missing.\n")
        print("Top variables by missing rate:")
        display(report.head(5))

    if plot:
        plt.figure(figsize=(8, max(4, len(report) * 0.3)))
        sns.barplot(x=report["missing_rate (%)"], y=report.index, palette="coolwarm")
        plt.xlabel("Missing Rate (%)")
        plt.title("Missing Rate by Column")
        plt.tight_layout()
        plt.show()

    return {
        "report": report,
        "overall_missing_rate": overall_rate
    }
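
A minimal usage sketch (illustrative, not part of the module): build a small DataFrame with a few NaNs and inspect the per-column report. The column names and values here are made up for the example.

# Example: inspect missingness in a small synthetic DataFrame.
import numpy as np
import pandas as pd
from missmecha.analysis import compute_missing_rate

df = pd.DataFrame({
    "age": [25, np.nan, 31, np.nan, 40],
    "income": [50_000, 62_000, np.nan, 58_000, 61_000],
    "city": ["A", "B", "B", None, "A"],
})
stats = compute_missing_rate(df, print_summary=True, plot=False)
print(stats["overall_missing_rate"])  # 4 of 15 cells missing -> 26.67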

def evaluate_imputation(original_df, imputed_df, mask_array, method="rmse", cat_cols=None):
    """
    Evaluate imputation quality by comparing imputed values to ground truth.

    This function computes per-column and overall evaluation scores based on
    the positions that were originally missing. It supports mixed-type data by
    applying different metrics for categorical and numerical columns.
    Returns both original and scaled (0-1) versions of the evaluation metrics.

    Parameters
    ----------
    original_df : pd.DataFrame
        The fully observed reference dataset (i.e., ground truth).
    imputed_df : pd.DataFrame
        The dataset after imputation has been applied.
    mask_array : np.ndarray or pd.DataFrame of bool
        Boolean array where True = originally observed, False = originally missing.
        Usually obtained from MissMechaGenerator.bool_mask.
    method : str, default="rmse"
        Evaluation method to use for numeric columns.
        One of {'rmse', 'mae', 'accuracy'}.
    cat_cols : list of str, optional
        Column names that should be treated as categorical. These will always
        use accuracy. If not provided, all columns use the method specified
        by `method`.

    Returns
    -------
    result : dict
        Dictionary with two sub-dictionaries:
        - 'original' : raw evaluation scores
            - 'column_scores' : mapping from column name to evaluation score
            - 'overall_score' : average of valid column scores (float)
        - 'scaled' : normalized scores (0-1 range)
            - 'column_scores' : mapping from column name to scaled evaluation score
            - 'overall_score' : average of valid scaled column scores (float)
        For categorical columns, the scaled score equals the original accuracy score.

    Raises
    ------
    ValueError
        If an unsupported method or column type is used.

    Notes
    -----
    - If `cat_cols` is None, all columns use the selected `method`.
    - If `cat_cols` is provided:
        - columns in `cat_cols` use accuracy
        - all other columns use `method`, which must be 'rmse' or 'mae'
    - Includes formatted print output.

    Examples
    --------
    >>> from missmecha.analysis import evaluate_imputation
    >>> result = evaluate_imputation(X_true, X_filled, mask, method="rmse")
    >>> result = evaluate_imputation(
    ...     original_df=X_true,
    ...     imputed_df=X_filled,
    ...     mask_array=mask,
    ...     method="mae",
    ...     cat_cols=["gender", "job_type"]
    ... )
    >>> print(result["original"]["overall_score"])
    0.872
    """
    def safe_compare(y_true, y_pred):
        # Compare values as strings, normalizing numeric forms so that
        # e.g. 5, 5.0, and "5.0" all map to "5".
        def normalize(x):
            s = str(x)
            return str(int(float(s))) if s.replace(".", "", 1).isdigit() else s
        y_true_str = [normalize(x) for x in y_true]
        y_pred_str = [normalize(x) for x in y_pred]
        return accuracy_score(y_true_str, y_pred_str)

    if method not in {"rmse", "mae", "accuracy"}:
        raise ValueError("Method must be one of 'rmse', 'mae', or 'accuracy'.")

    if isinstance(mask_array, np.ndarray):
        mask_df = pd.DataFrame(mask_array, columns=original_df.columns, index=original_df.index)
    else:
        mask_df = mask_array.copy()

    cat_cols = set(cat_cols or [])

    # Initialize result containers.
    results = {
        "original": {"column_scores": {}, "overall_score": None},
        "scaled": {"column_scores": {}, "overall_score": None},
    }

    # Score each column on the positions that were originally missing.
    for col in original_df.columns:
        y_true = original_df.loc[~mask_df[col], col]
        y_pred = imputed_df.loc[~mask_df[col], col]
        if y_true.empty:
            results["original"]["column_scores"][col] = np.nan
            results["scaled"]["column_scores"][col] = np.nan
            continue

        if col in cat_cols or method == "accuracy":
            # Categorical column (or accuracy requested): the score is
            # already in [0, 1], so no scaling is needed.
            raw_score = safe_compare(y_true, y_pred)
            scaled_score = raw_score
        else:
            # Numeric column: compute the raw error.
            if method == "rmse":
                raw_score = mean_squared_error(y_true, y_pred, squared=False)
            else:  # method == "mae"
                raw_score = mean_absolute_error(y_true, y_pred)
            # Scale the error to [0, 1] using the column's observed value range.
            col_range = original_df[col].max() - original_df[col].min()
            scaled_score = raw_score / col_range if col_range > 0 else 0.0

        results["original"]["column_scores"][col] = raw_score
        results["scaled"]["column_scores"][col] = scaled_score

    # Average the per-column scores (raw and scaled), skipping empty columns.
    valid_original_scores = [s for s in results["original"]["column_scores"].values() if not np.isnan(s)]
    valid_scaled_scores = [s for s in results["scaled"]["column_scores"].values() if not np.isnan(s)]
    results["original"]["overall_score"] = np.mean(valid_original_scores) if valid_original_scores else np.nan
    results["scaled"]["overall_score"] = np.mean(valid_scaled_scores) if valid_scaled_scores else np.nan

    metric_label = "AvgErr" if cat_cols else method.upper()

    # Pretty print
    print("-" * 50)
    print(f"{'Column':<12}{metric_label:>15}{'Scaled (0-1)':>15}")
    print("-" * 50)
    for col in original_df.columns:
        raw = results["original"]["column_scores"][col]
        scaled = results["scaled"]["column_scores"][col]
        original_str = f"{raw:>15.3f}" if not np.isnan(raw) else f"{'N/A':>15}"
        scaled_str = f"{scaled:>15.3f}" if not np.isnan(scaled) else f"{'N/A':>15}"
        print(f"{col:<12}{original_str}{scaled_str}")
    print("-" * 50)
    print(f"{'Overall':<12}{results['original']['overall_score']:>15.3f}{results['scaled']['overall_score']:>15.3f}")

    return results
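
A hedged end-to-end sketch of evaluate_imputation: the ground truth, the mean-imputed copy, and the boolean mask (True = observed) are all constructed by hand here, so no MissMechaGenerator is required; data and column names are illustrative.

# Example: score a naive mean imputation against known ground truth.
import numpy as np
import pandas as pd
from missmecha.analysis import evaluate_imputation

X_true = pd.DataFrame({"a": [1.0, 2.0, 3.0, 4.0], "b": [10.0, 20.0, 30.0, 40.0]})
mask = pd.DataFrame({"a": [True, False, True, True],   # False = originally missing
                     "b": [True, True, False, True]})
X_miss = X_true.where(mask)                # punch holes where mask is False
X_filled = X_miss.fillna(X_miss.mean())    # naive mean imputation
result = evaluate_imputation(X_true, X_filled, mask.values, method="rmse")
print(result["original"]["overall_score"])  # ~3.667 for this toy example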
class MCARTest:
    """
    A class to perform MCAR (Missing Completely At Random) tests.

    Supports Little's MCAR test (global test for all variables)
    and pairwise MCAR t-tests (for individual variables).
    """

    def __init__(self, method: str = "little"):
        """
        Parameters
        ----------
        method : {'little', 'ttest'}, default='little'
            The MCAR testing method to use.
            - 'little': Use Little's MCAR test (global p-value).
            - 'ttest': Perform pairwise t-tests for each variable.
        """
        if method not in ["little", "ttest"]:
            raise ValueError("method must be 'little' or 'ttest'")
        self.method = method

    def __call__(self, data: Union[np.ndarray, pd.DataFrame]) -> Union[float, pd.DataFrame]:
        """
        Run the selected MCAR test on the input data.

        Parameters
        ----------
        data : np.ndarray or pd.DataFrame
            Input dataset with missing values.

        Returns
        -------
        result : float or pd.DataFrame
            - A p-value (float) if method='little'.
            - A p-value matrix (pd.DataFrame) if method='ttest'.
        """
        if isinstance(data, np.ndarray):
            data = pd.DataFrame(data, columns=[f"col{i}" for i in range(data.shape[1])])
        elif not isinstance(data, pd.DataFrame):
            raise TypeError("Input must be a pandas DataFrame or a NumPy array.")

        if self.method == "little":
            return self.little_mcar_test(data)
        elif self.method == "ttest":
            return self.mcar_t_tests(data)

    @staticmethod
    def little_mcar_test(X: pd.DataFrame) -> float:
        """
        Perform Little's MCAR test on a DataFrame.

        Parameters
        ----------
        X : pd.DataFrame
            Input dataset.

        Returns
        -------
        pvalue : float
            P-value of the test.
        """
        dataset = X.copy()
        cols = dataset.columns
        n_var = dataset.shape[1]

        # Grand mean and covariance over all available data.
        gmean = dataset.mean()
        gcov = dataset.cov()

        # Encode each row's missing-data pattern as an integer code.
        r = dataset.isnull().astype(int)
        mdp = np.dot(r, [2**i for i in range(n_var)])
        sorted_mdp = sorted(np.unique(mdp))
        mdp_codes = [sorted_mdp.index(code) for code in mdp]
        dataset["mdp"] = mdp_codes

        # Accumulate the chi-squared statistic over missing-data patterns.
        pj = 0
        d2 = 0
        for i in range(len(sorted_mdp)):
            subset = dataset[dataset["mdp"] == i][cols]
            valid_vars = subset.columns[~subset.isnull().any()]
            pj += len(valid_vars)
            means = subset[valid_vars].mean() - gmean[valid_vars]
            cov = gcov.loc[valid_vars, valid_vars]
            mj = len(subset)
            if cov.shape[0] == 0:
                continue
            parta = np.dot(means.T, np.linalg.solve(cov, np.eye(cov.shape[0])))
            d2 += mj * np.dot(parta, means)

        df = pj - n_var
        pvalue = 1 - chi2.cdf(d2, df)
        MCARTest.report(pvalue, method="Little's MCAR Test")
        return pvalue

    @staticmethod
    def mcar_t_tests(X: pd.DataFrame) -> pd.DataFrame:
        """
        Perform pairwise MCAR t-tests between missing and observed groups.

        Parameters
        ----------
        X : pd.DataFrame
            Input dataset.

        Returns
        -------
        p_matrix : pd.DataFrame
            Matrix of p-values (var vs var).
        """
        cols = X.columns
        p_matrix = pd.DataFrame(np.nan, index=cols, columns=cols)

        for var in cols:
            for tvar in cols:
                # Split tvar into rows where var is missing vs. observed,
                # then test whether the two groups share the same mean.
                group1 = X.loc[X[var].isnull(), tvar].dropna()
                group2 = X.loc[X[var].notnull(), tvar].dropna()
                if len(group1) > 1 and len(group2) > 1:
                    p = ttest_ind(group1, group2, equal_var=False).pvalue
                    p_matrix.loc[var, tvar] = p

        return p_matrix

    @staticmethod
    def report(pvalue: float, alpha: float = 0.05, method: str = "Little's MCAR Test") -> None:
        """
        Print a summary report of the MCAR test.

        Parameters
        ----------
        pvalue : float
            The p-value from the MCAR test.
        alpha : float, default=0.05
            Significance level.
        method : str, default="Little's MCAR Test"
            Method name shown in report.
        """
        print(f"Method: {method}")
        print(f"Test Statistic p-value: {pvalue:.6f}")
        if pvalue < alpha:
            print(f"Decision: Reject the null hypothesis (α = {alpha})")
            print("→ The data is unlikely to be Missing Completely At Random (MCAR).")
        else:
            print(f"Decision: Fail to reject the null hypothesis (α = {alpha})")
            print("→ There is insufficient evidence to reject MCAR.")
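
A short usage sketch for MCARTest, assuming a numeric DataFrame with randomly punched holes; the seed, missing rate, and column names are arbitrary choices for the example.

# Example: run both MCAR tests on a numeric DataFrame with ~10% MCAR holes.
import numpy as np
import pandas as pd
from missmecha.analysis import MCARTest

rng = np.random.default_rng(0)
df = pd.DataFrame(rng.normal(size=(200, 3)), columns=["x1", "x2", "x3"])
df = df.mask(rng.random(df.shape) < 0.1)   # set ~10% of entries to NaN

little = MCARTest(method="little")
pval = little(df)                          # prints the report, returns the p-value

ttest = MCARTest(method="ttest")
p_matrix = ttest(df)                       # DataFrame of pairwise p-values
print(p_matrix.round(3))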