import numpy as np
import pandas as pd
from .generate.mcar import MCAR_TYPES
from .generate.mar import MAR_TYPES
from .generate.mnar import MNAR_TYPES
from .generate.marcat import MARCAT_TYPES
from .generate.mnarcat import MNARCAT_TYPES
import numpy as np
import pandas as pd
from .util import safe_init
import numpy as np
import pandas as pd
# Registry of built-in missingness mechanisms.
# Keys are lower-case mechanism names; each value is the per-mechanism lookup
# imported from `.generate`, which callers index by subtype, i.e.
# MECHANISM_LOOKUP[mechanism][mechanism_type].
# NOTE: "marcat" and "mnarcat" are not in the constructor's VALID_MECHANISMS
# set, so they are only reachable through a per-column `info` entry.
MECHANISM_LOOKUP = {
"mcar": MCAR_TYPES,
"mar": MAR_TYPES,
"mnar": MNAR_TYPES,
"marcat": MARCAT_TYPES,
"mnarcat":MNARCAT_TYPES
}
import warnings
[docs]
class MissMechaGenerator:
"""
Flexible simulator for generating missing data under various mechanisms.
This class serves as the central interface for simulating missing values using various predefined mechanisms
(e.g., MCAR, MAR, MNAR), or user-defined custom mechanisms. It supports both global and column-wise
missingness simulation, enabling fine-grained control over which features to mask and how.
Parameters
----------
mechanism : str, default="MCAR"
The default missingness mechanism to use if `info` is not specified.
Can be one of {"mcar", "mar", "mnar", "custom"}.
mechanism_type : int, default=1
The subtype of the mechanism (e.g., MAR type 1, MNAR type 4).
Ignored if `mechanism="custom"`.
missing_rate : float, default=0.2
Proportion of values to mask as missing (only used in global simulation or
if column-level info does not override).
seed : int, default=1
Random seed to ensure reproducibility.
info : dict, optional
Dictionary defining per-column missingness settings. Each key is a column
or tuple of columns, and each value is a dict with the following fields:
- 'mechanism' : str or type
One of {"mcar", "mar", "mnar", "custom"} or directly a class.
- 'type' : int (optional)
Subtype index for predefined mechanisms.
- 'custom_class' : class (optional)
A user-defined class implementing `.fit(X)` and `.transform(X)`.
Required if 'mechanism' is "custom".
- 'rate' : float
Proportion of values to mask in the column(s).
- 'depend_on' : str or list (optional)
Dependency columns for MAR or MNAR patterns.
- 'para' : dict (optional)
Additional keyword arguments passed to the mechanism constructor.
cat_cols : list of str, optional
Columns treated as categorical variables. Values will be internally encoded
into integers during simulation, then mapped back to original values.
custom_class : class, optional
A user-defined mechanism class to use in global simulation when `mechanism="custom"`.
Must implement `fit(X)` and `transform(X)` methods.
Examples
--------
>>> from missmecha.generator import MissMechaGenerator
>>> import numpy as np
>>> X = np.random.rand(100, 5)
>>> generator = MissMechaGenerator(mechanism="mcar", mechanism_type=1, missing_rate=0.2)
>>> X_missing = generator.fit_transform(X)
"""
def __init__(self, mechanism="MCAR", mechanism_type=1, missing_rate=0.2, seed=1, info=None, cat_cols=None, custom_class=None):
    """
    Validate the global configuration and initialise the generator state.

    Parameters
    ----------
    mechanism : str
        Default mechanism name, matched case-insensitively against
        {"mcar", "mar", "mnar", "custom"}.
    mechanism_type : int
        Default mechanism subtype. Must be a non-negative integer (Python
        or NumPy integer scalar) unless ``mechanism="custom"``, in which
        case it is ignored and stored as None.
    missing_rate : float
        Default missing rate in [0, 1]. NumPy integer/floating scalars are
        accepted and stored as a Python float.
    seed : int
        Random seed (stored as given, not validated here).
    info : dict
        Column-specific missingness configuration. When it is None or
        empty, the global generator class is resolved immediately.
    cat_cols : list of str
        Columns to treat as categorical (stored as given).
    custom_class : class
        User-supplied mechanism class; required when ``mechanism="custom"``.

    Raises
    ------
    TypeError
        If `mechanism` is not a string, or `custom_class` is not callable
        when ``mechanism="custom"``.
    ValueError
        If `mechanism` is unknown, `mechanism_type` or `missing_rate` is
        out of range / of the wrong type, or `custom_class` is missing
        when ``mechanism="custom"``.
    """
    VALID_MECHANISMS = {"mcar", "mar", "mnar", "custom"}
    if not isinstance(mechanism, str):
        raise TypeError(f"`mechanism` should be a string, got {type(mechanism)}.")
    mechanism_lower = mechanism.lower()
    if mechanism_lower not in VALID_MECHANISMS:
        raise ValueError(f"`mechanism` should be one of {VALID_MECHANISMS}, got '{mechanism}'.")
    self.mechanism = mechanism_lower

    # --- mechanism_type ---
    if self.mechanism != "custom":
        # NumPy integer scalars (e.g. from np.arange) are not `int`
        # subclasses; normalise them so they pass the check below.
        if isinstance(mechanism_type, np.integer):
            mechanism_type = int(mechanism_type)
        if not isinstance(mechanism_type, int) or mechanism_type < 0:
            raise ValueError(f"`mechanism_type` must be a non-negative integer for mechanism='{self.mechanism}'. Got: {mechanism_type}")
        self.mechanism_type = mechanism_type
    else:
        self.mechanism_type = None  # Not used

    # --- missing_rate ---
    # np.float32 / np.int64 etc. are not `float`/`int` subclasses either.
    if isinstance(missing_rate, (np.integer, np.floating)):
        missing_rate = float(missing_rate)
    if not isinstance(missing_rate, (int, float)) or not (0.0 <= missing_rate <= 1.0):
        raise ValueError(f"`missing_rate` must be a float in [0, 1]. Got: {missing_rate}")
    self.missing_rate = missing_rate

    # --- custom_class ---
    if self.mechanism == "custom":
        if custom_class is None:
            raise ValueError("When `mechanism='custom'`, you must provide `custom_class`.")
        if not callable(custom_class):
            raise TypeError(f"`custom_class` must be a class or callable. Got {type(custom_class)}.")
    self.custom_class = custom_class

    self.seed = seed
    self.info = info
    self._fitted = False
    self.label = None
    self.generator_map = {}
    self.cat_cols = cat_cols
    self.cat_maps = {}
    self.generator_class = None
    self.col_names = None
    self.is_df = None
    self.index = None
    self.mask = None  # Binary mask: 1 = observed, 0 = missing
    self.bool_mask = None  # Boolean mask: True = observed, False = missing

    if not info:
        # No per-column configuration: fall back to one generator class
        # for the entire dataset. An unknown subtype surfaces here as the
        # lookup's own error (not translated).
        if self.mechanism != "custom":
            self.generator_class = MECHANISM_LOOKUP[self.mechanism][self.mechanism_type]
        else:
            self.generator_class = self.custom_class

    # NOTE(review): this silences *all* warnings process-wide as a side
    # effect of constructing a generator. Kept for backward compatibility;
    # consider a scoped `warnings.catch_warnings()` around the noisy calls.
    warnings.filterwarnings("ignore")
