Source code for hugiml.multiclass

# Copyright 2026 Srikumar Krishnamoorthy
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Helpers for three common HUG-IML deployment scenarios:

1. **Multiclass classification** — HUGIMLClassifierNative supports multiclass
   natively via its ``base_estimator`` (LogisticRegression with ``solver='lbfgs'``
   when n_classes > 2).  This module provides a ``MulticlassHUGReport`` that
   extracts per-class pattern importances.

2. **Imbalanced data** — wraps the classifier in a cost-sensitive or
   resampling pipeline via ``make_imbalanced_pipeline``.

3. **High-cardinality categoricals** — ``encode_high_cardinality`` replaces
   categorical columns that have many unique values with a target-mean,
   frequency, or ordinal encoding before the data is passed to ``prepareXy``.
"""

from __future__ import annotations

import warnings
from typing import Any

import numpy as np
import pandas as pd

__all__ = [
    "MulticlassHUGReport",
    "make_imbalanced_pipeline",
    "encode_high_cardinality",
    "apply_encoding",
]


# ---------------------------------------------------------------------------
# 1. Multiclass support utilities
# ---------------------------------------------------------------------------


class MulticlassHUGReport:
    """Per-class pattern importances for a multiclass HUG-IML model.

    When the downstream estimator is LogisticRegression with > 2 classes,
    ``coef_`` has shape ``(n_classes, n_patterns)``.  This class exposes
    per-class top patterns.

    Parameters
    ----------
    clf : fitted HUGIMLClassifierNative

    Raises
    ------
    RuntimeError
        If ``clf`` has no ``patterns_`` attribute (treated as "not fitted").
    AttributeError
        If the ``"clf"`` step of ``clf.model_`` does not expose ``coef_``.
    ValueError
        If ``coef_`` is not 2-D with at least two rows.
    """

    # Column order of the frames returned by ``importances_for_class``.
    _COLUMNS = ("pattern", "coefficient", "abs_coefficient", "support")

    def __init__(self, clf: Any) -> None:
        if not hasattr(clf, "patterns_"):
            raise RuntimeError("Classifier must be fitted.")
        self._clf = clf
        self._validate()

    def _validate(self) -> None:
        """Check that the downstream estimator carries one coefficient row per class."""
        clf_step = self._clf.model_.named_steps.get("clf")
        if not hasattr(clf_step, "coef_"):
            raise AttributeError("Downstream estimator must expose coef_.")
        coef = clf_step.coef_
        # Binary LR produces coef_.shape == (1, n_features); multiclass is
        # (n_classes, n_features).  We require shape[0] > 1 so per-class
        # indexing is unambiguous.
        if coef.ndim != 2 or coef.shape[0] < 2:
            raise ValueError(
                f"MulticlassHUGReport requires a multiclass model (coef_.shape[0] > 1); "
                f"got shape {coef.shape}. "
                "For binary classification, use clf.feature_importances() directly."
            )

    @property
    def classes(self) -> np.ndarray:
        """Class labels of the wrapped classifier (``clf.classes_``)."""
        return self._clf.classes_

    def importances_for_class(self, class_label: Any, top_n: int = 20) -> pd.DataFrame:
        """Return the top-N patterns for a specific class.

        Parameters
        ----------
        class_label : class value in ``clf.classes_``
        top_n : int

        Returns
        -------
        pd.DataFrame with columns: pattern, coefficient, abs_coefficient, support.
        Rows are ordered by descending ``abs_coefficient``; ties keep the
        original pattern order.  The frame is empty (but still has all four
        columns) when the model has no patterns.

        Raises
        ------
        ValueError
            If ``class_label`` is not one of ``clf.classes_``.
        """
        clf = self._clf
        classes = clf.classes_.tolist()
        if class_label not in classes:
            raise ValueError(f"Class '{class_label}' not in {classes}.")
        coef_row = clf.model_.named_steps["clf"].coef_[classes.index(class_label)]
        pattern_labels = clf.get_hug_features()

        hup = clf.x_train_hup_
        n_train = hup.shape[0]
        # One column-wise reduction instead of slicing a (possibly sparse)
        # column per pattern.  np.asarray flattens the np.matrix that scipy
        # sparse matrices return from ``sum``.
        column_totals = np.asarray(hup.sum(axis=0)).ravel()

        rows = [
            {
                "pattern": label,
                "coefficient": round(float(coef), 6),
                "abs_coefficient": round(abs(float(coef)), 6),
                # Guard against an empty training matrix instead of dividing by zero.
                "support": round(float(total) / n_train, 4) if n_train else 0.0,
            }
            for label, coef, total in zip(pattern_labels, coef_row, column_totals)
        ]
        # Explicit columns keep sort_values working when ``rows`` is empty.
        frame = pd.DataFrame(rows, columns=list(self._COLUMNS))
        frame = frame.sort_values("abs_coefficient", ascending=False, kind="stable")
        return frame.head(top_n).reset_index(drop=True)

    def summary(self, top_n: int = 10) -> str:
        """Human-readable summary of top patterns per class."""
        lines = ["MulticlassHUGReport", "=" * 50]
        for cls in self.classes:
            lines.append(f"\nClass: {cls}")
            lines.append("-" * 40)
            imp = self.importances_for_class(cls, top_n)
            # ``!s`` lets non-string pattern labels take the width specifier.
            lines.extend(
                f" {pattern!s:<40} coef={coef:>+8.4f} sup={sup:.3f}"
                for pattern, coef, sup in zip(
                    imp["pattern"], imp["coefficient"], imp["support"]
                )
            )
        return "\n".join(lines)
# ---------------------------------------------------------------------------
# 2. Imbalanced-data pipeline
# ---------------------------------------------------------------------------
def make_imbalanced_pipeline(
    clf: Any,
    strategy: str = "class_weight",
    sampling_ratio: float = 1.0,
    random_state: int = 42,
):
    """Wrap a HUGIMLClassifierNative for use with imbalanced data.

    Parameters
    ----------
    clf : HUGIMLClassifierNative (unfitted)
        Never mutated; a deep copy is configured and returned.
    strategy : {'class_weight', 'smote', 'random_oversample', 'random_undersample'}
        * ``class_weight`` — sets ``class_weight='balanced'`` on the downstream
          LR.  Zero overhead; recommended first choice.
        * ``smote`` — SMOTE oversampling via ``imbalanced-learn``.
        * ``random_oversample`` — random oversampling via ``imbalanced-learn``.
        * ``random_undersample`` — random undersampling via ``imbalanced-learn``.
    sampling_ratio : float
        Target minority:majority ratio (only for imbalanced-learn strategies).
    random_state : int

    Returns
    -------
    An **unfitted** object: a copy of ``clf`` for 'class_weight', or an
    ``_ImbalancedHUGPipeline`` wrapper for the resampling strategies.  The
    returned object has ``fit(X, y)``, ``predict_proba(X)``, and
    ``predict(X)`` methods.

    Raises
    ------
    ValueError
        If ``strategy`` is not one of the supported names.
    ImportError
        If a resampling strategy is requested and ``imbalanced-learn`` is
        not installed.

    Notes
    -----
    For 'class_weight': returns a copy of clf with base_estimator *replaced*
    by ``LogisticRegression(class_weight='balanced', solver='lbfgs',
    max_iter=500)``; hyper-parameters of any previous ``base_estimator`` are
    not carried over.

    For SMOTE/resampling: returns an ``_ImbalancedHUGPipeline`` that applies
    resampling to the **pattern matrix** (post-transform) inside fit().
    This ensures the HUG patterns are mined on the *original* distribution
    (as intended) while the downstream classifier trains on the resampled
    binary matrix.
    """
    import copy

    resampling_strategies = ("smote", "random_oversample", "random_undersample")
    # Reject bad input before doing any copying or optional imports.
    if strategy != "class_weight" and strategy not in resampling_strategies:
        raise ValueError(
            f"Unknown strategy '{strategy}'. "
            "Choose from: class_weight, smote, random_oversample, random_undersample."
        )

    if strategy == "class_weight":
        from sklearn.linear_model import LogisticRegression

        new_clf = copy.deepcopy(clf)
        new_clf.base_estimator = LogisticRegression(
            class_weight="balanced",
            solver="lbfgs",
            random_state=random_state,
            max_iter=500,
        )
        return new_clf

    # Resampling strategies need the optional imbalanced-learn dependency;
    # fail here rather than later inside fit().
    try:
        import imblearn  # noqa: F401
    except ImportError as exc:
        raise ImportError(
            f"Strategy '{strategy}' requires imbalanced-learn. "
            "Install with: pip install imbalanced-learn"
        ) from exc
    return _ImbalancedHUGPipeline(
        clf=copy.deepcopy(clf),
        strategy=strategy,
        sampling_ratio=sampling_ratio,
        random_state=random_state,
    )
class _ImbalancedHUGPipeline:
    """Internal pipeline: mine patterns on raw data, resample pattern matrix.

    Parameters
    ----------
    clf : classifier to wrap; ``fit`` mutates it in place.
    strategy : {'smote', 'random_oversample', 'random_undersample'}
        Any other value is treated as ``'random_undersample'``.
    sampling_ratio : float
        Values below 1.0 are forwarded to the sampler as a float
        ``sampling_strategy``; anything else uses the sampler's ``"auto"``.
    random_state : int
    """

    def __init__(self, clf: Any, strategy: str, sampling_ratio: float, random_state: int) -> None:
        self._clf = clf
        self._strategy = strategy
        self._ratio = sampling_ratio
        self._rs = random_state

    def _sampling_strategy(self) -> Any:
        """Translate ``sampling_ratio`` into an imbalanced-learn ``sampling_strategy``."""
        # imbalanced-learn takes a *float* for a minority/majority ratio
        # (binary targets only).  A dict must map class labels to sample
        # counts, so the former {"minority": ratio} was rejected at fit time.
        return self._ratio if self._ratio < 1.0 else "auto"

    def _make_sampler(self):
        """Build the imbalanced-learn sampler selected by ``strategy``."""
        ratio = self._sampling_strategy()
        if self._strategy == "smote":
            from imblearn.over_sampling import SMOTE

            return SMOTE(sampling_strategy=ratio, random_state=self._rs)
        elif self._strategy == "random_oversample":
            from imblearn.over_sampling import RandomOverSampler

            return RandomOverSampler(sampling_strategy=ratio, random_state=self._rs)
        else:
            from imblearn.under_sampling import RandomUnderSampler

            return RandomUnderSampler(sampling_strategy=ratio, random_state=self._rs)

    def fit(self, X: Any, y: Any) -> "_ImbalancedHUGPipeline":
        """Fit the wrapped classifier, then refit its downstream LR on resampled patterns.

        ``y`` is converted to an int64 array.  Returns ``self``.
        """
        from scipy.sparse import csr_matrix
        from sklearn.linear_model import LogisticRegression
        from sklearn.pipeline import Pipeline

        y_arr = np.asarray(y, dtype=np.int64)

        # Step 1: mine patterns on original data
        self._clf.fit(X, y_arr)

        # Step 2: get binary pattern matrix for training data
        hup = self._clf.x_train_hup_.toarray()

        # Step 3: resample pattern matrix
        sampler = self._make_sampler()
        hup_res, y_res = sampler.fit_resample(hup, y_arr)
        hup_res_sparse = csr_matrix(hup_res, dtype=np.float32)
        # From here on the stored training matrix is the *resampled* one.
        self._clf.x_train_hup_ = hup_res_sparse

        # Step 4: refit downstream classifier on resampled matrix
        n_cls = len(np.unique(y_arr))
        solver = "liblinear" if n_cls == 2 else "lbfgs"
        new_est = LogisticRegression(solver=solver, random_state=self._rs, max_iter=500)
        clf_step_orig = self._clf.model_.named_steps.get("clf")
        params = clf_step_orig.get_params() if hasattr(clf_step_orig, "get_params") else {}
        # Carry over every hyper-parameter of the original step except the
        # solver chosen above.
        # NOTE(review): assumes the original step's parameters are valid for
        # LogisticRegression — set_params raises otherwise; confirm.
        new_est.set_params(**{k: v for k, v in params.items() if k not in ("solver",)})
        new_model = Pipeline([("clf", new_est)])
        new_model.fit(hup_res_sparse, y_res)
        self._clf.model_ = new_model
        return self

    def predict_proba(self, X: Any) -> np.ndarray:
        """Delegate to the wrapped classifier's ``predict_proba``."""
        return self._clf.predict_proba(X)

    def predict(self, X: Any) -> np.ndarray:
        """Delegate to the wrapped classifier's ``predict``."""
        return self._clf.predict(X)

    def __getattr__(self, item: str) -> Any:
        """Forward unknown public attributes to the wrapped classifier."""
        # __getattr__ only runs after normal lookup failed.  If ``_clf`` itself
        # is missing (instance built via __new__ by copy/pickle), reading
        # self._clf would re-enter here forever.  Dunder probes such as
        # __deepcopy__ / __setstate__ must not be answered by the wrapped
        # object either, or copying the wrapper would copy the inner
        # classifier instead.
        if item == "_clf" or (item.startswith("__") and item.endswith("__")):
            raise AttributeError(item)
        return getattr(self._clf, item)


# ---------------------------------------------------------------------------
# 3. High-cardinality categorical encoding
# ---------------------------------------------------------------------------
def encode_high_cardinality(
    X: pd.DataFrame,
    y: pd.Series | np.ndarray | None = None,
    threshold: int = 20,
    method: str = "target_mean",
    min_samples_leaf: int = 5,
    smoothing: float = 1.0,
    random_state: int = 42,
) -> tuple[pd.DataFrame, dict]:
    """Replace high-cardinality categorical columns with numerical encodings.

    This should be called *before* ``prepareXy``; the returned mapping can be
    applied to test data via ``apply_encoding``.

    Parameters
    ----------
    X : pd.DataFrame
    y : array-like, optional
        Required when ``method='target_mean'``.  Converted to float.  If it
        is omitted, affected columns fall back to frequency encoding with a
        warning.
    threshold : int
        Columns with more than this many unique values are considered
        high-cardinality.
    method : {'target_mean', 'frequency', 'ordinal'}
        * ``target_mean`` — replace each category with its mean target value
          (smoothed towards the global mean).  Reduces categories to a single
          float — most informative for tree/rule-based models.
        * ``frequency`` — replace with the category's relative frequency.
        * ``ordinal`` — assign arbitrary integer codes (fast, no leakage, but
          loses any ordering meaning).
    min_samples_leaf : int
        Minimum observations per category before smoothing kicks in
        (target_mean only).
    smoothing : float
        Smoothing strength (target_mean only).
    random_state : int
        Currently unused by this function; accepted for API compatibility.

    Returns
    -------
    X_encoded : pd.DataFrame (copy — original is unchanged)
    encoding_map : dict
        Mapping ``{column_name: {category: encoded_value}}`` to apply to
        unseen data via ``apply_encoding(X_test, encoding_map)``.

    Raises
    ------
    ValueError
        If ``method`` is not a supported name, or ``y`` is given with a
        length different from ``X``.

    Notes
    -----
    *Data-leakage safety*: call ``encode_high_cardinality`` on the training
    split only.  Use ``apply_encoding`` on test/validation data with the map
    returned from training.  Never fit the encoding on combined train+test
    data.

    Missing values are filled with the global target mean for target-mean
    encoded columns and with 0 otherwise.
    """
    # An unknown method used to fall through silently to ordinal encoding.
    if method not in ("target_mean", "frequency", "ordinal"):
        raise ValueError(
            f"Unknown method '{method}'. Choose from: target_mean, frequency, ordinal."
        )

    X = X.copy()
    encoding_map: dict = {}

    if y is not None:
        y_arr = np.asarray(y, dtype=float)
        if y_arr.size != len(X):
            raise ValueError(
                f"y has {y_arr.size} values but X has {len(X)} rows."
            )
        global_mean = float(y_arr.mean())
    else:
        y_arr = None
        global_mean = 0.0

    for col in X.columns:
        column = X[col]
        if column.nunique(dropna=True) <= threshold:
            continue  # low cardinality — leave for prepareXy to handle

        dtype = column.dtype
        is_categorical = isinstance(dtype, pd.CategoricalDtype)
        if not (
            pd.api.types.is_object_dtype(dtype)
            or pd.api.types.is_string_dtype(dtype)
            or is_categorical
        ):
            continue  # only encode categorical columns

        # Mapping a Categorical can return another Categorical, on which
        # fillna() rejects values that are not existing categories.  Working
        # on plain objects avoids that and ignores unobserved categories.
        values = column.astype(object) if is_categorical else column

        method_col = method
        if method == "target_mean" and y_arr is None:
            warnings.warn(
                f"target_mean encoding for '{col}' requires y. "
                "Falling back to frequency encoding.",
                stacklevel=2,
            )
            method_col = "frequency"

        if method_col == "target_mean":
            enc = _target_mean_encode(values, y_arr, global_mean, min_samples_leaf, smoothing)
        elif method_col == "frequency":
            enc = values.value_counts(normalize=True).to_dict()
        else:  # ordinal
            cats = sorted(values.dropna().unique().tolist())
            enc = {c: i for i, c in enumerate(cats)}

        encoding_map[col] = enc
        # Fill according to the encoding actually applied to this column.
        fill = global_mean if method_col == "target_mean" else 0
        X[col] = values.map(enc).fillna(fill)

    return X, encoding_map