def _resolve_columns(self, cols):
    """
    Resolve column names and positional indices for a column specification.

    Reads ``self.is_df`` and ``self.col_names`` (both set by `fit`).

    Parameters
    ----------
    cols : list, tuple, or range
        For DataFrame input: column labels present in ``self.col_names``.
        For ndarray input: either all integers (Python or NumPy integer
        scalars) or all strings of the form ``"col<N>"``.

    Returns
    -------
    col_labels : list
        Column names (``"col<N>"`` labels for ndarray input).
    col_idxs : list of int
        Corresponding index positions.

    Raises
    ------
    ValueError
        If a DataFrame column is unknown, or an ndarray specification is
        neither all-integer nor all well-formed ``"col<N>"`` strings.
    """
    if self.is_df:
        col_labels = list(cols)
        col_idxs = []
        for label in col_labels:
            try:
                col_idxs.append(self.col_names.index(label))
            except ValueError as err:
                # list.index only says "x is not in list"; name the column.
                raise ValueError(
                    f"Column {label!r} not found in input columns {self.col_names}."
                ) from err
        return col_labels, col_idxs

    prefix = "col"
    if all(isinstance(c, (int, np.integer)) for c in cols):
        return [f"{prefix}{c}" for c in cols], list(cols)

    if all(isinstance(c, str) and c.startswith(prefix) for c in cols):
        col_idxs = []
        for c in cols:
            # Strip only the leading prefix. Removing every "col" substring
            # would let malformed labels such as "colcol1" resolve silently.
            suffix = c[len(prefix):]
            if not (suffix.isascii() and suffix.isdigit()):
                raise ValueError(f"Invalid column specification: {cols} for ndarray input")
            col_idxs.append(int(suffix))
        return list(cols), col_idxs

    raise ValueError(f"Invalid column specification: {cols} for ndarray input")