def apply_encoding(X: pd.DataFrame, encoding_map: dict, fill_value: float = 0.0) -> pd.DataFrame:
    """Apply an encoding map (produced by ``encode_high_cardinality``) to new data.

    Parameters
    ----------
    X : pd.DataFrame
    encoding_map : dict (from ``encode_high_cardinality``)
        ``{column_name: encoding}``.  Each encoding may be a ``dict``, any
        object with a ``get(key, default)`` method, or any object supporting
        ``encoding[key]``.  Columns absent from ``X`` are skipped.
    fill_value : float
        Value for unseen categories (also used for missing values).

    Returns
    -------
    pd.DataFrame (copy)
    """
    X = X.copy()
    for col, enc in encoding_map.items():
        if col not in X.columns:
            continue

        values = X[col]
        if isinstance(values.dtype, pd.CategoricalDtype):
            # Mapping a Categorical can stay categorical, and fillna() then
            # rejects a fill value that is not an existing category.
            values = values.astype(object)

        if isinstance(enc, dict):
            X[col] = values.map(enc).fillna(fill_value)
        elif hasattr(enc, "get"):
            X[col] = values.map(lambda v, e=enc: e.get(v, fill_value))
        elif hasattr(enc, "__getitem__"):
            # Arrays/sequences have no .get(); index them directly and treat
            # any failed lookup as an unseen category.
            def _lookup(v, e=enc):
                try:
                    return e[v]
                except (KeyError, IndexError, TypeError):
                    return fill_value

            X[col] = values.map(_lookup)
    return X