[docs]
def fit(self, X, y=None):
    """
    Fit the internal generators to the input dataset.

    Builds either one global generator (when no per-column `info` is
    configured) or one generator per `info` entry, and stores them in
    ``self.generator_map``.

    Parameters
    ----------
    X : pd.DataFrame or np.ndarray
        The complete input dataset. When ``cat_cols`` is set, X must be a
        DataFrame and the listed columns are replaced by float codes.
    y : array-like, optional
        Label or target data. Stored as ``self.label`` and forwarded to
        each generator's ``fit`` as ``y`` (a per-column ``"label"`` entry
        in `info` takes precedence for that entry).

    Returns
    -------
    self : MissMechaGenerator
        Returns the fitted generator instance.

    Raises
    ------
    ValueError
        If ``cat_cols`` is set but X is not a DataFrame, or a custom
        mechanism is requested without a `custom_class`.
    """
    # Local import: used only for the process-independent per-column seed.
    import zlib

    self.label = y
    self.is_df = isinstance(X, pd.DataFrame)
    self.col_names = X.columns.tolist() if self.is_df else [f"col{i}" for i in range(X.shape[1])]
    self.index = X.index if self.is_df else np.arange(X.shape[0])
    self.generator_map = {}

    # Handle categorical mapping
    if self.cat_cols:
        if not self.is_df:
            raise ValueError("Categorical handling requires DataFrame input.")
        self.cat_maps = {}  # {col_name: {int: original_value}}
        for col in self.cat_cols:
            unique_values = X[col].dropna().unique()
            value_to_int = {v: i for i, v in enumerate(sorted(unique_values))}
            # Save the inverse mapping so codes can be decoded later.
            self.cat_maps[col] = {i: v for v, i in value_to_int.items()}
            # NOTE(review): this overwrites the caller's DataFrame column
            # in place. Code outside this view (e.g. `transform`) may rely
            # on receiving the already-encoded frame -- confirm before
            # switching to a copy.
            X[col] = X[col].map(value_to_int).astype(float)

    # Fallback: global generator. `not self.info` (rather than `is None`)
    # matches the constructor, which treats an empty dict as "no info".
    if not self.info:
        if self.mechanism == "custom":
            if self.custom_class is None:
                raise ValueError("When mechanism='custom', you must provide `custom_class`.")
            self.generator_class = self.custom_class
        generator = self.generator_class(missing_rate=self.missing_rate, seed=self.seed)
        X_np = X.to_numpy() if self.is_df else X
        generator.fit(X_np, y=self.label)
        self.generator_map["global"] = generator

    # Column-wise generators using info
    else:
        for key, settings in self.info.items():
            cols = (key,) if isinstance(key, (str, int)) else key
            col_labels, col_idxs = self._resolve_columns(cols)

            mech_spec = settings["mechanism"]
            custom_cls = settings.get("custom_class", None)
            rate = settings["rate"]
            depend_on = settings.get("depend_on", None)
            para = settings.get("para", {})
            if not isinstance(para, dict):
                para = {"value": para}

            # Derive a per-entry seed from a stable checksum of the key.
            # The built-in hash() of a str is salted per process, which
            # made results irreproducible across runs despite a fixed seed.
            if self.seed is not None:
                key_offset = zlib.crc32(str(key).encode("utf-8")) % 10000
                col_seed = self.seed + key_offset
            else:
                col_seed = None

            init_kwargs = {
                "missing_rate": rate,
                "seed": col_seed,
                "depend_on": depend_on,
                **para
            }
            # Drop unset options so generator defaults apply.
            init_kwargs = {k: v for k, v in init_kwargs.items() if v is not None}
            label = settings.get("label", y)

            if isinstance(mech_spec, str):
                mechanism = mech_spec.lower()
                if mechanism == "custom":
                    if custom_cls is None:
                        raise ValueError(f"Column {key} specified custom mechanism but no `custom_class` provided.")
                    generator_cls = custom_cls
                else:
                    mech_type = settings["type"]
                    generator_cls = MECHANISM_LOOKUP[mechanism][mech_type]
            else:
                # A class passed directly as the mechanism is used as the
                # generator class itself.
                generator_cls = mech_spec

            # safe_init is a project helper; presumably it instantiates the
            # class with only the kwargs it accepts -- confirm in `.util`.
            generator = safe_init(generator_cls, init_kwargs)
            sub_X = X[list(col_labels)].to_numpy() if self.is_df else X[:, col_idxs]
            generator.fit(sub_X, y=label)
            self.generator_map[key] = generator

    self._fitted = True
    return self
def _expand_info(self, info):
    """
    Flatten a grouped `info` mapping into one entry per column.

    Parameters
    ----------
    info : dict
        Mapping whose keys are either a single column or a group of
        columns (list, tuple or range) sharing one settings dict.

    Returns
    -------
    new_info : dict
        Mapping with one key per column. Columns that came from a group
        each receive their own shallow copy of the group's settings;
        single-column entries keep the original settings object.
    """
    expanded = {}
    for spec, config in info.items():
        if not isinstance(spec, (list, tuple, range)):
            expanded[spec] = config
            continue
        # One copy per column so grouped columns do not share a reference.
        expanded.update((column, config.copy()) for column in spec)
    return expanded
[docs]
def get_mask(self):
    """
    Return the binary mask currently stored on the instance.

    Returns
    -------
    mask : np.ndarray
        Binary array where 1 = observed, 0 = missing.

    Raises
    ------
    RuntimeError
        If no mask has been stored yet (``self.mask`` is None).
    """
    latest = self.mask
    if latest is not None:
        return latest
    raise RuntimeError("Mask not available. Please call `transform()` first.")
[docs]
def get_bool_mask(self):
    """
    Return the boolean mask currently stored on the instance.

    Returns
    -------
    bool_mask : np.ndarray
        Boolean array where True = observed, False = missing.

    Raises
    ------
    RuntimeError
        If no boolean mask has been stored yet (``self.bool_mask`` is None).
    """
    latest = self.bool_mask
    if latest is not None:
        return latest
    raise RuntimeError("Boolean mask not available. Please call `transform()` first.")