# ---------------------------------------------------------------------------
# Internal helper: smoothed target-mean encoding
# ---------------------------------------------------------------------------


def _target_mean_encode(
    series: pd.Series,
    y: np.ndarray,
    global_mean: float,
    min_samples_leaf: int,
    smoothing: float,
) -> dict:
    """Smoothed target-mean encoding (Micci-Barreca, 2001).

    Each category's mean target is blended with ``global_mean`` using a
    logistic weight of the category count: the weight is 0.5 when the count
    equals ``min_samples_leaf``, and ``smoothing`` scales how fast it moves
    towards 1 as the count grows.

    Parameters
    ----------
    series : categorical feature values; paired with ``y`` by position.
    y : target values, same length as ``series``.
    global_mean : value that rare categories are shrunk towards.
    min_samples_leaf : count at which a category's own mean gets weight 0.5.
    smoothing : divisor of ``count - min_samples_leaf`` inside the logistic.

    Returns
    -------
    dict mapping each category present in ``series`` to its smoothed mean.
    Missing values are not included (groupby drops NaN keys).
    """
    df = pd.DataFrame({"cat": series.values, "y": y})
    # observed=True: for Categorical input, skip declared-but-absent
    # categories; they would otherwise appear with count 0 and a NaN mean.
    agg = df.groupby("cat", observed=True)["y"].agg(["mean", "count"])
    smoother = 1.0 / (1.0 + np.exp(-(agg["count"] - min_samples_leaf) / smoothing))
    smoothed_mean = smoother * agg["mean"] + (1 - smoother) * global_mean
    return smoothed_mean.to_dict()