# Source code for hugiml.classifier

# Copyright 2026 Srikumar Krishnamoorthy
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""HUGIMLClassifier — C++ accelerated, scikit-learn compatible classifier.

``HUGIMLClassifier`` is the primary public class name.
``HUGIMLClassifierNative`` remains as a backward-compatible alias.

Implements the High Utility Gain Interpretable Machine Learning (HUG-IML)
algorithm from:

    Krishnamoorthy, S. (2024). Interpretable Classifier Models for Decision
    Support Using High Utility Gain Patterns. IEEE Access, 12, 126088–126107.
    DOI: 10.1109/ACCESS.2024.3455563

Computationally intensive stages (discretisation, transaction construction,
pattern mining, matrix assembly) run at native speed via a compiled C++
extension with optional OpenMP parallelism.  The Python layer handles
DataFrame ingestion, column-type detection, downstream estimation,
explanation methods, monitoring, and drift detection.

Architecture
------------
C++ extension (_hugiml_core):
    Discretisation, transaction construction, top-K HUI pattern mining with
    information-gain filtering, bitmap-accelerated matrix assembly, OpenMP
    parallel pattern matching.

Python layer:
    Column-type detection (prepareXy), NaN/Inf imputation, downstream sklearn
    estimator (LogisticRegression default), explanation methods
    (get_hug_features, get_pattern_info, feature_importances), versioned
    model serialisation, prediction monitoring, multi-method drift detection,
    latency SLA enforcement, and graceful degradation under memory pressure.

Quick start
-----------
Two usage paths are supported:

**Path A — prepareXy** (recommended when the full dataset is available upfront)::

    from sklearn.model_selection import train_test_split

    from hugiml import HUGIMLClassifier

    clf = HUGIMLClassifier()
    X, y = clf.prepareXy(X_df, y_series)
    X_tr, X_te, y_tr, y_te = train_test_split(X, y, stratify=y)
    clf.fit(X_tr, y_tr)
    proba = clf.predict_proba(X_te)

    print(clf.model_summary())
    print(clf.feature_importances())

**Path B — allCols + origColumns** (cross-validation loops)::

    clf = HUGIMLClassifier(
        allCols=[int_cols, float_cols, cat_cols],
        origColumns=X_df.columns.tolist(),
    )
    clf.fit(X_train, y_train)

Monitoring and drift detection::

    clf.enable_monitoring()
    clf.predict_proba(X_new)
    print(clf.monitor.report())

    drift = clf.detect_drift(X_new)
    print(drift)

Versioned serialisation::

    clf.save_model("model.hugiml")
    clf2 = HUGIMLClassifier.load_model("model.hugiml")
"""

from __future__ import annotations

import copy
import dataclasses
import logging
import math
import os
import threading
import time
import tracemalloc
import warnings
from typing import Any

import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix, hstack, issparse
from sklearn.base import BaseEstimator, ClassifierMixin, TransformerMixin
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.utils.validation import check_is_fitted

from hugiml._binning import (
    _apply_edges as _adap_apply_edges,
)
from hugiml._binning import (
    _quantile_edges as _adap_quantile_edges,
)
from hugiml._binning import (
    _select_b as _adap_select_b,
)
from hugiml._compat import check_array, check_X_y
from hugiml.exceptions import (
    HUGIMLConvergenceWarning,
    HUGIMLDtypeDriftWarning,
    HUGIMLMemoryError,
    HUGIMLMiningError,
    HUGIMLParamError,
    HUGIMLPredictionError,
    HUGIMLRangeWarning,
    HUGIMLSchemaError,
    HUGIMLTimeoutError,
    HUGIMLValidationError,
    HUGIMLVersionError,
    HUGIMLWarning,
)
from hugiml.monitoring import DriftDetector, PredictionMonitor
from hugiml.serialization import MIN_SCHEMA_VERSION, MODEL_SCHEMA_VERSION
from hugiml.serialization import load_model as _load_model
from hugiml.serialization import save_model as _save_model

try:
    import _hugiml_core as _core

    _CORE_AVAILABLE: bool = True
except ImportError:
    _core = None
    _CORE_AVAILABLE = False


# Default for ``NativeAugmentedPairTransformBlock(max_features=...)``: how many
# of the highest-IG numeric source columns are considered for pair augmentation.
DEFAULT_AUGMENTED_PAIR_MAX_FEATURES = 10
# Default for ``NativeAugmentedPairTransformBlock(unbounded_cap=...)``: the cap
# substituted for a negative ``budget_topK`` inside that block's ``fit``.
DEFAULT_AUGMENTED_PAIR_UNBOUNDED_CAP = 100
# Pair operation names.  The same four names are the keys of the op-code map
# built in ``NativeAugmentedPairTransformBlock._pair_index_arrays``.
AUGMENTED_PAIR_OPS = ("product", "absolute_difference", "sum", "signed_difference")


def _best_ig_score(score_obj: Any) -> float:
    """Return the best finite IG score from native adaptive-binning metadata."""
    if isinstance(score_obj, dict):
        vals: list[float] = []
        for value in score_obj.values():
            try:
                fval = float(value)
            except Exception:
                continue
            if np.isfinite(fval):
                vals.append(fval)
        return max(vals) if vals else 0.0
    try:
        fval = float(score_obj)
    except Exception:
        return 0.0
    return fval if np.isfinite(fval) else 0.0


def _dense_full_csr(Z: np.ndarray) -> csr_matrix:
    """Convert a dense, mostly non-zero float block to CSR without scanning."""
    n_rows, n_cols = Z.shape
    if n_cols == 0:
        return csr_matrix((n_rows, 0), dtype=np.float32)
    data = np.ascontiguousarray(Z, dtype=np.float32).ravel()
    indices = np.tile(np.arange(n_cols, dtype=np.int32), n_rows)
    indptr = np.arange(0, (n_rows + 1) * n_cols, n_cols, dtype=np.int32)
    return csr_matrix((data, indices, indptr), shape=(n_rows, n_cols), dtype=np.float32)


def _entropy_from_counts(counts: np.ndarray) -> float:
    total = float(np.sum(counts))
    if total <= 0.0:
        return 0.0
    probs = counts.astype(np.float64, copy=False) / total
    probs = probs[probs > 0.0]
    return float(-np.sum(probs * np.log2(probs)))


def _information_gain_from_codes(
    feature_codes: np.ndarray, y_codes: np.ndarray, n_classes: int
) -> float:
    """Return IG(y; feature) for integer-coded feature values.

    Rows where either the feature code or the target code is negative are
    ignored.  The result is the target entropy minus the size-weighted
    entropy within each distinct feature code, clipped below at ``0.0``.
    Returns ``0.0`` when no row is usable or the target entropy is zero.
    """
    targets = np.asarray(y_codes, dtype=np.int64)
    codes = np.asarray(feature_codes, dtype=np.int64)

    usable = (codes >= 0) & (targets >= 0)
    if not usable.any():
        return 0.0

    kept_codes = codes[usable]
    kept_targets = targets[usable]

    prior_entropy = _entropy_from_counts(np.bincount(kept_targets, minlength=n_classes))
    if prior_entropy <= 0.0:
        return 0.0

    # Re-index the observed feature codes as dense group ids 0..k-1.
    group_ids = np.unique(kept_codes, return_inverse=True)[1]
    n_kept = float(len(kept_targets))

    conditional = 0.0
    for gid in range(int(group_ids.max()) + 1):
        members = group_ids == gid
        if not members.any():
            continue
        share = float(members.sum()) / n_kept
        conditional += share * _entropy_from_counts(
            np.bincount(kept_targets[members], minlength=n_classes)
        )
    return max(0.0, float(prior_entropy - conditional))


def _continuous_to_quantile_codes(values: np.ndarray, max_bins: int = 16) -> np.ndarray:
    """Quantile-code a continuous column for strict topK IG ranking."""
    arr = np.asarray(values, dtype=np.float64)
    codes = np.full(arr.shape[0], -1, dtype=np.int64)
    finite = np.isfinite(arr)
    if not np.any(finite):
        return codes
    vals = arr[finite]
    uniq = np.unique(vals)
    if uniq.size <= max_bins:
        _, inv = np.unique(vals, return_inverse=True)
        codes[finite] = inv.astype(np.int64, copy=False)
        return codes
    qs = np.linspace(0.0, 1.0, max_bins + 1)[1:-1]
    edges = np.unique(np.quantile(vals, qs))
    if edges.size == 0:
        codes[finite] = 0
    else:
        codes[finite] = np.searchsorted(edges, vals, side="right").astype(np.int64, copy=False)
    return codes


class NativeAugmentedPairTransformBlock:
    """Native-backed L>1 pair augmentation state and transform wrapper.

    Candidate scoring and feature generation are fully delegated to the native
    ``_hugiml_core`` extension.  Python only selects source columns from already
    fitted adaptive-binning IG metadata, stores audit metadata, and prepares the
    compact numeric arrays needed by the native routines.

    Fitted state is stored in trailing-underscore attributes (for example
    ``selected_ig_features_``, ``kept_specs_``, ``scaler_mean_``,
    ``scaler_scale_``); most readers access them through ``getattr`` with a
    default so that an unfitted or partially restored instance degrades to
    "no augmented features" instead of raising.
    """

    def __init__(
        self,
        max_features: int = DEFAULT_AUGMENTED_PAIR_MAX_FEATURES,
        budget_topK: int | None = None,
        min_source_ig: float | None = None,
        unbounded_cap: int = DEFAULT_AUGMENTED_PAIR_UNBOUNDED_CAP,
    ) -> None:
        """Store the augmentation configuration.

        Parameters
        ----------
        max_features : int
            Number of highest-IG source columns kept by ``fit``.
        budget_topK : int or None
            Maximum number of pair candidates kept by ``fit``; ``None`` keeps
            all candidates, a negative value is replaced by ``unbounded_cap``.
        min_source_ig : float or None
            Minimum IG a source column needs to be considered.
        unbounded_cap : int
            Cap used in place of a negative ``budget_topK``.
        """
        self.max_features = int(max_features)
        # ``top_ig`` is the value actually used to slice the ranked columns in fit().
        self.top_ig = self.max_features
        self.budget_topK = None if budget_topK is None else int(budget_topK)
        self.min_source_ig = None if min_source_ig is None else float(min_source_ig)
        self.unbounded_cap = int(unbounded_cap)

    def _as_frame(
        self, X: Any, cols: list[str], full_feature_names: list[str] | None = None
    ) -> pd.DataFrame:
        """Return ``X`` as a DataFrame that contains every column in ``cols``.

        A DataFrame is used as-is.  Anything else is converted with
        ``np.asarray`` (a 1-D array becomes a single row) and labelled with
        ``full_feature_names`` or the stored ``input_feature_names_`` when that
        list has exactly one name per column; otherwise columns are named
        ``col0``, ``col1``, ....  Columns from ``cols`` that are absent are
        added as all-NaN on a copy, so the caller's frame is not mutated.
        """
        if isinstance(X, pd.DataFrame):
            X_df = X
        else:
            arr = np.asarray(X)
            if arr.ndim == 1:
                arr = arr.reshape(1, -1)
            schema = list(full_feature_names or getattr(self, "input_feature_names_", []) or [])
            if schema and len(schema) == arr.shape[1]:
                names = schema
            else:
                names = [f"col{j}" for j in range(arr.shape[1])]
            X_df = pd.DataFrame(arr, columns=names)
        missing = [col for col in cols if col not in X_df.columns]
        if missing:
            X_df = X_df.copy()
            for col in missing:
                X_df[col] = np.nan
        return X_df

    def _selected_numeric_matrix(self, X: Any, cols: list[str] | None = None) -> np.ndarray:
        """Return the selected source columns of ``X`` as a C-contiguous float64 matrix.

        ``cols`` defaults to the fitted ``selected_ig_features_``.  With no
        selected columns the result has shape ``(n_rows, 0)``, where ``n_rows``
        is ``len(X)`` (or 0 when ``X`` has no ``__len__``).
        """
        selected = list(cols or getattr(self, "selected_ig_features_", []))
        n_rows = len(X) if hasattr(X, "__len__") else 0
        if not selected:
            return np.zeros((n_rows, 0), dtype=np.float64)
        X_df = self._as_frame(X, selected, list(getattr(self, "input_feature_names_", []) or []))
        try:
            mat = X_df.reindex(columns=selected).to_numpy(dtype=np.float64, copy=True)
        except Exception:
            # Direct conversion failed (e.g. non-numeric entries): coerce each
            # column separately, turning unparseable values into NaN.
            mat = np.column_stack(
                [
                    pd.to_numeric(X_df[col], errors="coerce").to_numpy(dtype=np.float64)
                    for col in selected
                ]
            )
        return np.ascontiguousarray(mat, dtype=np.float64)

    def _pair_index_arrays(self) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Encode ``kept_specs_`` as index/op-code arrays for the native routines.

        Returns
        -------
        tuple of np.ndarray
            ``(left, right, ops)``: int64 positions of each pair's two inputs
            within ``selected_ig_features_`` and the int8 operation code.

        Raises
        ------
        HUGIMLParamError
            If a spec names an operation outside the known four.
        """
        pos = {col: idx for idx, col in enumerate(getattr(self, "selected_ig_features_", []))}
        left: list[int] = []
        right: list[int] = []
        ops: list[int] = []
        for spec in getattr(self, "kept_specs_", []):
            a, b = spec["inputs"]
            left.append(pos[a])
            right.append(pos[b])
            operation = str(spec["operation"])
            # Integer codes passed to the native transform; the numbering is
            # presumably fixed by the C++ side — confirm before changing.
            op_map = {
                "product": 0,
                "absolute_difference": 1,
                "sum": 2,
                "signed_difference": 3,
            }
            if operation not in op_map:
                raise HUGIMLParamError(f"Unknown augmented-pair operation: {operation!r}.")
            ops.append(op_map[operation])
        return (
            np.asarray(left, dtype=np.int64),
            np.asarray(right, dtype=np.int64),
            np.asarray(ops, dtype=np.int8),
        )

    def fit(
        self,
        X: Any,
        y: Any,
        ig_scores: dict[str, Any],
        bin_edges: dict[str, Any],
        numeric_cols: list[str],
        budget_topK: int | None = None,
        min_source_ig: float | None = None,
        full_feature_names: list[str] | None = None,
    ) -> NativeAugmentedPairTransformBlock:
        """Select source columns, score pair candidates natively, and store scaling state.

        Parameters
        ----------
        X : Any
            Training data (DataFrame or array-like).
        y : Any
            Training labels; factorised (sorted) into integer codes.
        ig_scores : dict
            Per-column IG metadata; each entry is reduced with ``_best_ig_score``.
        bin_edges : dict
            Per-column bin edges, copied into ``input_bin_edges_`` for audit output.
        numeric_cols : list of str
            Columns eligible as pair inputs.
        budget_topK, min_source_ig : optional
            When given, override the values supplied to the constructor.
        full_feature_names : list of str, optional
            Column names used to label array input.

        Returns
        -------
        NativeAugmentedPairTransformBlock
            ``self``.

        Raises
        ------
        HUGIMLParamError
            If the native extension or either required native function is missing.
        """
        if not (
            _CORE_AVAILABLE
            and hasattr(_core, "score_pair_candidates")
            and hasattr(_core, "transform_pair_features")
        ):
            raise HUGIMLParamError(
                "Native augmented pair transforms require _hugiml_core.score_pair_candidates "
                "and _hugiml_core.transform_pair_features. Rebuild the native extension."
            )
        self.input_feature_names_ = list(full_feature_names or [])
        if budget_topK is not None:
            self.budget_topK = int(budget_topK)
        if min_source_ig is not None:
            self.min_source_ig = float(min_source_ig)
        # ``None`` means no augmented-pair pre-budget. This is used by
        # topk_budget_strict=True so the single global topK filter ranks
        # original + HUG pattern + augmented-pair features together exactly once.
        # Negative values retain a safety cap for explicitly
        # unbounded non-strict augmented-pair selection.
        if self.budget_topK is not None and self.budget_topK < 0:
            self.budget_topK = max(0, self.unbounded_cap)

        # The 1e-12 floor means a column with zero IG is never selected, even
        # when no minimum was configured.
        min_ig = max(1e-12, float(self.min_source_ig or 0.0))
        self.min_source_ig_ = min_ig
        scored: list[tuple[float, str]] = []
        for col in numeric_cols:
            score = _best_ig_score((ig_scores or {}).get(col, {}))
            if score >= min_ig:
                scored.append((score, col))
        # Highest IG first; ties broken by column name for determinism.
        scored.sort(key=lambda item: (-item[0], item[1]))
        self.selected_ig_features_ = [col for _, col in scored[: min(self.top_ig, len(scored))]]
        # Scores are kept for every qualifying column, not only the selected ones.
        self.selected_ig_scores_ = {col: float(score) for score, col in scored}
        self.input_bin_edges_ = {
            col: (
                np.asarray(bin_edges[col], dtype=float).tolist()
                if col in (bin_edges or {})
                else None
            )
            for col in self.selected_ig_features_
        }

        X_selected = self._selected_numeric_matrix(X, self.selected_ig_features_)
        if X_selected.shape[1] == 0:
            # No eligible source column: record an empty fitted state and stop
            # before any native call.
            self.source_observed_medians_ = {}
            self.source_observed_medians_array_ = np.zeros(0, dtype=np.float64)
            self.numeric_medians_ = {}
            self.numeric_medians_array_ = np.zeros(0, dtype=np.float64)
            self.kept_specs_ = []
            self.candidate_count_ = 0
            self.feature_names_ = []
            self.augmented_pair_transforms_ = []
            self.augmented_pair_native_used_ = True
            self.scaler_mean_ = np.zeros(0, dtype=np.float64)
            self.scaler_scale_ = np.zeros(0, dtype=np.float64)
            self.pair_reference_values_ = np.zeros(0, dtype=np.float64)
            return self

        # Per-column median over finite values only; a column with no finite
        # value ends up with median 0.0.
        observed_medians = np.nanmedian(
            np.where(np.isfinite(X_selected), X_selected, np.nan), axis=0
        )
        observed_medians = np.where(np.isfinite(observed_medians), observed_medians, 0.0).astype(
            np.float64, copy=False
        )
        self.source_observed_medians_array_ = observed_medians
        self.source_observed_medians_ = {
            col: float(observed_medians[j]) for j, col in enumerate(self.selected_ig_features_)
        }
        # Backward-compatible internal aliases for older saved state readers.
        # Augmented-pair feature construction does not median-fill source columns.
        self.numeric_medians_array_ = observed_medians
        self.numeric_medians_ = dict(self.source_observed_medians_)

        y_codes, _ = pd.factorize(np.asarray(y), sort=True)
        native_specs = _core.score_pair_candidates(
            X_selected,
            np.asarray(y_codes, dtype=np.int64),
            list(self.selected_ig_features_),
        )
        self.augmented_pair_native_used_ = True
        candidates = list(native_specs)
        # Highest transform IG first; ties broken by candidate name.
        candidates.sort(key=lambda item: (-float(item["transform_ig"]), str(item["name"])))
        self.candidate_count_ = len(candidates)
        keep_n = (
            len(candidates)
            if self.budget_topK is None
            else min(int(self.budget_topK), len(candidates))
        )
        self.kept_specs_ = candidates[:keep_n]
        self.feature_names_ = [str(spec["name"]) for spec in self.kept_specs_]

        if self.kept_specs_:
            left, right, ops = self._pair_index_arrays()
            pair_refs = np.asarray(
                [float(spec.get("reference_raw_value", 0.0)) for spec in self.kept_specs_],
                dtype=np.float64,
            )
            self.pair_reference_values_ = pair_refs
            # The last two arguments sit where transform() passes scaler_mean_
            # and scaler_scale_; zeros/ones here presumably yield unscaled
            # values — confirm against the native signature.
            raw = _core.transform_pair_features(
                X_selected,
                left,
                right,
                ops,
                pair_refs,
                np.zeros(len(self.kept_specs_), dtype=np.float64),
                np.ones(len(self.kept_specs_), dtype=np.float64),
            ).astype(np.float64, copy=False)
            # Centre on each pair's reference value and scale by the RMS
            # deviation from it; non-finite or zero scales fall back to 1.0.
            self.scaler_mean_ = pair_refs.copy()
            centered = raw - pair_refs.reshape(1, -1)
            scale = np.sqrt(np.mean(centered * centered, axis=0))
            self.scaler_scale_ = np.where(np.isfinite(scale) & (scale > 0), scale, 1.0).astype(
                np.float64, copy=False
            )
            self.left_indices_ = left
            self.right_indices_ = right
            self.op_codes_ = ops
        else:
            self.scaler_mean_ = np.zeros(0, dtype=np.float64)
            self.scaler_scale_ = np.zeros(0, dtype=np.float64)
            self.left_indices_ = np.zeros(0, dtype=np.int64)
            self.right_indices_ = np.zeros(0, dtype=np.int64)
            self.op_codes_ = np.zeros(0, dtype=np.int8)
            self.pair_reference_values_ = np.zeros(0, dtype=np.float64)
        self.augmented_pair_transforms_ = self._build_catalog()
        return self

    def transform(self, X: Any) -> csr_matrix:
        """Compute the kept augmented-pair features for ``X``.

        Returns
        -------
        csr_matrix
            ``float32`` CSR matrix.  With no kept specs the result has shape
            ``(n_rows, 0)`` and the native extension is not called; otherwise
            the native output is cast to ``float32`` and stored with every
            entry explicit via ``_dense_full_csr``.
        """
        n_rows = len(X) if hasattr(X, "__len__") else 0
        if not getattr(self, "kept_specs_", []):
            return csr_matrix((n_rows, 0), dtype=np.float32)
        X_selected = self._selected_numeric_matrix(
            X, list(getattr(self, "selected_ig_features_", []))
        )
        # NOTE(review): unlike fit(), this path does not check that the native
        # extension is importable before calling into ``_core``.
        Z = _core.transform_pair_features(
            X_selected,
            np.asarray(getattr(self, "left_indices_", np.zeros(0)), dtype=np.int64),
            np.asarray(getattr(self, "right_indices_", np.zeros(0)), dtype=np.int64),
            np.asarray(getattr(self, "op_codes_", np.zeros(0)), dtype=np.int8),
            np.asarray(
                getattr(
                    self, "pair_reference_values_", np.zeros(len(getattr(self, "kept_specs_", [])))
                ),
                dtype=np.float64,
            ),
            np.asarray(getattr(self, "scaler_mean_", np.zeros(0)), dtype=np.float64),
            np.asarray(getattr(self, "scaler_scale_", np.ones(0)), dtype=np.float64),
        )
        return _dense_full_csr(np.asarray(Z, dtype=np.float32))

    def _build_catalog(self) -> list[dict[str, Any]]:
        """Build one audit record per kept pair feature, in rank order.

        Each record combines the native candidate spec (name, operation,
        inputs, formula, transform IG, eligibility statistics) with the
        Python-side state: standardisation mean/scale, observed source
        medians, source IG scores, source bin edges, and the budget settings.
        """
        out: list[dict[str, Any]] = []
        candidate_count = int(getattr(self, "candidate_count_", 0))
        means = np.asarray(getattr(self, "scaler_mean_", np.zeros(0)), dtype=np.float64)
        scales = np.asarray(getattr(self, "scaler_scale_", np.ones(0)), dtype=np.float64)
        for rank, spec in enumerate(getattr(self, "kept_specs_", []), start=1):
            left, right = list(spec["inputs"])
            # ``rank`` is 1-based; fall back to the identity scaling when the
            # scaler arrays are shorter than the spec list.
            mean = float(means[rank - 1]) if rank - 1 < means.size else 0.0
            scale = float(scales[rank - 1]) if rank - 1 < scales.size else 1.0
            raw_formula = str(spec["formula"])
            out.append(
                {
                    "name": str(spec["name"]),
                    "kind": "augmented_pair_transform",
                    "operation": str(spec["operation"]),
                    "inputs": [left, right],
                    "formula": raw_formula,
                    "raw_formula": raw_formula,
                    "standardized_formula": f"({raw_formula} - {mean:.12g}) / {scale:.12g}",
                    "standardization": {"mean": mean, "scale": scale},
                    "standardization_mean": mean,
                    "standardization_scale": scale,
                    "source_observed_medians": {
                        left: float(getattr(self, "source_observed_medians_", {}).get(left, 0.0)),
                        right: float(getattr(self, "source_observed_medians_", {}).get(right, 0.0)),
                    },
                    "pair_missing_policy": "reference_value_for_unavailable_pair",
                    "reference_raw_value": float(spec.get("reference_raw_value", mean)),
                    "eligible_count": int(spec.get("eligible_count", 0)),
                    "eligible_rate": float(spec.get("eligible_rate", np.nan)),
                    "missing_pair_rate": float(spec.get("missing_pair_rate", np.nan)),
                    "selected_by": f"native_hugiml_adaptive_binning_ig_top_{self.max_features}_observed_pair_transform_ig",
                    "source_ig": {
                        left: float(self.selected_ig_scores_.get(left, 0.0)),
                        right: float(self.selected_ig_scores_.get(right, 0.0)),
                    },
                    "source_bin_edges": {
                        left: self.input_bin_edges_.get(left),
                        right: self.input_bin_edges_.get(right),
                    },
                    "transform_ig": float(spec["transform_ig"]),
                    "transform_bin_edges": spec.get("transform_bin_edges"),
                    "rank": rank,
                    "budget_topK": None if self.budget_topK is None else int(self.budget_topK),
                    "candidate_count": candidate_count,
                    "augmented_pair_max_features": int(self.max_features),
                    "used_in_hugiml_mining": False,
                    "eligible_for_L2": False,
                    "integration_point": "before_downstream_lr",
                }
            )
        return out


# ---------------------------------------------------------------------------
# Helpers: RSS memory (Unix) with Windows fallback
# ---------------------------------------------------------------------------
try:
    import resource as _resource

    def _get_peak_rss_kb() -> int:
        return int(_resource.getrusage(_resource.RUSAGE_SELF).ru_maxrss)

except ImportError:

    def _get_peak_rss_kb() -> int:
        try:
            import psutil

            return int(psutil.Process().memory_info().peak_wset) // 1024
        except ImportError:
            return 0


# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


# =============================================================================
# Configuration presets
# =============================================================================

# Named hyper-parameter bundles.  Each entry supplies values for the B, L, G
# and topK parameters; how a preset is selected and applied is not shown in
# this part of the file.
_PRESETS: dict[str, dict] = {
    "quick": dict(B=5, L=1, G=1e-2, topK=50),
    "balanced": dict(B=7, L=1, G=5e-3, topK=-1),
    "thorough": dict(B=-1, L=2, G=1e-4, topK=-1),
}


# =============================================================================
# Fit metadata
# =============================================================================


@dataclasses.dataclass(frozen=True)
class FitMetadata:
    """Immutable record of everything that happened during fit().

    Attributes
    ----------
    n_samples, n_features : int
        Training set dimensions.
    n_classes : int
        Number of distinct target classes.
    n_items : int
        Number of utility-annotated items (bins + categories).
    n_patterns : int
        Number of HUG patterns mined and retained.
    n_compound : int
        Compound patterns (length > 1).
    n_augmented_pairs : int
        Number of augmented pair features retained for the downstream
        estimator.
    n_downstream_features : int
        Number of columns used by the downstream estimator after
        feature-mode construction and optional strict TopK filtering.
    downstream_feature_counts : dict
        Counts by downstream feature family, for example original, pattern,
        and augmented_pair.
    topK_used : int
        Effective topK budget used during mining.
    stage_times_ms : dict[str, float]
        Wall-clock milliseconds per fit stage.
    total_fit_ms : float
        Total fit wall-clock milliseconds.
    matrix_density : float
        Fraction of non-zero entries in the training pattern matrix.
    config : dict
        Snapshot of (B, L, G, topK) as used.
    memory_peak_mb : float
        Python-traced peak memory during fit.
    memory_rss_mb : float
        RSS delta during fit (Unix only).
    memory_cpp_mb : float
        Estimated C++ extension memory usage.
    openmp_threads : int
        Number of OpenMP threads used.
    degraded : bool
        True when fit fell back to reduced parameters.
    """

    # Required fields come first; their order is part of the positional
    # constructor signature and must not change.
    n_samples: int
    n_features: int
    n_classes: int
    n_items: int
    n_patterns: int
    n_compound: int
    topK_used: int
    stage_times_ms: dict
    total_fit_ms: float
    matrix_density: float
    config: dict
    n_augmented_pairs: int = 0
    n_downstream_features: int = 0
    downstream_feature_counts: dict = dataclasses.field(default_factory=dict)
    memory_peak_mb: float = 0.0
    memory_rss_mb: float = 0.0
    memory_cpp_mb: float = 0.0
    openmp_threads: int = 1
    degraded: bool = False

    def summary(self) -> str:
        """Return a single-line human-readable summary of the fit outcome."""
        # The downstream-feature count is only mentioned when it is non-zero.
        downstream_text = (
            f", {self.n_augmented_pairs} augmented pairs, "
            f"{self.n_downstream_features} downstream features"
            if self.n_downstream_features
            else f", {self.n_augmented_pairs} augmented pairs"
        )
        return (
            f"{self.n_patterns} patterns "
            f"({self.n_compound} compound){downstream_text} from "
            f"{self.n_samples}×{self.n_features} in "
            f"{self.total_fit_ms:.0f} ms "
            f"[density={self.matrix_density:.4f}]"
        )
# ============================================================================= # Memory profiling context manager # ============================================================================= # tracemalloc is a process-global resource. Concurrent fits on separate # instances would race on is_tracing / start / stop without this lock. _tracemalloc_lock = threading.Lock() class _MemoryTracker: """Track peak memory during a code block via tracemalloc + RSS. Thread-safe: a module-level lock ensures that only one fit() at a time owns the tracemalloc session. Other concurrent fits skip tracing and report traced_peak_mb = 0.0, which is clearly distinguished from a real measurement rather than a corrupted one. """ def __enter__(self) -> _MemoryTracker: self._rss_before = _get_peak_rss_kb() self._lock_acquired = _tracemalloc_lock.acquire(blocking=False) self._snap_before: tracemalloc.Snapshot | None = None if self._lock_acquired: if not tracemalloc.is_tracing(): tracemalloc.start() self._started = True else: self._started = False self._snap_before = tracemalloc.take_snapshot() else: self._started = False self._snap_before = None return self def __exit__(self, *exc: object) -> None: if self._lock_acquired: try: if self._snap_before is not None: snap_after = tracemalloc.take_snapshot() stats = snap_after.compare_to(self._snap_before, "lineno") self.traced_peak_mb = sum(s.size for s in stats if s.size > 0) / 1e6 else: self.traced_peak_mb = 0.0 finally: if self._started: tracemalloc.stop() _tracemalloc_lock.release() else: self.traced_peak_mb = 0.0 self.rss_mb = (_get_peak_rss_kb() - self._rss_before) / 1024 @staticmethod def estimate_fit_mb(n: int, p: int, n_items: int, K: int) -> float: """Rough peak-memory estimate in MB for a fit() call.""" disc_mb = n * p * 4 / 1e6 trans_mb = n * p * 16 / 1e6 ul_mb = n_items * n * 24 / 1e6 matrix_mb = n * min(K, n_items) * 4 / 1e6 overhead = 50 return disc_mb + trans_mb + ul_mb + matrix_mb + overhead # 
============================================================================= # Transaction data wrapper (C++ ↔ Python bridge) # ============================================================================= class _TransactionDataWrapper: """Augments native TransactionDataCpp with Python-compatible attributes. Stores exact C++ state (prefixed _cpp_) so that deserialized models can still run predict() via the pure-Python fallback transform. """ def __init__(self, td_native: Any, classifier: HUGIMLClassifierNative) -> None: self._td = td_native self._clf = classifier self._cpp_bn2id = dict(td_native.bn2id) self._cpp_bkey_stride = int(td_native.bkey_stride) self._cpp_col_min = np.array(td_native.col_min, dtype=np.float64) self._cpp_col_range = np.array(td_native.col_range, dtype=np.float64) self._cpp_all_edges = [np.array(e, dtype=np.float64) for e in td_native.all_edges] self._cpp_nb_col = list(td_native.nb_col) self._cpp_is_cat = list(td_native.is_cat_v) self._cpp_is_int = list(td_native.is_int_v) # Store is_precoded_v so the Python fallback uses direct code→bi mapping # rather than MinMax-scaling the integer codes through the float path. 
self._cpp_is_precoded = list(td_native.is_precoded_v) if td_native.is_precoded_v else [] self._cpp_cat_categories = [list(c) for c in td_native.cat_categories] self.bn2id = self._build_compat_bn2id() self.all_edges = self._cpp_all_edges self.col_range = self._cpp_col_range self.col_min = self._cpp_col_min self.is_cat = classifier.cat_cols_mask_ self.is_int = classifier.is_int_mask_ def __getattr__(self, name: str) -> Any: return getattr(self._td, name) def __getstate__(self) -> dict: state = {k: v for k, v in self.__dict__.items() if k not in ("_td", "_clf")} if self._td is not None: state["item_map"] = dict(self._td.item_map) state["item_twu"] = list(self._td.item_twu) state["nb_col"] = list(self._td.nb_col) return state def __setstate__(self, state: dict) -> None: for k, v in state.items(): setattr(self, k, v) self._td = None def _build_compat_bn2id(self) -> dict: bn2id: dict = {} item_map = self._td.item_map feature_items: dict[str, list] = {} for item_id, label in item_map.items(): if "=" in label: feat_name = label.split("=")[0] feature_items.setdefault(feat_name, []).append(item_id) feature_names = getattr(self._clf, "feature_names_in_", None) or self._clf.origColumns if feature_names is None: return bn2id stride = self._cpp_bkey_stride for col_idx, feat_name in enumerate(feature_names): if feat_name in feature_items: for bin_idx, item_id in enumerate(sorted(feature_items[feat_name])): bn2id[(col_idx * stride) + bin_idx] = item_id return bn2id # ============================================================================= # HUGIMLClassifierNative # ============================================================================= # ============================================================================= # ── v1.1.0 Per-feature adaptive binning — module-level helpers ────────────── # # Imported from hugiml._binning — the single source of truth for all # adaptive-binning maths. 
Local aliases preserve every existing call-site # inside this file without modification. # # =============================================================================
class HUGIMLClassifierNative(TransformerMixin, ClassifierMixin, BaseEstimator):
    """HUG-IML interpretable classifier — C++ accelerated, scikit-learn compatible.

    Extracts High Utility Gain (HUG) patterns from labelled tabular data,
    transforms the input into a binary pattern-presence matrix, and fits an
    interpretable downstream classifier.  The mined patterns are human-readable
    and serve as the primary source of model explanations.

    Parameters
    ----------
    allCols : list of 3 lists, optional
        ``[int_col_names, float_col_names, cat_col_names]``.  Must be paired
        with ``origColumns``.
    origColumns : list of str, optional
        Ordered column names matching the columns of X passed to fit/predict.
    B : int, default 8
        Number of quantile bins per numerical feature.  Use -1 for supervised
        auto-selection (maximises IG over [2, 20]).
    L : int, default 1
        Maximum HUG pattern length.  1 = singletons; 2 = pairs; -1 = unlimited.
    G : float, default 1e-3
        Minimum information-gain threshold.
    topK : int, default 30
        Maximum number of patterns to retain.  -1 computes automatically.
    base_estimator : sklearn estimator, optional
        Downstream classifier trained on the binary pattern matrix.
        Defaults to LogisticRegression.
    n_jobs : int, default 1
        Number of OpenMP threads.  -1 uses all available cores.
    max_predict_ms : float or None
        Prediction latency budget in milliseconds.
    max_fit_seconds : float or None
        Wall-clock budget for the pattern-mining stage of fit().  Transaction
        preparation and downstream model fitting (e.g. LogisticRegression) are
        not bounded — total fit() time may exceed this value.  When the budget
        is exhausted mid-mine, graceful degradation produces a smaller pattern
        set; if even the minimal fallback cannot finish in time,
        ``HUGIMLTimeoutError`` is raised.
    verbose : bool, default False
        Emit INFO-level log messages during fit.
    adaptive_binning : bool, default False
        When True, each numerical feature is pre-discretised to its own number
        of quantile bins chosen by an elbow-stopping IG search; the resulting
        bin edges are stored and reapplied at predict/transform time.
    b_candidates : list of int or None, default None
        Candidate bin counts for adaptive binning.  Validated in fit as a
        non-empty list of ints >= 2 when supplied.
    min_marginal_gain_ratio : float, default 0.02
        Must lie strictly between 0 and 1 (validated in fit).  Presumably the
        elbow-stopping threshold of the adaptive bin search — confirm against
        the adaptive-binning implementation.
    feature_mode : str, default "patterns_only"
        One of ``"patterns_only"``, ``"original_plus_patterns"`` or
        ``"original_plus_interactions"`` (validated in fit).
    use_hotpath : bool, default True
        NOTE(review): semantics are not visible in this part of the file;
        presumably toggles an optimised prediction path — confirm.
    augmented_pair_transforms : bool, default True
        Enable augmented-pair transforms (validated as bool in fit).
    augmented_pair_max_features : int, default 10
        Must be >= 2 (validated in fit).
    topk_budget_strict : bool, default False
        Validated as bool in fit.  NOTE(review): exact budget semantics are
        defined outside this part of the file.
    dense_downstream_max_width : int, default 200
        Must be an int >= 0 (validated in fit).
    execution_mode : {"audit", "production"}, default "audit"
        ``"production"`` drops training matrices and other audit/governance
        artifacts after fitting; ``"audit"`` retains them.  Not validated in
        the constructor.

    Attributes (available after fit)
    ----------------------------------
    classes_ : ndarray — unique class labels.
    n_features_in_ : int — number of input features.
    feature_names_in_ : list or None — column names from training data.
    cat_cols_mask_ : ndarray[bool] — True for categorical columns.
    is_int_mask_ : ndarray[bool] — True for integer columns.
    td_ : _TransactionDataWrapper — discretisation artefacts.
    patterns_ : list — mined HUG patterns.
    x_train_hup_ : csr_matrix — binary training pattern matrix.
    model_ : Pipeline — fitted downstream estimator.
    fit_metadata_ : FitMetadata — timings, memory, pattern stats.
    monitor : PredictionMonitor or None — prediction statistics.
    """

    _fit_lock: threading.RLock  # per-instance, created in __init__
    monitor: PredictionMonitor | None  # set by enable_monitoring() / disable_monitoring()
    feature_names_in_: list[str] | None  # set by prepareXy / _resolve_col_meta after fit

    # Class-level grid handed out (as a copy) by default_param_grid().
    DEFAULT_PARAM_GRID: dict[str, list] = {
        "B": [-1],
        "adaptive_binning": [True],
        "L": [1, 2],
        "feature_mode": ["patterns_only", "original_plus_patterns"],
        "topK": [30, 50, 100],
        "G": [1e-2],
    }

    def __init__(
        self,
        allCols: list | None = None,
        origColumns: list | None = None,
        B: int = 8,
        L: int = 1,
        G: float = 1e-3,
        topK: int = 30,
        base_estimator: Any = None,
        n_jobs: int = 1,
        max_predict_ms: float | None = None,
        max_fit_seconds: float | None = None,
        verbose: bool = False,
        # ── v1.1.0 adaptive binning ───────────────────────────────────────
        # When adaptive_binning=True each numerical feature is pre-discretised
        # to B_j quantile bins chosen by elbow-stopping IG search.  The
        # pre-binned columns are declared categorical before the C++ layer
        # (global B is overridden to sentinel 2).  Bin edges are stored in
        # _bin_edges_ and reapplied identically at predict/transform time.
        # ─────────────────────────────────────────────────────────────────
        adaptive_binning: bool = False,
        b_candidates: list | None = None,
        min_marginal_gain_ratio: float = 0.02,
        feature_mode: str = "patterns_only",
        use_hotpath: bool = True,
        augmented_pair_transforms: bool = True,
        augmented_pair_max_features: int = 10,
        topk_budget_strict: bool = False,
        dense_downstream_max_width: int = 200,
        execution_mode: str = "audit",
    ) -> None:
        """Store every constructor argument verbatim and create the fit lock.

        No value is validated or transformed here; each argument is assigned
        to the attribute of the same name.
        """
        self.allCols = allCols
        self.origColumns = origColumns
        self.B = B
        self.L = L
        self.G = G
        self.topK = topK
        self.base_estimator = base_estimator
        self.n_jobs = n_jobs
        self.max_predict_ms = max_predict_ms
        self.max_fit_seconds = max_fit_seconds
        self.verbose = verbose
        self.adaptive_binning = adaptive_binning
        self.b_candidates = b_candidates
        self.min_marginal_gain_ratio = min_marginal_gain_ratio
        self.feature_mode = feature_mode
        self.use_hotpath = use_hotpath
        self.augmented_pair_transforms = augmented_pair_transforms
        self.augmented_pair_max_features = augmented_pair_max_features
        self.topk_budget_strict = topk_budget_strict
        self.dense_downstream_max_width = dense_downstream_max_width
        # sklearn estimator compatibility: constructor and set_params must not
        # validate parameter values. execution_mode is validated in fit/load.
        self.execution_mode = execution_mode
        # Not a constructor parameter: __getstate__ strips it and
        # __setstate__ recreates it, so it never travels through pickle.
        self._fit_lock = threading.RLock()

    # ── Execution-mode retention helpers ─────────────────────────────────────
    def _is_production_mode(self) -> bool:
        """Return True when audit/governance-heavy artifacts are not retained."""
        # getattr with a default: instances restored from state that predates
        # execution_mode are treated as audit mode.
        return getattr(self, "execution_mode", "audit") == "production"

    def _audit_artifact_message(self, artifact: str) -> str:
        """Build the error text explaining that ``artifact`` is audit-only."""
        return (
            f"{artifact} is not available because this model was fitted or loaded with "
            "execution_mode='production'. Refit the model with execution_mode='audit' "
            "or load a model file that was originally saved from audit mode for complete "
            "traceability and audit/governance artifacts."
        )

    def _require_audit_artifact(self, artifact: str, *required_attrs: str) -> None:
        """Raise a clear error when an audit/governance artifact is unavailable.

        Passing no required attributes is treated as an unconditional audit-only
        guard in production mode.  When required attributes are supplied, their
        presence is checked in every execution mode: production receives the
        governance-oriented retention message, while audit mode receives a
        fitted-state/corrupt-state message.  This keeps callers such as
        ``get_pattern_info()`` and drift helpers from falling through to an
        ``AttributeError`` if an expected audit artifact is absent.

        Parameters
        ----------
        artifact : str
            Human-readable artifact name interpolated into the error message.
        *required_attrs : str
            Attribute names that must exist on ``self`` and be non-None.

        Raises
        ------
        RuntimeError
            In production mode with no ``required_attrs``, or in any mode when
            at least one required attribute is missing or None.
        """
        is_prod = self._is_production_mode()
        if not required_attrs:
            if is_prod:
                raise RuntimeError(self._audit_artifact_message(artifact))
            return
        # An attribute counts as missing when absent OR explicitly None.
        missing = [
            attr
            for attr in required_attrs
            if (not hasattr(self, attr)) or getattr(self, attr, None) is None
        ]
        if missing:
            if is_prod:
                raise RuntimeError(self._audit_artifact_message(artifact))
            raise RuntimeError(
                f"{artifact} is unavailable because required fitted artifact(s) "
                f"are missing: {', '.join(missing)}. Refit the model or reload a "
                "complete audit-mode model file."
            )

    def _apply_execution_mode_retention(self) -> None:
        """Drop audit/governance-heavy training artifacts in production mode.

        Prediction-critical state is retained: td_, patterns_, model_, bin/scaler
        metadata, selected downstream names/masks, augmented-pair transform
        metadata, and privacy-safe aggregate downstream metadata that was already
        cached before retention.  Training matrices, drift baselines, and
        native-only transient score caches are audit/governance artifacts and are
        intentionally omitted in production mode.

        Before the matrices are removed their shape and non-zero count are
        recorded in ``_training_*_matrix_shape_`` / ``_training_*_matrix_nnz_``.
        In audit mode this method does nothing.
        """
        if not self._is_production_mode():
            return
        x_hup = getattr(self, "x_train_hup_", None)
        if x_hup is not None:
            self._training_pattern_matrix_shape_ = tuple(int(v) for v in x_hup.shape)
            self._training_pattern_matrix_nnz_ = int(getattr(x_hup, "nnz", 0))
        x_down = getattr(self, "x_train_downstream_", None)
        if x_down is not None:
            self._training_downstream_matrix_shape_ = tuple(int(v) for v in x_down.shape)
            # Objects exposing ``nnz`` report it directly; anything else with a
            # shape is counted element-wise.
            if hasattr(x_down, "nnz"):
                self._training_downstream_matrix_nnz_ = int(x_down.nnz)
            elif hasattr(x_down, "shape"):
                self._training_downstream_matrix_nnz_ = int(np.count_nonzero(x_down))
            else:
                self._training_downstream_matrix_nnz_ = 0
        for attr in (
            "x_train_hup_",
            "x_train_downstream_",
            # _downstream_* aggregate metadata is retained: it is aligned to
            # downstream feature names and lets feature_importances() report a
            # stable schema after production retention without needing training
            # matrices.
            "_native_original_feature_scores_downstream_",
            "_drift_det",
        ):
            # pop with a default: attributes that were never set are ignored.
            self.__dict__.pop(attr, None)

    # ── Class methods ─────────────────────────────────────────────────────────
@classmethod
def from_preset(cls, name: str, **overrides: Any) -> HUGIMLClassifierNative:
    """Build a classifier from one of the named configuration presets.

    The preset's keyword arguments are looked up in the module-level preset
    table and merged with ``overrides``; on a key clash the override wins.

    Parameters
    ----------
    name : {'quick', 'balanced', 'thorough'}
        quick    — B=5,  L=1, G=1e-2, topK=50
        balanced — B=7,  L=1, G=5e-3, topK=-1
        thorough — B=-1, L=2, G=1e-4, topK=-1
    **overrides
        Constructor keyword arguments that replace or extend the preset.

    Returns
    -------
    HUGIMLClassifierNative

    Raises
    ------
    HUGIMLParamError
        If ``name`` is not a known preset.
    """
    if name in _PRESETS:
        merged = dict(_PRESETS[name])
        merged.update(overrides)
        return cls(**merged)
    raise HUGIMLParamError(f"Unknown preset '{name}'. Available: {list(_PRESETS)}")
@classmethod
def default_param_grid(cls) -> dict[str, list]:
    """Return a fresh copy of the default validation grid for HUGIML tuning.

    The grid is read from ``cls.DEFAULT_PARAM_GRID``.  As defined on the
    class it pins ``B=-1`` with ``adaptive_binning=True``, searches ``L`` in
    ``{1, 2}``, searches ``feature_mode`` in
    ``{'patterns_only', 'original_plus_patterns'}``, keeps ``G`` fixed at
    1e-2, and searches ``topK`` in ``{30, 50, 100}``.

    For ``L > 1`` and ``augmented_pair_transforms=True``, native
    augmented-pair transforms are created internally from the top-10
    native-IG numeric features and capped to the same ``topK`` budget by
    transform IG.

    Returns
    -------
    dict[str, list]
        A new dict whose values are new lists, so callers may mutate the
        result without altering the class-level grid.
    """
    return {key: list(values) for key, values in cls.DEFAULT_PARAM_GRID.items()}
# ── Representation ──────────────────────────────────────────────────────── def __repr__(self) -> str: fitted = hasattr(self, "patterns_") status = f", {len(self.patterns_)} patterns" if fitted else ", not fitted" adap = ", adaptive" if self.adaptive_binning else "" mode = f", feature_mode={self.feature_mode}" aug = f", augmented_pair_transforms={self.augmented_pair_transforms}" exec_mode = f", execution_mode={self.execution_mode}" return f"HUGIMLClassifier(B={self.B}, L={self.L}, G={self.G}{adap}{mode}{aug}{exec_mode}{status})" # ── sklearn protocol ──────────────────────────────────────────────────────
def get_params(self, deep: bool = True) -> dict:
    """Return the constructor parameters as a dict (sklearn protocol).

    Parameters
    ----------
    deep : bool, default True
        When truthy, ``base_estimator`` is returned as a deep copy; otherwise
        the stored object itself is returned.  All other values are returned
        as stored.

    Returns
    -------
    dict
        Parameter name → current value, in constructor order.
    """
    param_names = (
        "allCols",
        "origColumns",
        "B",
        "L",
        "G",
        "topK",
        "base_estimator",
        "n_jobs",
        "max_predict_ms",
        "max_fit_seconds",
        "verbose",
        "adaptive_binning",
        "b_candidates",
        "min_marginal_gain_ratio",
        "feature_mode",
        "use_hotpath",
        "augmented_pair_transforms",
        "augmented_pair_max_features",
        "topk_budget_strict",
        "dense_downstream_max_width",
        "execution_mode",
    )
    collected = {}
    for param in param_names:
        current = getattr(self, param)
        if deep and param == "base_estimator":
            current = copy.deepcopy(current)
        collected[param] = current
    return collected
def set_params(self, **params: Any) -> HUGIMLClassifierNative:
    """Set constructor parameters in-place and return self (sklearn protocol).

    Plain keys are assigned as attributes without validation (validation is
    deferred until fit(), matching sklearn estimator conventions).

    Keys of the form ``<component>__<parameter>`` (for example
    ``base_estimator__C``) are forwarded to ``<component>.set_params`` when
    the named attribute exists and exposes ``set_params``.  Previously such
    keys were stored as a literal attribute named ``base_estimator__C`` and
    never reached the sub-estimator.  When the component is missing or has no
    ``set_params`` the key is still stored as a plain attribute, so existing
    callers see no new exception.

    Parameters
    ----------
    **params
        Parameter names mapped to their new values.

    Returns
    -------
    HUGIMLClassifierNative
        ``self``, to allow call chaining.
    """
    nested: list[tuple[str, str, str, Any]] = []
    for key, value in params.items():
        owner, sep, sub_key = key.partition("__")
        if owner and sep and sub_key:
            nested.append((key, owner, sub_key, value))
        else:
            setattr(self, key, value)
    # Nested keys are applied after plain ones so that a component replaced in
    # the same call receives its own sub-parameters.
    for key, owner, sub_key, value in nested:
        component = getattr(self, owner, None)
        if hasattr(component, "set_params"):
            component.set_params(**{sub_key: value})
        else:
            setattr(self, key, value)
    return self
def _more_tags(self) -> dict:
    """Return the static estimator tag dict (legacy sklearn tag hook)."""
    return {
        "requires_y": True,
        "binary_only": False,
        "poor_score": False,
        "X_types": ["2darray", "dataframe"],
        "allow_nan": False,
    }

def __sklearn_tags__(self) -> Any:
    """Declare sklearn 1.6+ Tags, including TransformerTags for transform().

    ``__sklearn_tags__`` was introduced in sklearn 1.6.  Base classes on older
    installations do not implement it, so ``super().__sklearn_tags__()`` raises
    ``AttributeError``.  Guard that call and return ``None`` when the parent
    chain does not support the protocol — callers must handle ``None``.

    Returns
    -------
    Tags or None
        The parent tags with ``transformer_tags`` set when ``TransformerTags``
        can be imported; the unmodified parent tags when it cannot; ``None``
        when the parent chain has no ``__sklearn_tags__``.
    """
    try:
        tags = super().__sklearn_tags__()
    except AttributeError:
        logger.debug(
            "super().__sklearn_tags__() raised AttributeError; "
            "sklearn base classes do not implement the tag protocol "
            "(expected sklearn >= 1.6).",
        )
        return None
    try:
        from sklearn.utils._tags import TransformerTags

        tags.transformer_tags = TransformerTags()
    except ImportError:
        logger.debug(
            "sklearn.utils._tags.TransformerTags not available; "
            "TransformerTags will not be declared.",
            exc_info=True,
        )
    return tags

# ── Pickle protocol ───────────────────────────────────────────────────────
def __getstate__(self) -> dict:
    """Return a picklable copy of the instance state.

    The fit lock and any instance-level ``predict`` / ``predict_proba``
    overrides are removed, the current schema version is stamped in, and
    pattern entries are converted to plain dicts.  ``self.__dict__`` itself is
    not modified — only the shallow copy is.
    """
    state = self.__dict__.copy()
    state.pop("_fit_lock", None)
    # Remove instance-level methods set by instrument_classifier():
    # these closures are not picklable.
    state.pop("predict_proba", None)
    state.pop("predict", None)
    state["_schema_version_"] = MODEL_SCHEMA_VERSION
    if "patterns_" in state and state["patterns_"]:
        state["patterns_"] = [
            {"utility": pe.utility, "items": list(pe.items), "ig": pe.ig}
            for pe in state["patterns_"]
        ]
        # Marker consumed by __setstate__ to know the dict form must be undone.
        state["_patterns_pickled_"] = True
    # serialize raw_patterns_ (also holds PatternEntry objects) ──
    # __getstate__ must convert patterns_ and raw_patterns_ consistently as
    # native PatternEntry objects, which are not picklable/deepcopyable.
    # Mirror the same dict-serialisation used for patterns_.
    if "raw_patterns_" in state and state["raw_patterns_"]:
        try:
            state["raw_patterns_"] = [
                {"utility": pe.utility, "items": list(pe.items), "ig": pe.ig}
                for pe in state["raw_patterns_"]
            ]
            state["_raw_patterns_pickled_"] = True
        except (AttributeError, TypeError):
            # Already serialized (dicts) or empty — leave as-is
            pass
    return state

def __setstate__(self, state: dict) -> None:
    """Restore instance state produced by ``__getstate__``.

    Steps, in order: check the schema version, rebuild pattern entries from
    their dict form, load the state into ``__dict__``, discard unsupported
    multi-round attributes, recreate the fit lock, back-fill defaults for
    attributes missing from older saved models, rebuild the adaptive
    code→label map when needed, and set ``_native_available_``.

    Note that the marker keys are popped from the ``state`` dict passed in,
    so the caller's dict is mutated.

    Raises
    ------
    HUGIMLVersionError
        If the stored schema version is below ``MIN_SCHEMA_VERSION``.  State
        without a version stamp is treated as version 1.
    """
    schema_ver = state.pop("_schema_version_", 1)
    if schema_ver < MIN_SCHEMA_VERSION:
        raise HUGIMLVersionError(
            f"Model schema version {schema_ver} is too old. "
            f"Minimum supported: {MIN_SCHEMA_VERSION}. Re-fit the model."
        )
    if state.pop("_patterns_pickled_", False):

        # Lightweight stand-in exposing the three fields that __getstate__
        # serialised for each pattern entry.
        class _PE:
            __slots__ = ("utility", "items", "ig")

            def __init__(self, d: dict) -> None:
                self.utility = d["utility"]
                self.items = d["items"]
                self.ig = d["ig"]

        state["patterns_"] = [_PE(d) for d in state["patterns_"]]
    # restore raw_patterns_ from its serialized dict form ──────
    if state.pop("_raw_patterns_pickled_", False):

        class _PE2:
            __slots__ = ("utility", "items", "ig")

            def __init__(self, d: dict) -> None:
                self.utility = d["utility"]
                self.items = d["items"]
                self.ig = d["ig"]

        if "raw_patterns_" in state and state["raw_patterns_"]:
            state["raw_patterns_"] = [_PE2(d) for d in state["raw_patterns_"]]
    self.__dict__.update(state)
    # Drop unsupported multi-round attributes when loading serialized estimators.
    for _attr in (
        "n_rounds",
        "g_decay_factor",
        "pattern_selection",
        "transaction_weighting",
        "_boosting_round_tds_",
        "_boosting_round_pats_",
    ):
        self.__dict__.pop(_attr, None)
    # The lock is never pickled (see __getstate__), so create a new one.
    self._fit_lock = threading.RLock()
    # ── v1.1.0 backward compatibility ─────────────────────────────────
    # Models saved with v1.0.0 have no adaptive_binning in their pickle
    # state.  Initialise all adaptive attrs to their off-state defaults
    # so the model behaves identically to a v1.0.0 model after restore.
    if not hasattr(self, "adaptive_binning"):
        self.adaptive_binning = False
        self.b_candidates = None
        self.min_marginal_gain_ratio = 0.02
    # The defaults below equal the corresponding __init__ defaults.
    if not hasattr(self, "use_hotpath"):
        self.use_hotpath = True
    if not hasattr(self, "augmented_pair_transforms"):
        self.augmented_pair_transforms = True
    if not hasattr(self, "topk_budget_strict"):
        self.topk_budget_strict = False
    if not hasattr(self, "dense_downstream_max_width"):
        self.dense_downstream_max_width = 200
    if not hasattr(self, "execution_mode"):
        self.execution_mode = "audit"
    if not hasattr(self, "augmented_pair_max_features"):
        self.augmented_pair_max_features = 10
    if not hasattr(self, "augmented_pair_transforms_"):
        self.augmented_pair_transforms_ = []
    if not hasattr(self, "augmented_pair_selected_features_"):
        self.augmented_pair_selected_features_ = []
    if not hasattr(self, "_original_feature_mask_downstream_"):
        self._original_feature_mask_downstream_ = None
    if not hasattr(self, "_original_selected_feature_names_downstream_"):
        self._original_selected_feature_names_downstream_ = None
    if not hasattr(self, "_strict_topk_applied_during_construction_"):
        self._strict_topk_applied_during_construction_ = False
    # v1.1.0 missing value handling — absent in models saved before this version
    if not hasattr(self, "_missing_col_edges_"):
        self._missing_col_edges_ = {}
    # v1.1.x integer-code adaptive path — absent in pre-v1.1.x models
    if not hasattr(self, "_adaptive_code_label_map_"):
        self._adaptive_code_label_map_ = {}
    # Rebuild the code→label map from stored bin edges whenever it's absent
    # or empty but adaptive bin edges are present.  This handles save/load via
    # both pickle and the custom .hugiml format (serialization.py).
    if (
        self.adaptive_binning
        and not self._adaptive_code_label_map_
        and getattr(self, "_bin_edges_", None)
    ):
        self._rebuild_adaptive_code_label_map()
    # ──────────────────────────────────────────────────────────────────
    # The native layer counts as unavailable when td_ is absent or None, or
    # when td_ has a ``_td`` attribute that is None.
    if hasattr(self, "td_") and self.td_ is not None:
        td = self.td_
        self._native_available_ = not (hasattr(td, "_td") and td._td is None)
    else:
        self._native_available_ = False

# ── Versioned save / load ─────────────────────────────────────────────────
def save_model(self, path: str | os.PathLike) -> None:
    """Persist the fitted model to a binary file with schema versioning.

    Thin wrapper: the work is delegated to the module-level ``_save_model``
    helper, which receives this instance and ``path`` unchanged.

    Parameters
    ----------
    path : str or Path
        Destination file path.

    Raises
    ------
    HUGIMLSerializationError
        NOTE(review): raised inside ``_save_model``, which is defined outside
        this block — confirm the exact failure conditions there.
    """
    _save_model(self, path)
@classmethod
def load_model(cls, path: str | os.PathLike) -> HUGIMLClassifierNative:
    """Load a model previously saved with :meth:`save_model`.

    Thin wrapper: delegates to the module-level ``_load_model`` helper,
    passing the calling class as ``expected_type``.

    Parameters
    ----------
    path : str or Path
        File written by :meth:`save_model`.

    Returns
    -------
    HUGIMLClassifierNative

    Raises
    ------
    HUGIMLVersionError, HUGIMLSerializationError
        NOTE(review): raised inside ``_load_model``, which is defined outside
        this block — confirm the exact failure conditions there.
    """
    return _load_model(path, expected_type=cls)  # type: ignore[no-any-return]
# ── Data preparation ──────────────────────────────────────────────────────
def prepareXy(self, X: pd.DataFrame, y: Any) -> tuple[pd.DataFrame, np.ndarray]:
    """Detect column types and validate the target variable.

    Call on the full dataset **before** any train/test split.  Records which
    columns are integer or categorical and performs basic label validation.

    Side effects: sets ``feature_names_in_``, ``cat_cols_mask_`` and
    ``is_int_mask_`` on the instance.  The masks are computed per column
    *position*, so columns that share a name but differ in dtype each get
    their own entry.

    Parameters
    ----------
    X : pd.DataFrame
        Feature table.  Not modified; a copy is returned.
    y : pd.Series or array-like
        Class labels.

    Returns
    -------
    X : pd.DataFrame
        Copy of the input with string column names and a fresh 0..n-1 index.
    y : np.ndarray
        Labels as an array.  Floating-point labels that are all
        integer-valued are cast to int64; every other dtype is returned as
        produced by ``np.asarray``.

    Raises
    ------
    HUGIMLParamError
        If ``X`` is not a DataFrame.
    HUGIMLValidationError
        If ``y`` contains NaN, or contains non-integer float values.

    Warns
    -----
    HUGIMLWarning
        When column names are duplicated after conversion to ``str``.
    HUGIMLConvergenceWarning
        For each non-categorical column with at most one distinct value.
    """
    if not isinstance(X, pd.DataFrame):
        raise HUGIMLParamError(f"X must be a pandas DataFrame, got {type(X).__name__}")

    X = X.copy()
    X.columns = [str(c) for c in X.columns]
    n_cols = len(X.columns)

    # Single pass duplicate detection instead of list.count() per column.
    seen: set[str] = set()
    dups: set[str] = set()
    for c in X.columns:
        if c in seen:
            dups.add(c)
        else:
            seen.add(c)
    if dups:
        warnings.warn(
            f"Duplicate column names detected: {dups}. Results may be unreliable.",
            HUGIMLWarning,
            stacklevel=2,
        )

    # Positional dtype inspection: iloc keeps duplicate names apart.
    is_cat = np.zeros(n_cols, dtype=bool)
    is_int = np.zeros(n_cols, dtype=bool)
    for idx in range(n_cols):
        col = X.iloc[:, idx]
        is_cat[idx] = (
            pd.api.types.is_object_dtype(col)
            or pd.api.types.is_string_dtype(col)
            or isinstance(col.dtype, pd.CategoricalDtype)
        )
        is_int[idx] = pd.api.types.is_integer_dtype(col)

    for idx, c in enumerate(X.columns):
        if not is_cat[idx] and X.iloc[:, idx].nunique() <= 1:
            warnings.warn(
                f"Column '{c}' is constant and will produce zero utility.",
                HUGIMLConvergenceWarning,
                stacklevel=2,
            )

    X = X.reset_index(drop=True)
    self.feature_names_in_ = X.columns.tolist()
    self.cat_cols_mask_ = is_cat
    self.is_int_mask_ = is_int

    y = np.asarray(y)
    # Labels that cannot be viewed as floats (e.g. strings) cannot hold a
    # numeric NaN, so a failed conversion simply skips the NaN check.  The
    # check itself sits outside the ``try`` so its error is never confused
    # with a conversion failure whose message happens to contain "NaN".
    try:
        y_float = y.astype(float)
    except (ValueError, TypeError):
        y_float = None
    if y_float is not None and np.isnan(y_float).any():
        raise HUGIMLValidationError("y contains NaN values.")

    if np.issubdtype(y.dtype, np.floating):
        if np.allclose(y, y.astype(int)):
            y = y.astype(np.int64)
        else:
            raise HUGIMLValidationError(
                "y contains non-integer float values. HUG-IML requires integer class labels."
            )
    return X, y
# ── Internal helpers ────────────────────────────────────────────────────── @staticmethod def _safe_cast_y(y: Any) -> np.ndarray: """Cast label array to int64, raising a clear error if y is non-finite. ``np.asarray(y, dtype=np.int64)`` silently produces undefined values (typically INT64_MIN = -9223372036854775808) when y contains NaN or infinity; the C++ mining kernel then processes garbage class codes. This helper converts via float first and raises ``HUGIMLValidationError`` if any non-finite values are present, which is the correct contract for a classifier that requires integer class labels. """ y_arr = np.asarray(y) if np.issubdtype(y_arr.dtype, np.floating) or y_arr.dtype == object: y_float = y_arr.astype(float, copy=False) if not np.all(np.isfinite(y_float)): raise HUGIMLValidationError( "y contains NaN or infinite values. HUG-IML requires integer class labels." ) return y_float.astype(np.int64) return y_arr.astype(np.int64) @staticmethod def _to_float_array(arr: Any, cat_mask: np.ndarray | None = None) -> tuple: """Split input into a float64 numeric array and raw categorical arrays. Adversarial-input hardening: - Forces writable copies of read-only column views. - Non-finite cells (NaN/Inf) in numerical columns are pre-converted to np.nan string-label bins by _prebin_nan_cols (fit) or _handle_test_nan (predict), so they arrive here as categorical. No median imputation is performed (removed in v1.1.0). """ is_df = isinstance(arr, pd.DataFrame) n = len(arr) if is_df: p = len(arr.columns) arr_np: np.ndarray | None = None else: arr_np = np.asarray(arr) p = arr_np.shape[1] if cat_mask is None: cat_mask = np.zeros(p, dtype=bool) # Hot predict path: all-numeric inputs do not need per-column pandas # Series extraction. Keep behaviour identical by still returning a # writable float64 copy and an all-None categorical list. 
if not np.any(cat_mask): try: if is_df: return arr.to_numpy(dtype=np.float64, copy=True), [None] * p assert arr_np is not None return np.array(arr_np, dtype=np.float64, copy=True), [None] * p except Exception: pass X_num = np.zeros((n, p), dtype=np.float64) X_cat_raw = [None] * p for j in range(p): if is_df: raw = arr.iloc[:, j] else: assert arr_np is not None # nosec B101 – guaranteed by control flow raw = arr_np[:, j] if cat_mask[j]: col_obj = np.asarray(raw, dtype=object).copy() for i, v in enumerate(col_obj): if v is None or (isinstance(v, float) and math.isnan(v)): col_obj[i] = np.nan X_cat_raw[j] = col_obj X_num[:, j] = 0.0 else: col = np.array(raw, dtype=np.float64, copy=True) # v1.1.0: non-finite cells (NaN/Inf) are pre-handled by # _prebin_nan_cols (fit) and _handle_test_nan (predict) # before reaching here. No median imputation. X_num[:, j] = col return X_num, X_cat_raw def _effective_topK_total(self, n_items: int | None = None) -> int: """Return the user-facing total topK pattern budget.""" if self.topK != -1: return int(self.topK) nitems = int(n_items) if n_items is not None else 100 nitems = max(nitems, 1) cap = 20000 if self.L == 1: return min(nitems, cap) if isinstance(self.L, int) and self.L >= 2: return min(math.comb(nitems, min(self.L, nitems)), cap) return min(sum(math.comb(nitems, r) for r in range(1, min(nitems, 6) + 1)), cap) def _effective_topK(self, n_items: int | None = None) -> int: """Return the effective topK budget used by the single-pass miner.""" return max(1, int(self._effective_topK_total(n_items))) def _effective_mining_topK(self, n_items: int | None = None) -> int: """Return the topK passed to the native miner. Mining uses exactly the requested topK budget. No expanded information-gain candidate pool and no round-wise budget splitting are used. 
""" return self._effective_topK(n_items) def _select_patterns_for_budget(self, patterns: list, n_items: int | None = None) -> list: """Return mined patterns without post-mining IG oversampling/filtering.""" return list(patterns) def _deduplicate_patterns_by_coverage( self, patterns: list, n_rows: int ) -> tuple[list, tuple | None]: """Remove duplicate HUG patterns with identical training-row coverage. This is intentionally a post-mining HUGIML optimization. Raw top-k mining output remains non-deduplicated and can be compared directly with THUI/HMiner/brute-force utility oracles. Among patterns that cover exactly the same set of training rows, the one with the highest information gain (and highest utility on ties) is retained; all lower-ranked duplicates are discarded. Scanning forward through the descending-IG-sorted list achieves this: the first time a coverage key is encountered it belongs to the best representative, so every subsequent occurrence of that key is marked for removal. fsK is intentionally not applied. COO cache: the matrix built here for coverage-key computation is a superset of the Stage-4 matrix (all raw patterns vs. survivors). We filter the COO in Python to keep only survivor columns and return the result alongside the pattern list, so Stage 4 can skip the second C++ build_train_matrix call entirely. 
Returns ------- survivors : list of PatternEntry cached_coo : (rows_array, cols_array) filtered to survivors, or None """ if not patterns: return [], None ordered = sorted(patterns, key=lambda pe: (-pe.ig, -pe.utility)) if hasattr(_core, "build_train_matrix_csr"): indptr_raw, indices_raw = _core.build_train_matrix_csr(self.td_, ordered) indptr_np = np.asarray(indptr_raw, dtype=np.int64) cols_np = np.asarray(indices_raw, dtype=np.int64) rows_np = np.repeat( np.arange(max(len(indptr_np) - 1, 0), dtype=np.int64), np.diff(indptr_np) ) else: rows_raw, cols_raw = _core.build_train_matrix(self.td_, ordered) rows_np = np.asarray(rows_raw, dtype=np.int64) cols_np = np.asarray(cols_raw, dtype=np.int64) # build coverage keys without Python-level int() conversions. # Sort COO by column index, then use searchsorted to split rows into # per-pattern arrays. Hash each array via .tobytes() rather than # materializing a Python tuple, which avoids O(n) object allocation # per pattern. n_ordered = len(ordered) seen: set[bytes] = set() remove: set[int] = set() if len(rows_np) > 0: order = np.argsort(cols_np, kind="stable") rows_sorted = rows_np[order].astype(np.int64) cols_sorted = cols_np[order] # split_points[i] = start index of column i in sorted arrays split_points = np.searchsorted(cols_sorted, np.arange(n_ordered)) split_points_end = np.append(split_points[1:], len(rows_sorted)) for i in range(n_ordered): seg = rows_sorted[split_points[i] : split_points_end[i]] # Sort within segment for a canonical key regardless of COO order key = np.sort(seg).tobytes() if key in seen: remove.add(i) else: seen.add(key) survivors = [pe for i, pe in enumerate(ordered) if i not in remove] # Build the column-index remap: old ordered-index → new survivor index. # Then filter the raw COO to keep only survivor entries, remapping cols. 
survivor_old_indices = [i for i in range(len(ordered)) if i not in remove] old_to_new = {old: new for new, old in enumerate(survivor_old_indices)} keep_mask = np.isin(cols_np, survivor_old_indices) rows_filtered = rows_np[keep_mask].astype(np.int32) cols_filtered = np.array([old_to_new[int(c)] for c in cols_np[keep_mask]], dtype=np.int32) return survivors, (rows_filtered, cols_filtered) def _make_estimator(self, n_cls: int) -> Any: if self.base_estimator is not None: return copy.deepcopy(self.base_estimator) solver = "liblinear" if n_cls == 2 else "lbfgs" return LogisticRegression(solver=solver, random_state=0, max_iter=500) def _validate_params(self) -> None: if not isinstance(self.B, int): raise HUGIMLParamError(f"B must be int, got {type(self.B).__name__}") if self.B != -1 and self.B < 2: raise HUGIMLParamError(f"B must be -1 (auto) or >= 2, got {self.B}") if not isinstance(self.L, int): raise HUGIMLParamError(f"L must be int, got {type(self.L).__name__}") if not isinstance(self.G, (float, int)): raise HUGIMLParamError(f"G must be numeric, got {type(self.G).__name__}") if self.G < 0: raise HUGIMLParamError(f"G must be >= 0, got {self.G}") dense_width = getattr(self, "dense_downstream_max_width", 200) if isinstance(dense_width, bool) or not isinstance(dense_width, int): raise HUGIMLParamError( f"dense_downstream_max_width must be an int >= 0, got {type(dense_width).__name__}" ) if int(dense_width) < 0: raise HUGIMLParamError(f"dense_downstream_max_width must be >= 0, got {dense_width}") if getattr(self, "execution_mode", "audit") not in {"audit", "production"}: raise HUGIMLParamError( "execution_mode must be either 'audit' or 'production'. " "Use 'audit' for complete traceability/governance artifacts, " "or 'production' to retain only prediction-critical state." 
) if self.allCols is not None or self.origColumns is not None: if self.allCols is None or self.origColumns is None: raise HUGIMLParamError("allCols and origColumns must both be supplied together.") if not (isinstance(self.allCols, list) and len(self.allCols) == 3): raise HUGIMLParamError("allCols must be [int_cols, float_cols, cat_cols].") # ── v1.1.0 adaptive binning params ──────────────────────────────── if not isinstance(self.adaptive_binning, bool): raise HUGIMLParamError("adaptive_binning must be bool.") if self.b_candidates is not None: if ( not isinstance(self.b_candidates, list) or len(self.b_candidates) == 0 or not all(isinstance(b, int) and b >= 2 for b in self.b_candidates) ): raise HUGIMLParamError("b_candidates must be a non-empty list of int >= 2.") if not isinstance(self.min_marginal_gain_ratio, (float, int)): raise HUGIMLParamError("min_marginal_gain_ratio must be numeric.") if not 0 < float(self.min_marginal_gain_ratio) < 1: raise HUGIMLParamError( f"min_marginal_gain_ratio must be in (0, 1), got {self.min_marginal_gain_ratio}." ) allowed_feature_modes = { "patterns_only", "original_plus_patterns", "original_plus_interactions", } if self.feature_mode not in allowed_feature_modes: raise HUGIMLParamError( f"feature_mode must be one of {sorted(allowed_feature_modes)}, " f"got {self.feature_mode!r}." ) if not isinstance(self.augmented_pair_transforms, bool): raise HUGIMLParamError( "augmented_pair_transforms must be bool, " f"got {type(self.augmented_pair_transforms).__name__}." ) if not isinstance(self.topk_budget_strict, bool): raise HUGIMLParamError( f"topk_budget_strict must be bool, got {type(self.topk_budget_strict).__name__}." ) if not isinstance(self.augmented_pair_max_features, int): raise HUGIMLParamError( f"augmented_pair_max_features must be int, got {type(self.augmented_pair_max_features).__name__}." 
) if self.augmented_pair_max_features < 2: raise HUGIMLParamError( f"augmented_pair_max_features must be >= 2, got {self.augmented_pair_max_features}." ) def _resolve_col_meta(self, X_train: Any) -> np.ndarray: """Determine column names and type masks from whichever setup path was used.""" if hasattr(self, "cat_cols_mask_"): return self.cat_cols_mask_ if self.allCols is not None and self.origColumns is not None: cat_set = set(self.allCols[2]) int_set = set(self.allCols[0]) col_list = list(self.origColumns) self.cat_cols_mask_ = np.array([c in cat_set for c in col_list], dtype=bool) self.is_int_mask_ = np.array([c in int_set for c in col_list], dtype=bool) self.feature_names_in_ = col_list return self.cat_cols_mask_ if isinstance(X_train, pd.DataFrame): col_list = X_train.columns.astype(str).tolist() self.cat_cols_mask_ = np.array( [ pd.api.types.is_object_dtype(X_train[c]) or pd.api.types.is_string_dtype(X_train[c]) or isinstance(X_train[c].dtype, pd.CategoricalDtype) for c in X_train.columns ], dtype=bool, ) self.is_int_mask_ = np.array( [pd.api.types.is_integer_dtype(X_train[c]) for c in X_train.columns], dtype=bool, ) self.feature_names_in_ = col_list return self.cat_cols_mask_ arr = np.asarray(X_train) if arr.ndim < 2: raise ValueError( f"HUGIMLClassifierNative expects a 2D array, got array of shape {arr.shape}." ) p = arr.shape[1] self.cat_cols_mask_ = np.zeros(p, dtype=bool) self.is_int_mask_ = np.zeros(p, dtype=bool) # Array inputs have no native column labels, but downstream components # (notably augmented-pair transforms) require stable feature names to # align IG scores, selected source columns, and transform-time matrices. # Use deterministic synthetic names instead of leaving feature_names_in_ # as None, which previously caused augmented pairs to be silently skipped # for ndarray inputs. 
self.feature_names_in_ = [f"col{j}" for j in range(p)] return self.cat_cols_mask_ @staticmethod def _timer() -> Any: """Return a lightweight timer object.""" class _T: def __init__(self) -> None: self.start = time.perf_counter() @property def ms(self) -> float: return (time.perf_counter() - self.start) * 1000 return _T() # ── Core fit ────────────────────────────────────────────────────────────── # ── v1.1.0 Adaptive binning methods ───────────────────────────────────── def _rebuild_adaptive_code_label_map(self) -> None: """Reconstruct ``_adaptive_code_label_map_`` from stored ``_bin_edges_``. Called automatically by ``__setstate__`` after deserialization (both pickle and the custom .hugiml format) when the map is absent or empty. The map is not stored explicitly in the .hugiml format; it is always derived from ``_bin_edges_``, which IS persisted. The reconstruction exactly mirrors the map built in ``_apply_adaptive_binning``: for each column whose edges are stored, and for each bin k, the key is the C++ item-map label ``"name=[k.000,(k+1).000]"`` and the value is the original-scale label ``"name=[edges[k]:.4g, edges[k+1]:.4g)"``. """ bin_edges = getattr(self, "_bin_edges_", {}) if not bin_edges: return # We only build entries for columns that were encoded as is_precoded # (i.e., had no NaN at training time). After deserialization we cannot # distinguish which columns were NaN-fallback vs is_precoded. Safe # approach: rebuild for all columns in _bin_edges_. The entry is only # consulted when the C++ item_map actually contains the key, so spurious # entries for NaN-fallback columns are harmless. 
new_map: dict[str, str] = {} for name, edges in bin_edges.items(): n_bins = len(edges) - 1 for k in range(n_bins): cpp_label = f"{name}=[{float(k):.3f},{float(k + 1):.3f}]" orig_label = f"{name}=[{edges[k]:.4g},{edges[k + 1]:.4g})" new_map[cpp_label] = orig_label self._adaptive_code_label_map_ = new_map self._adaptive_precoded_features_ = set(bin_edges) def _apply_adaptive_binning_cpp(self, X_train: Any, y_arr: np.ndarray) -> Any: """C++ replacement for _apply_adaptive_binning. Calls _core.select_adaptive_bins (C++ elbow_stop_nb_cpp) instead of the Python _adap_select_b loop. Produces identical _bin_edges_, per_feature_b_, ig_scores_, _adaptive_code_label_map_, cat_cols_mask_, and X_pre outputs. Falls back to the Python path on any error. """ try: return self._apply_adaptive_binning_cpp_impl(X_train, y_arr) except (MemoryError, HUGIMLMemoryError): # A native allocation failure means the Python fallback is very # likely to allocate even more memory. Surface a clean OOM instead # of cascading into an OS-level kill. raise except RuntimeError as exc: if "hugiml_timeout" in str(exc): raise logger.warning("C++ adaptive binning failed (%s); falling back to Python path.", exc) return self._apply_adaptive_binning(X_train, y_arr) except Exception as exc: logger.warning("C++ adaptive binning failed (%s); falling back to Python path.", exc) return self._apply_adaptive_binning(X_train, y_arr) def _apply_adaptive_binning_cpp_impl(self, X_train: Any, y_arr: np.ndarray) -> Any: """Implementation of the C++ adaptive B-selection path. Works for numeric-only and mixed (numeric + string categorical) DataFrames. Extracts only the numeric columns into a float64 array before calling select_adaptive_bins, avoiding the ValueError that X_df.to_numpy(float64) raises when string-categorical columns are present. 
""" is_df = isinstance(X_train, pd.DataFrame) X_df = ( X_train if is_df else pd.DataFrame( X_train, columns=( list(getattr(self, "feature_names_in_", []) or []) if getattr(self, "feature_names_in_", None) is not None and len(getattr(self, "feature_names_in_", [])) == np.asarray(X_train).shape[1] else None ), ) ) candidates = sorted(set(self.b_candidates or [2, 3, 5, 7, 10, 15])) ratio = self.min_marginal_gain_ratio cat_mask = self.cat_cols_mask_ col_names = list(X_df.columns) n_cls = len(np.unique(y_arr)) self._bin_edges_: dict = {} self.per_feature_b_: dict = {} self.ig_scores_: dict = {} # ── Extract only the numeric (non-cat) columns ──────────────────────── # Passing the full mixed DataFrame to to_numpy(float64) raises ValueError # when string-categorical columns are present. select_adaptive_bins # already skips is_cat columns, so we can pass only the numeric subset # and map indices back afterwards via num_col_map. num_col_map = [j for j, v in enumerate(cat_mask) if not v] col_names_num = [col_names[j] for j in num_col_map] # C++ binding expects std::string column names. ndarray inputs create # integer DataFrame column names, so pass string labels to C++ while # retaining the original names for pandas indexing below. col_names_num_cpp = [str(name) for name in col_names_num] if not num_col_map: # All columns are categorical — nothing to adapt self._adaptive_code_label_map_: dict[str, str] = {} return X_train # Build X_num: shape (n, len(num_col_map)), float64 X_num = X_df.iloc[:, num_col_map].to_numpy(dtype=np.float64, na_value=np.nan) y_int = y_arr.astype(np.int64) # Pass all-zeros is_cat so C++ processes every column in X_num is_cat_zeros = np.zeros(len(num_col_map), dtype=np.uint8) adap_result = _core.select_adaptive_bins( X_num, y_int, n_cls, col_names_num_cpp, is_cat_zeros, candidates, ratio, ) # Pack C++ results into Python model attributes. # adap_result.num_col_indices[ci] is the index within X_num (= within # col_names_num). 
Map back to the original X_df column index via num_col_map. for ci, col_res in enumerate(adap_result.cols): j_num = adap_result.num_col_indices[ci] name = col_names_num[j_num] edges = np.array(col_res.edges) self._bin_edges_[name] = edges # Match the Python adaptive path, which records the effective number # of stored bins after duplicate quantile edges have collapsed. self.per_feature_b_[name] = len(edges) - 1 # Pad missing candidates (early elbow-stop) with 0.0 for diagnostics scores: dict[int, float] = {} for k, b in enumerate(candidates): scores[b] = col_res.ig_scores[k] if k < len(col_res.ig_scores) else 0.0 self.ig_scores_[name] = scores # Build _adaptive_code_label_map_ and update column-type masks self._adaptive_code_label_map_: dict[str, str] = {} self._adaptive_precoded_features_ = set(self._bin_edges_) new_cat = cat_mask.copy() new_int = getattr(self, "is_int_mask_", np.zeros(len(col_names), dtype=bool)).copy() for name, edges in self._bin_edges_.items(): if name not in X_df.columns: continue j = col_names.index(name) if name in col_names else -1 if j >= 0: new_cat[j] = False new_int[j] = True n_bins = len(edges) - 1 for k in range(n_bins): cpp_label = f"{name}=[{float(k):.3f},{float(k + 1):.3f}]" orig_label = f"{name}=[{edges[k]:.4g},{edges[k + 1]:.4g})" self._adaptive_code_label_map_[cpp_label] = orig_label self.cat_cols_mask_ = new_cat self.is_int_mask_ = new_int # Apply integer codes to the pre-binned numeric columns in X_pre. # Native storage is int32 with -1 as the missing sentinel; fetch one # column at a time to avoid materialising a second full float64 code # matrix. Cast only the one column that pandas needs so NaN can be # represented for the legacy pre-binned path. 
X_pre = X_df.copy() for ci in range(adap_result.n_num_cols): j_num = adap_result.num_col_indices[ci] name = col_names_num[j_num] col_raw = pd.to_numeric(X_df[name], errors="coerce").values if hasattr(adap_result, "get_X_codes_col"): codes_i32 = np.asarray(adap_result.get_X_codes_col(ci), dtype=np.int32) missing_codes = codes_i32 < 0 else: # compatibility with older native wheels # Older native wheels expose get_X_codes() as float64 with # np.nan as the missing sentinel. Casting that matrix directly # to int32 can platform-dependently produce either INT32_MIN or # 0; the latter silently aliases a valid bin. Detect missing # sentinels before the integer cast. codes_raw = np.asarray(adap_result.get_X_codes()[:, ci]) missing_codes = ~np.isfinite(codes_raw) codes_i32 = np.zeros(codes_raw.shape, dtype=np.int32) finite_codes = ~missing_codes if np.any(finite_codes): codes_i32[finite_codes] = codes_raw[finite_codes].astype(np.int32) codes = codes_i32.astype(np.float32, copy=False) codes[missing_codes | (codes_i32 < 0) | (~np.isfinite(col_raw))] = np.nan X_pre[name] = codes return X_pre if is_df else X_pre def _apply_adaptive_binning(self, X_train: Any, y_arr: np.ndarray) -> Any: """Pre-discretise numerical features using per-feature IG-selected B_j. Called by _fit_impl when adaptive_binning=True. The method: 1. Iterates over numerical columns, runs _adap_select_b to choose B_j. 2. Computes quantile edges on the training column and stores them in _bin_edges_ so they can be reapplied at predict time. 3. Replaces each numerical column with string bin labels ([lo,hi)). 4. Updates cat_cols_mask_ and is_int_mask_ to mark pre-binned features as categorical, so Stage 1 of _fit_impl routes them through the C++ categorical path. Returns the pre-binned DataFrame (or the input unchanged if no numerical features are found). 
""" is_df = isinstance(X_train, pd.DataFrame) X_df = ( X_train if is_df else pd.DataFrame( X_train, columns=( list(getattr(self, "feature_names_in_", []) or []) if getattr(self, "feature_names_in_", None) is not None and len(getattr(self, "feature_names_in_", [])) == np.asarray(X_train).shape[1] else None ), ) ) candidates = sorted(set(self.b_candidates or [2, 3, 5, 7, 10, 15])) ratio = self.min_marginal_gain_ratio cat_mask = self.cat_cols_mask_ col_names = list(X_df.columns) self._bin_edges_: dict = {} self.per_feature_b_: dict = {} self.ig_scores_: dict = {} pre_binned: set = set() # Identify which columns need adaptive B-selection. num_cols = [ (j, name) for j, name in enumerate(col_names) if not (j < len(cat_mask) and cat_mask[j]) ] # parallel B-selection: def _select_one(j: int, name: str) -> tuple: col = pd.to_numeric(X_df.iloc[:, j], errors="coerce").values finite_mask = np.isfinite(col) if finite_mask.sum() < 10: chosen = candidates[len(candidates) // 2] scores = {b: 0.0 for b in candidates} else: chosen, scores = _adap_select_b( col[finite_mask], y_arr[finite_mask], candidates, ratio ) edges = _adap_quantile_edges(col, chosen) return name, chosen, scores, edges try: from joblib import Parallel from joblib import delayed as _delayed _n_jobs = self.n_jobs if hasattr(self, "n_jobs") else 1 _results = Parallel(n_jobs=_n_jobs, prefer="threads")( _delayed(_select_one)(j, name) for j, name in num_cols ) except Exception: _results = [_select_one(j, name) for j, name in num_cols] for name, chosen, scores, edges in _results: self.ig_scores_[name] = scores self.per_feature_b_[name] = len(edges) - 1 self._bin_edges_[name] = edges pre_binned.add(name) # ── Integer-code path ───────────────────────────────────── # Encode pre-binned columns as integer codes (0..B_j-1) and route them # through the C++ is_int path, which is 8–20x faster than the old # string-categorical std::string path. 
# # NaN handling (hybrid): # Columns WITHOUT any NaN/Inf → is_int=True (fast integer path). # Columns WITH NaN/Inf → cat=True (existing string path, # correct because C++ categorical skips np.nan cells). # In practice most adaptive-mode datasets have no NaN; the string # fallback only fires when needed. # # Label translation: the C++ integer path stores item labels as # "feat=[k,k+1]" (integer-range format). get_hug_features() remaps # these to original-scale "[lo,hi)" labels via _adaptive_code_label_map_. self._adaptive_code_label_map_: dict[str, str] = {} self._adaptive_precoded_features_ = set(self._bin_edges_) new_cat = cat_mask.copy() new_int = getattr(self, "is_int_mask_", np.zeros(len(col_names), dtype=bool)).copy() X_pre = X_df.copy() for name, edges in self._bin_edges_.items(): if name not in X_df.columns: continue col = pd.to_numeric(X_df[name], errors="coerce").values n_bins = len(edges) - 1 has_nan = not np.isfinite(col).all() j = col_names.index(name) if name in col_names else -1 # Integer-code path for all adaptive columns, with or without NaN. # NaN/Inf cells are encoded as np.nan (float64), which the C++ # is_precoded handler reads as non-finite and maps to -1 (skipped # — no item generated for that row/feature pair). # This keeps the column numeric rather than converting the whole # column to object/string dtype for a small number of NaNs, which # would force the slower C++ categorical path. codes = np.clip(np.digitize(col, edges[1:-1]), 0, n_bins - 1).astype(np.float64) if has_nan: codes[~np.isfinite(col)] = np.nan # sentinel: C++ skips X_pre[name] = codes if j >= 0: new_cat[j] = False new_int[j] = True # Build C++ label -> original-scale label translation. 
# Key format matches the C++ is_precoded label exactly: # std::fixed << std::setprecision(3) -> "name=[0.000,1.000]" for k in range(n_bins): cpp_label = f"{name}=[{float(k):.3f},{float(k + 1):.3f}]" orig_label = f"{name}=[{edges[k]:.4g},{edges[k + 1]:.4g})" self._adaptive_code_label_map_[cpp_label] = orig_label self.cat_cols_mask_ = new_cat self.is_int_mask_ = new_int return X_pre if is_df else X_pre def _prebin_for_predict(self, X: Any) -> Any: """Apply stored adaptive bin edges before C++ inference. The common ndarray path is kept entirely in NumPy to avoid constructing and copying a pandas DataFrame for every predict()/transform() call. DataFrame input still preserves labels and mixed categorical columns. """ bin_edges = getattr(self, "_bin_edges_", {}) if not bin_edges: return X feat_names = getattr(self, "feature_names_in_", None) code_label_map = getattr(self, "_adaptive_code_label_map_", {}) precoded_features = getattr(self, "_adaptive_precoded_features_", None) if precoded_features is None: # Backward-compatible fallback for models saved before this attribute. precoded_features = set(bin_edges) if code_label_map else set() # Fast path: numeric ndarray input. Adaptive fused L1 stores ndarray # feature names as col0, col1, ...; keep the output as ndarray so # _to_float_array can consume it without pandas overhead. if not isinstance(X, pd.DataFrame): arr = np.asarray(X) if arr.ndim == 1: arr = arr.reshape(1, -1) names = ( list(feat_names) if feat_names is not None and len(feat_names) == arr.shape[1] else [f"col{j}" for j in range(arr.shape[1])] ) name_to_idx = {name: j for j, name in enumerate(names)} # If a name mismatch occurs, fall back to the labelled path rather # than silently applying edges to the wrong column. 
if all(name in name_to_idx for name in bin_edges): X_out = np.array(arr, dtype=np.float64, copy=True) for name, edges in bin_edges.items(): j = name_to_idx[name] n_bins = len(edges) - 1 col = X_out[:, j] if name in precoded_features: codes = np.clip(np.digitize(col, edges[1:-1]), 0, n_bins - 1).astype( np.float64 ) nan_mask = ~np.isfinite(col) if nan_mask.any(): codes[nan_mask] = np.nan X_out[:, j] = codes else: # Legacy string categorical fallback requires labels. # It is rare in current adaptive models, but preserve # correctness by using the DataFrame path below. break else: return X_out is_df = isinstance(X, pd.DataFrame) if is_df: X_df = X else: arr = np.asarray(X) if arr.ndim == 1: arr = arr.reshape(1, -1) cols = ( list(feat_names) if feat_names is not None and len(feat_names) == arr.shape[1] else [f"col{j}" for j in range(arr.shape[1])] ) X_df = pd.DataFrame(arr, columns=cols) # Fast labelled path for the common adaptive case: all stored adaptive # columns are numeric pre-coded features. Convert once to NumPy, edit # columns in-place, and rebuild one DataFrame instead of assigning one # pandas Series per feature. 
if is_df and all(name in X_df.columns and name in precoded_features for name in bin_edges): try: cols = list(X_df.columns) name_to_idx = {str(c): j for j, c in enumerate(cols)} X_mat = X_df.to_numpy(dtype=np.float64, copy=True) for name, edges in bin_edges.items(): j = name_to_idx[name] n_bins = len(edges) - 1 col = X_mat[:, j] codes = np.clip(np.digitize(col, edges[1:-1]), 0, n_bins - 1).astype(np.float64) nan_mask = ~np.isfinite(col) if nan_mask.any(): codes[nan_mask] = np.nan X_mat[:, j] = codes return pd.DataFrame(X_mat, columns=X_df.columns, index=X_df.index) except Exception: pass X_out = X_df.copy() for name, edges in bin_edges.items(): if name not in X_df.columns: continue col = pd.to_numeric(X_df[name], errors="coerce").values n_bins = len(edges) - 1 if name in precoded_features: codes = np.clip(np.digitize(col, edges[1:-1]), 0, n_bins - 1).astype(np.float64) nan_mask = ~np.isfinite(col) if nan_mask.any(): codes[nan_mask] = np.nan X_out[name] = codes else: X_out[name] = _adap_apply_edges(col, edges) return X_out if is_df else X_out # ── v1.1.0 Missing value handling methods ────────────────────────────────── # # NaN (and Inf) in a numerical feature is treated as "not observed" — # no item is generated in the transaction for that (row, feature) pair. # This matches the categorical path, where np.nan in X_cat_raw is already # silently skipped by the C++ transaction builder. # # Numerical columns that contain non-finite values in training are # pre-binned to string labels (same mechanism as adaptive_binning) so the # C++ sees them as categorical. Non-finite cells become np.nan in the # label array → C++ skips → no item. Edges are stored in # _missing_col_edges_ and reused at predict time. # # At predict time, columns that were non-finite-free in training but # receive non-finite test values are handled dynamically using the C++ # edge arrays stored in td_._cpp_all_edges[j]. # # The old median imputation in _to_float_array is removed entirely. 
# ────────────────────────────────────────────────────────────────────────────

def _prebin_nan_cols(self, X_train: Any) -> Any:
    """Pre-bin fixed-B numeric columns that must follow the string path.

    Rules, identical for every L value:

    - A numeric column whose training values are all finite is left
      untouched and stays on the native fixed-B numeric path.
    - A numeric column with any NaN/Inf during training gets quantile edges
      (computed on its finite values with ``self.B``) recorded in
      ``_missing_col_edges_``, is flagged categorical / non-integer in the
      column masks, and is replaced by the output of ``_adap_apply_edges``.

    ``_missing_col_edges_`` is reset at the start of every call, so this is
    a fit-only helper and must not be invoked incrementally.

    Returns ``X_train`` itself when no column needed conversion. Otherwise
    returns a converted copy: a DataFrame for DataFrame input, an ndarray
    (via ``to_numpy()``) for any other input.
    """
    frame_input = isinstance(X_train, pd.DataFrame)
    if frame_input:
        frame = X_train
    else:
        values = np.asarray(X_train)
        if values.ndim == 1:
            values = values.reshape(1, -1)
        labels_for_array = list(getattr(self, "feature_names_in_", []) or [])
        if len(labels_for_array) != values.shape[1]:
            # Stored names do not fit this input: use positional names.
            labels_for_array = [f"col{j}" for j in range(values.shape[1])]
        frame = pd.DataFrame(values, columns=labels_for_array)

    fitted_cat = self.cat_cols_mask_
    labels = list(frame.columns)
    width = len(labels)

    self._missing_col_edges_: dict = {}
    cat_flags = fitted_cat.copy()
    int_flags = getattr(self, "is_int_mask_", np.zeros(width, dtype=bool)).copy()

    for pos, label in enumerate(labels):
        if pos >= len(fitted_cat) or fitted_cat[pos]:
            # already categorical — C++ handles np.nan natively
            continue
        numeric = pd.to_numeric(frame.iloc[:, pos], errors="coerce").values
        is_finite = np.isfinite(numeric)
        if bool(np.all(is_finite)):
            # Consistent fast path for every L: finite numeric columns
            # remain numeric and are binned by native fixed-B code.
            continue
        observed = numeric[is_finite]
        if observed.size > 0:
            col_edges = _adap_quantile_edges(observed, self.B)
        else:
            # No finite value at all: fall back to a single unit bin.
            col_edges = np.array([0.0, 1.0])
        self._missing_col_edges_[label] = col_edges
        cat_flags[pos] = True
        int_flags[pos] = False

    # The dict was reset above, so it is non-empty exactly when at least
    # one column was converted.
    if not self._missing_col_edges_:
        return X_train

    self.cat_cols_mask_ = cat_flags
    self.is_int_mask_ = int_flags

    binned = frame.copy()
    for label, col_edges in self._missing_col_edges_.items():
        numeric = pd.to_numeric(frame[label], errors="coerce").values
        binned[label] = _adap_apply_edges(numeric, col_edges)
    if frame_input:
        return binned
    return binned.to_numpy()

def _handle_test_nan(self, X_test: Any) -> tuple:
    """Apply training-time missing-column bin edges at test time.

    Only columns recorded in ``_missing_col_edges_`` are converted, each
    through ``_adap_apply_edges`` with its stored edges, and flagged
    categorical in the returned mask. All other columns are passed through
    unchanged.

    Returns ``(X_modified, local_cat_mask)``. ``self.cat_cols_mask_`` is
    never mutated; when a conversion happens ``local_cat_mask`` is a
    per-call copy. When nothing is converted, the original ``X_test`` and
    the stored mask object are returned. If ``cat_cols_mask_`` is absent,
    ``(X_test, None)`` is returned.
    """
    stored_cat = getattr(self, "cat_cols_mask_", None)
    if stored_cat is None:
        return X_test, stored_cat

    frame_input = isinstance(X_test, pd.DataFrame)
    known_names = getattr(self, "feature_names_in_", None)

    if frame_input:
        frame = X_test
    else:
        values = np.asarray(X_test)
        if values.ndim == 1:
            values = values.reshape(1, -1)
        if known_names is not None and len(known_names) == values.shape[1]:
            labels_for_array = list(known_names)
        else:
            labels_for_array = [f"col{j}" for j in range(values.shape[1])]
        frame = pd.DataFrame(values, columns=labels_for_array)

    labels = list(frame.columns)
    edges_by_col = getattr(self, "_missing_col_edges_", {})
    call_cat = stored_cat.copy()
    touched = False

    for pos, label in enumerate(labels):
        # Genuinely categorical column or column not in model:
        # C++ already handles np.nan natively — no action needed.
        if pos >= len(stored_cat) or label not in edges_by_col:
            continue

        # Column was pre-binned at training time because it contained
        # NaN/Inf during fit: convert raw float values to the same string
        # bin labels so C++ item lookups match.
        numeric = pd.to_numeric(frame.iloc[:, pos], errors="coerce").values
        if not touched:
            # Copy lazily so the caller's data is never written to.
            frame = frame.copy()
            touched = True
        frame[label] = _adap_apply_edges(numeric, edges_by_col[label])
        call_cat[pos] = True

    if not touched:
        return X_test, stored_cat
    converted = frame if frame_input else frame.to_numpy()
    return converted, call_cat

# ── End v1.1.0 missing value handling methods ─────────────────────────────

# ── End v1.1.0 adaptive binning methods ──────────────────────────────────

def _require_core(self) -> None:
    """Raise ``ImportError`` with build instructions if the native extension is absent."""
    if _CORE_AVAILABLE:
        return
    message = (
        "HUGIMLClassifierNative requires the compiled C++ extension "
        "'_hugiml_core'.\n"
        "Build it with:  pip install . --no-build-isolation\n"
        "Or for development:  HUGIML_FAST_BUILD=1 python setup.py "
        "build_ext --inplace"
    )
    raise ImportError(message)
def fit(self, X: Any, y: Any) -> HUGIMLClassifierNative:
    """Fit the HUG-IML model on training data.

    Parameters
    ----------
    X : pd.DataFrame or ndarray, shape (n_samples, n_features)
    y : array-like of int, shape (n_samples,)

    Returns
    -------
    self

    Raises
    ------
    ImportError
        If the compiled native extension is unavailable (raised by
        ``_require_core`` before the lock is taken).
    HUGIMLMemoryError
        If a ``MemoryError`` escapes the fit; the original exception is
        chained as the cause.
    HUGIMLTimeoutError
        If a ``RuntimeError`` whose message contains ``"hugiml_timeout"``
        escapes the fit. Any other ``RuntimeError`` propagates unchanged.

    Thread safety
    -------------
    fit() acquires an exclusive lock.  Concurrent fit() calls on the same
    instance are serialized.  predict/predict_proba/transform are read-only
    on fitted state and safe for concurrent use after fit() returns.
    """
    self._require_core()
    with self._fit_lock:
        try:
            return self._fit_impl(X, y)
        except MemoryError as exc:
            raise HUGIMLMemoryError(
                "HUGIML fit failed cleanly because required memory could not be allocated. "
                "Reduce n/p/topK/B, keep adaptive_binning=True/use_hotpath=True, or increase "
                "the process memory limit. Original error: " + str(exc)
            ) from exc
        except RuntimeError as exc:
            # The native layer signals a timeout through the message text.
            if "hugiml_timeout" in str(exc):
                raise HUGIMLTimeoutError(str(exc)) from exc
            raise
def _fit_impl(self, X_train: Any, y_train: Any) -> HUGIMLClassifierNative:
    """Run the complete fit pipeline (called by ``fit()`` under the fit lock).

    Stages, in order: reset fitted state, reject unsupported input,
    validate parameters, configure OpenMP, optional adaptive / fixed-B
    pre-binning, column-metadata resolution, native transaction
    preparation + pattern mining + training-matrix assembly (fused L=1
    hot path or the three-step path), downstream estimator fit, drift
    baseline, and ``fit_metadata_`` construction.

    Parameters
    ----------
    X_train : pd.DataFrame or ndarray, shape (n_samples, n_features)
    y_train : array-like, shape (n_samples,)

    Returns
    -------
    self

    Raises
    ------
    ValueError
        For sparse or complex-valued input.
    HUGIMLValidationError
        When ``y`` has fewer than two classes or there are fewer samples
        than classes.
    HUGIMLMiningError
        When no patterns are mined and ``feature_mode == "patterns_only"``.
    """
    # Clear all fitted state so that re-fitting the same instance is
    # idempotent.  Without this, _resolve_col_meta() short-circuits on
    # the cached cat_cols_mask_ from the previous fit, causing the column
    # type masks to carry over and producing non-reproducible results.
    for _attr in (
        "cat_cols_mask_",
        "is_int_mask_",
        "feature_names_in_",
        "_bin_edges_",
        "_missing_col_edges_",
        "_adaptive_code_label_map_",
        "per_feature_b_",
        "ig_scores_",
        "patterns_",
        "model_",
        "classes_",
        "x_train_hup_",
        "fit_metadata_",
        "_original_scaler_",
        "_original_numeric_medians_",
        "_original_numeric_medians_array_",
        "_original_feature_names_downstream_",
        "_pattern_orders_",
        "_interaction_pattern_mask_",
        "x_train_downstream_",
        "_augmented_pair_block_",
        "augmented_pair_transforms_",
        "augmented_pair_selected_features_",
        "_native_original_feature_names_downstream_",
        "_native_original_feature_scores_downstream_",
        "_strict_topk_applied_during_construction_",
        "_strict_topk_feature_mask_",
        "_strict_topk_feature_scores_",
        "_strict_topk_selected_feature_names_",
        "_downstream_feature_names_full_",
        "_training_pattern_matrix_shape_",
        "_training_pattern_matrix_nnz_",
        "_training_downstream_matrix_shape_",
        "_training_downstream_matrix_nnz_",
        "_drift_det",
        # _mine_with_fallback only ever *sets* this marker, and it is read
        # below via hasattr() for the DEGRADED log line and
        # FitMetadata.degraded.  Without resetting it here, a clean refit
        # after a degraded one would still be reported as degraded.
        "_degraded_reason",
    ):
        self.__dict__.pop(_attr, None)

    t_total = self._timer()
    stage_times: dict[str, float] = {}

    # Reject sparse matrices with an informative message
    from scipy.sparse import issparse as _issparse

    if _issparse(X_train):
        raise ValueError(
            "HUGIMLClassifierNative does not support sparse input. "
            "Convert to a dense array via X.toarray() first."
        )

    # Reject complex-valued arrays
    if hasattr(X_train, "dtype") and np.iscomplexobj(X_train):
        raise ValueError("Complex data not supported by HUGIMLClassifierNative.")

    self._validate_params()

    # Configure OpenMP before adaptive binning.  Adaptive B-selection is
    # column-parallel in the native path, so applying n_jobs after adaptive
    # preprocessing is too late.
    n_threads = _core.openmp_get_max_threads() if self.n_jobs == -1 else self.n_jobs
    if n_threads > 0:
        _core.openmp_set_num_threads(n_threads)
    actual_threads = _core.openmp_get_max_threads()

    # Preserve raw input if the downstream mode needs original features,
    # or if L>1 will create internal augmented_pair_transforms.  This remains
    # an internal operation; no public hyperparameter is added.
    _needs_augmented_pairs = bool(self.L > 1 and bool(self.augmented_pair_transforms))
    X_train_original_for_downstream = (
        self._copy_input_for_downstream(X_train)
        if (self.feature_mode != "patterns_only" or _needs_augmented_pairs)
        else None
    )

    # Fused adaptive+L1 hot path can consume raw X directly and must not
    # materialise the intermediate X_codes matrix/DataFrame.
    _use_fused_adaptive_l1 = (
        self.adaptive_binning
        and self.use_hotpath
        and _CORE_AVAILABLE
        and self.L == 1
        and hasattr(_core, "prepare_and_mine_l1_adaptive")
    )

    # ── v1.2.0 adaptive pre-binning (C++ hot path or Python fallback) ─────
    # ── v1.2.0 adaptive B-selection always uses C++ ──────────────────
    # _apply_adaptive_binning_cpp calls _core.select_adaptive_bins
    # (elbow_stop_nb_cpp) whenever the C++ extension is available.
    # use_hotpath does NOT gate this: C++ adaptive selection is always
    # preferred because it produces identical outputs with no conflicts.
    # Python _apply_adaptive_binning is kept as a fallback for
    # environments where the C++ extension is absent.
    if self.adaptive_binning and not _use_fused_adaptive_l1:
        self._resolve_col_meta(X_train)  # prime cat_cols_mask_ first
        _y_for_ig = self._safe_cast_y(y_train)
        if _CORE_AVAILABLE and hasattr(_core, "select_adaptive_bins"):
            X_train = self._apply_adaptive_binning_cpp(X_train, _y_for_ig)
        else:
            X_train = self._apply_adaptive_binning(X_train, _y_for_ig)
        if self.verbose:
            logger.info(
                "  adaptive binning: %d features pre-binned, B_j in [%d, %d]",
                len(self._bin_edges_),
                min(self.per_feature_b_.values(), default=0),
                max(self.per_feature_b_.values(), default=0),
            )
    # ─────────────────────────────────────────────────────────────────

    # ── Fixed-B non-finite handling (non-adaptive path) ───────────────
    # Use one consistent scheme for every L: numeric columns stay numeric
    # unless they contain NaN/Inf during training.  Columns with training
    # non-finite cells are pre-binned to the string/categorical path so
    # fit and predict use the same missing-value representation.  Clean
    # numeric columns, including L>1 columns, use the native numeric path;
    # new test-time NaN/Inf values are skipped by native item generation.
    if not self.adaptive_binning:
        self._resolve_col_meta(X_train)
        X_train = self._prebin_nan_cols(X_train)
    # ─────────────────────────────────────────────────────────────────

    mem = _MemoryTracker()
    # NOTE(review): the extent of this ``with`` block was reconstructed from
    # a listing that had lost its indentation; it is assumed to cover
    # stages 1-6.  Confirm against the canonical file.
    with mem:
        # Stage 1: resolve column metadata
        t = self._timer()
        cat_mask = self._resolve_col_meta(X_train)
        int_mask = getattr(self, "is_int_mask_", None)
        X_num, X_cat_raw = self._to_float_array(X_train, cat_mask)
        y_train = self._safe_cast_y(y_train)
        # Native numeric paths treat non-finite feature cells as missing
        # observations and skip item generation.  Let NaN/Inf through
        # sklearn validation; y is already checked separately by _safe_cast_y.
        try:
            X_num, y_train = check_X_y(X_num, y_train, dtype=None, ensure_all_finite=False)
        except TypeError:
            # Older scikit-learn releases spell the keyword force_all_finite.
            X_num, y_train = check_X_y(X_num, y_train, dtype=None, force_all_finite=False)

        self.n_features_in_ = X_num.shape[1]
        self.classes_ = np.unique(y_train)
        n_cls = len(self.classes_)
        stage_times["resolve_meta"] = t.ms

        if n_cls < 2:
            raise HUGIMLValidationError(
                f"y contains only {n_cls} class(es). At least 2 are required."
            )
        if X_num.shape[0] < n_cls:
            raise HUGIMLValidationError(
                f"Fewer samples ({X_num.shape[0]}) than classes ({n_cls})."
            )

        est_mb = _MemoryTracker.estimate_fit_mb(
            X_num.shape[0], X_num.shape[1], X_num.shape[1] * 10, self._effective_topK()
        )
        if est_mb > 4000:
            warnings.warn(
                f"Estimated peak memory ~{est_mb:.0f} MB. "
                "Consider reducing topK or dataset size.",
                HUGIMLWarning,
                stacklevel=4,
            )

        if self.verbose:
            logger.info(
                "HUGIMLClassifierNative.fit — %dx%d, %d classes",
                X_num.shape[0],
                X_num.shape[1],
                n_cls,
            )

        # Stage 2+3+4: prepare / mine / build matrix
        # ── v1.2.0 fused L=1 hot path ──────────────────────────────────
        # When use_hotpath=True and L=1: a single C++ call replaces
        # prepare_transactions + mine_patterns + build_train_matrix.
        # No TransList, no hash-map lookups, direct COO from TID index.
        # Falls back to the original three-step path for L>1, adaptive
        # binning with Python fallback, or when explicitly disabled.
        t = self._timer()
        rss_before = _get_peak_rss_kb()
        col_names = getattr(self, "feature_names_in_", None)
        is_cat_np = cat_mask.astype(np.uint8)
        is_int_np = (
            int_mask if int_mask is not None else np.zeros(X_num.shape[1], dtype=bool)
        ).astype(np.uint8)

        # Build is_precoded mask without scanning every adaptive label key.
        is_precoded_np: np.ndarray | None = None
        if self.adaptive_binning:
            precoded_features = getattr(self, "_adaptive_precoded_features_", set())
            p_cols = X_num.shape[1]
            feat_names_list = (
                col_names if col_names is not None else [f"col{j}" for j in range(p_cols)]
            )
            is_precoded_np = np.fromiter(
                (name in precoded_features for name in feat_names_list),
                dtype=np.uint8,
                count=p_cols,
            )

        _use_fused = (
            self.use_hotpath
            and _CORE_AVAILABLE
            and self.L == 1
            and hasattr(_core, "prepare_and_mine_l1")
        )

        if _use_fused:
            # ── Fused path ────────────────────────────────────────────────
            fit_deadline = (
                time.perf_counter() + self.max_fit_seconds if self.max_fit_seconds else None
            )
            remaining_s = max(fit_deadline - time.perf_counter(), 0.0) if fit_deadline else 0.0
            K_eff = self._effective_mining_topK()  # rough pre-estimate (no n_items yet)
            if (
                os.environ.get("HUGIML_DISABLE_FIXED_NUMERIC_L1_FASTPATH", "0") != "1"
                and (not self.adaptive_binning)
                and hasattr(_core, "prepare_and_mine_l1_fixed_numeric")
                and not bool(np.any(is_cat_np))
            ):
                _l1_result = _core.prepare_and_mine_l1_fixed_numeric(
                    X_num,
                    y_train,
                    self.B,
                    col_names,
                    is_int_np,
                    K_eff,
                    self.G,
                    remaining_s,
                    compute_original_scores=(self.feature_mode != "patterns_only"),
                )
            elif self.adaptive_binning and hasattr(_core, "prepare_and_mine_l1_adaptive"):
                candidates = sorted(set(self.b_candidates or [2, 3, 5, 7, 10, 15]))
                _l1_result = _core.prepare_and_mine_l1_adaptive(
                    X_num,
                    y_train,
                    col_names,
                    is_cat_np,
                    is_int_np,
                    X_cat_raw if any(v is not None for v in X_cat_raw) else None,
                    candidates,
                    self.min_marginal_gain_ratio,
                    K_eff,
                    self.G,
                    remaining_s,
                    compute_original_scores=(self.feature_mode != "patterns_only"),
                )
                # Install adaptive metadata for predict()/transform() so test
                # data is pre-binned to the same integer-code representation
                # used by the fitted td.
                feat_names_list = (
                    list(col_names)
                    if col_names is not None
                    else [f"col{j}" for j in range(X_num.shape[1])]
                )
                self._bin_edges_ = {}
                self.per_feature_b_ = {}
                self.ig_scores_ = {}
                self._adaptive_code_label_map_ = {}
                self._adaptive_precoded_features_ = set()
                new_cat = cat_mask.copy()
                new_int = (
                    int_mask.copy()
                    if int_mask is not None
                    else np.zeros(X_num.shape[1], dtype=bool)
                )
                for ci, col_res in enumerate(getattr(_l1_result, "adaptive_cols", [])):
                    j = int(_l1_result.adaptive_num_col_indices[ci])
                    name = feat_names_list[j]
                    edges = np.array(col_res.edges)
                    self._bin_edges_[name] = edges
                    self._adaptive_precoded_features_.add(name)
                    self.per_feature_b_[name] = len(edges) - 1
                    scores: dict[int, float] = {}
                    for k, b in enumerate(candidates):
                        scores[b] = col_res.ig_scores[k] if k < len(col_res.ig_scores) else 0.0
                    self.ig_scores_[name] = scores
                    # The column is now represented by integer bin codes.
                    new_cat[j] = False
                    new_int[j] = True
                    # Map the code-interval label built for bin k to the
                    # label expressed in the original edge values.
                    for k in range(len(edges) - 1):
                        cpp_label = f"{name}=[{float(k):.3f},{float(k + 1):.3f}]"
                        orig_label = f"{name}=[{edges[k]:.4g},{edges[k + 1]:.4g})"
                        self._adaptive_code_label_map_[cpp_label] = orig_label
                self.cat_cols_mask_ = new_cat
                self.is_int_mask_ = new_int
            else:
                _l1_result = _core.prepare_and_mine_l1(
                    X_num,
                    y_train,
                    2 if self.adaptive_binning else self.B,
                    col_names,
                    is_cat_np,
                    is_int_np,
                    X_cat_raw if any(v is not None for v in X_cat_raw) else None,
                    is_precoded_np,
                    K_eff,
                    self.G,
                    remaining_s,
                    compute_original_scores=(self.feature_mode != "patterns_only"),
                )
            self.td_ = _l1_result.td
            if self.feature_mode != "patterns_only":
                native_orig_names = [
                    f"orig:{name}"
                    for name in list(getattr(_l1_result, "original_feature_names", []) or [])
                ]
                native_orig_scores = np.asarray(
                    list(getattr(_l1_result, "original_feature_scores", []) or []),
                    dtype=np.float64,
                )
                if native_orig_names and len(native_orig_names) == len(native_orig_scores):
                    self._native_original_feature_names_downstream_ = native_orig_names
                    self._native_original_feature_scores_downstream_ = native_orig_scores
            cpp_mem_bytes = self.td_.memory_usage_bytes()
            n_items = len(self.td_.item_twu)
            K = self._effective_topK(n_items)
            stage_times["prepare_transactions"] = t.ms

            if self.verbose:
                logger.info("  items=%d, K=%d  [fused L=1 path]", n_items, K)

            t = self._timer()
            raw_patterns_list = list(_l1_result.patterns)
            self.raw_patterns_ = sorted(
                raw_patterns_list, key=lambda pe: (-pe.utility, tuple(pe.items))
            )
            # L=1 singletons are unique by definition (each pattern is one
            # distinct item), so deduplication by coverage is a no-op.
            # Skip calling _deduplicate_patterns_by_coverage (which would
            # call build_train_matrix on the empty td.transactions) and use
            # the COO returned directly by the fused path.
            self.patterns_ = self.raw_patterns_
            stage_times["mine_patterns"] = t.ms

            if len(self.patterns_) == 0:
                if self.feature_mode == "patterns_only":
                    raise HUGIMLMiningError(
                        "No HUG patterns found. Try reducing G, increasing topK, or adjusting B / L."
                    )
                # In original_plus_patterns mode, zero mined HUG patterns is
                # not fatal: downstream fitting should fall back to the
                # original feature block with an empty pattern matrix.
                n_train = len(y_train)
                self.x_train_hup_ = csr_matrix((n_train, 0), dtype=np.float32)
                stage_times["build_matrix"] = 0.0

            if self.verbose:
                logger.info(
                    "  %d patterns in %.0f ms  [fused]",
                    len(self.patterns_),
                    stage_times["mine_patterns"],
                )

            t = self._timer()
            n_train = len(y_train)
            n_pats = len(self.patterns_)
            if n_pats == 0:
                # original_plus_* modes are allowed to continue with an
                # empty pattern block; the downstream matrix will contain
                # the original feature block and any enabled augmented block.
                # Do not call native build/get_coo paths with an empty
                # pattern list because native code rejects that as
                # "patterns list is empty — nothing to build".
                self.x_train_hup_ = csr_matrix((n_train, 0), dtype=np.float32)
            else:
                # Build train matrix from fused native CSR when available.  This
                # avoids copying COO rows/cols into Python and lets scipy consume the
                # compact CSR structure directly.  get_coo remains as a compatibility
                # fallback for older native wheels.
                if hasattr(_l1_result, "get_csr"):
                    indptr, indices = _l1_result.get_csr(n_train, n_pats)
                    data = np.ones(len(indices), dtype=np.float32)
                    self.x_train_hup_ = csr_matrix(
                        (data, indices, indptr), shape=(n_train, n_pats), dtype=np.float32
                    )
                else:
                    rows, cols = _l1_result.get_coo()
                    # The fused COO is ordered by pattern index matching raw_patterns_
                    # (both sorted by descending utility).  If patterns_ was reordered
                    # by dedup we would need to remap cols — but dedup is skipped here
                    # so the order is identical.
                    data = np.ones(len(rows), dtype=np.float32)
                    self.x_train_hup_ = csr_matrix(
                        (data, (rows, cols)), shape=(n_train, n_pats), dtype=np.float32
                    )
            stage_times["build_matrix"] = t.ms

        else:
            # ── Original three-step path (L>1, or hotpath disabled) ───────
            self.td_ = _core.prepare_transactions(
                X_num,
                y_train,
                2 if self.adaptive_binning else self.B,
                col_names,
                is_cat_np,
                is_int_np,
                X_cat_raw if any(v is not None for v in X_cat_raw) else None,
                is_precoded_np,
            )
            stage_times["prepare_transactions"] = t.ms
            cpp_mem_bytes = self.td_.memory_usage_bytes()

            n_items = len(self.td_.item_twu)
            K = self._effective_topK(n_items)
            K_mine = self._effective_mining_topK(n_items)

            if self.verbose:
                logger.info(
                    "  items=%d, K=%d, K_mine=%d, td_mem=%.1fMB",
                    n_items,
                    K,
                    K_mine,
                    cpp_mem_bytes / 1e6,
                )

            t = self._timer()
            fit_deadline = (
                time.perf_counter() + self.max_fit_seconds if self.max_fit_seconds else None
            )
            raw_patterns = self._mine_with_fallback(y_train, n_cls, K_mine, fit_deadline)
            self.raw_patterns_ = sorted(
                raw_patterns, key=lambda pe: (-pe.utility, tuple(pe.items))
            )
            selected_patterns = self._select_patterns_for_budget(self.raw_patterns_, n_items)
            self.patterns_, _cached_coo = self._deduplicate_patterns_by_coverage(
                selected_patterns, len(y_train)
            )
            stage_times["mine_patterns"] = t.ms

            if len(self.patterns_) == 0:
                if self.feature_mode == "patterns_only":
                    raise HUGIMLMiningError(
                        "No HUG patterns found. Try reducing G, increasing topK, or adjusting B / L."
                    )
                # In original_plus_patterns mode, zero mined HUG patterns is
                # not fatal: downstream fitting should fall back to the
                # original feature block with an empty pattern matrix.
                n_train = len(y_train)
                self.x_train_hup_ = csr_matrix((n_train, 0), dtype=np.float32)
                stage_times["build_matrix"] = 0.0

            degraded = hasattr(self, "_degraded_reason")
            if degraded and self.verbose:
                logger.warning("  DEGRADED: %s", self._degraded_reason)

            if self.verbose:
                logger.info(
                    "  %d patterns in %.0f ms",
                    len(self.patterns_),
                    stage_times["mine_patterns"],
                )

            t = self._timer()
            n_train = len(y_train)
            n_pats = len(self.patterns_)
            if n_pats == 0:
                # original_plus_* modes are allowed to continue with an
                # empty pattern block.  Avoid calling native
                # build_train_matrix with an empty pattern list.
                self.x_train_hup_ = csr_matrix((n_train, 0), dtype=np.float32)
            else:
                if _cached_coo is not None:
                    rows, cols = _cached_coo
                    data = np.ones(len(rows), dtype=np.float32)
                    self.x_train_hup_ = csr_matrix(
                        (data, (rows, cols)), shape=(n_train, n_pats), dtype=np.float32
                    )
                elif hasattr(_core, "build_train_matrix_csr"):
                    indptr, indices = _core.build_train_matrix_csr(self.td_, self.patterns_)
                    data = np.ones(len(indices), dtype=np.float32)
                    self.x_train_hup_ = csr_matrix(
                        (data, indices, indptr), shape=(n_train, n_pats), dtype=np.float32
                    )
                else:
                    rows, cols = _core.build_train_matrix(self.td_, self.patterns_)
                    data = np.ones(len(rows), dtype=np.float32)
                    self.x_train_hup_ = csr_matrix(
                        (data, (rows, cols)), shape=(n_train, n_pats), dtype=np.float32
                    )
            stage_times["build_matrix"] = t.ms

        # Optional internal cache-only path used by fast_grid_tune().
        # At this point adaptive metadata, transaction data, mined patterns,
        # and the training HUG matrix are available.  Skipping downstream
        # fitting, rich feature metadata, and drift-baseline construction is
        # correctness-preserving for tuning because each evaluated candidate
        # rebuilds its own downstream matrix/model from these cached mining
        # artefacts.
        if bool(getattr(self, "_fast_tune_cache_only", False)):
            self.td_ = _TransactionDataWrapper(self.td_, self)
            self._native_available_ = True
            self._fast_tune_stage_times_ = dict(stage_times)
            return self

        # Stage 5: fit downstream classifier
        t = self._timer()
        self._setup_feature_mode_metadata()
        self._setup_augmented_pair_transforms(
            X_train_original_for_downstream, y_train, fit=True
        )
        # Temporarily expose y to the downstream feature builder; always
        # removed again, even when feature construction raises.
        self._current_y_for_downstream_topk_ = y_train
        try:
            self.x_train_downstream_ = self._make_downstream_features(
                X_train_original_for_downstream, self.x_train_hup_, fit=True
            )
        finally:
            if hasattr(self, "_current_y_for_downstream_topk_"):
                delattr(self, "_current_y_for_downstream_topk_")
        self.x_train_downstream_ = self._apply_strict_topk_budget_fit(
            self.x_train_downstream_, y_train
        )
        self._cache_downstream_feature_metadata()
        self.model_ = Pipeline([("clf", self._make_estimator(n_cls))])
        self.model_.fit(self.x_train_downstream_, y_train)
        stage_times["fit_downstream"] = t.ms

        # Stage 6: wrap C++ td_ for Python compatibility
        t = self._timer()
        self.td_ = _TransactionDataWrapper(self.td_, self)
        self._native_available_ = True
        stage_times["compat"] = t.ms

    t = self._timer()
    if self._is_production_mode():
        self.__dict__.pop("_drift_det", None)
        stage_times["drift_baseline"] = 0.0
    else:
        self._drift_det = DriftDetector()
        self._drift_det.fit_baseline(
            X_num,
            cat_mask,
            getattr(self, "feature_names_in_", None)
            or [f"col{j}" for j in range(X_num.shape[1])],
            y=y_train,
        )
        stage_times["drift_baseline"] = t.ms

    rss_delta_mb = (_get_peak_rss_kb() - rss_before) / 1024
    n_compound = sum(1 for pe in self.patterns_ if len(pe.items) > 1)
    n_pats_final = len(self.patterns_)
    n_train_final = self.x_train_hup_.shape[0]
    nnz = self.x_train_hup_.nnz
    # Guard the empty-matrix case so density never divides by zero.
    density = (
        nnz / (n_train_final * n_pats_final) if (n_train_final * n_pats_final) > 0 else 0.0
    )

    downstream_names_for_metadata = list(
        getattr(self, "_downstream_feature_names_", [])
        or self._get_downstream_feature_names()
    )
    downstream_feature_counts = {
        "original": sum(
            1 for name in downstream_names_for_metadata if str(name).startswith("orig:")
        ),
        "pattern": sum(
            1 for name in downstream_names_for_metadata if str(name).startswith("pattern:")
        ),
        "augmented_pair": sum(
            1
            for name in downstream_names_for_metadata
            if str(name).startswith("augmented_pair:")
        ),
    }
    downstream_feature_counts["total"] = len(downstream_names_for_metadata)

    self.fit_metadata_ = FitMetadata(
        n_samples=n_train_final,
        n_features=X_num.shape[1],
        n_classes=n_cls,
        n_items=len(getattr(self.td_, "item_twu", [])),
        n_patterns=n_pats_final,
        n_compound=n_compound,
        topK_used=self._effective_topK(len(getattr(self.td_, "item_twu", [])) or None),
        n_augmented_pairs=downstream_feature_counts.get("augmented_pair", 0),
        n_downstream_features=downstream_feature_counts.get("total", 0),
        downstream_feature_counts=downstream_feature_counts,
        stage_times_ms=stage_times,
        total_fit_ms=t_total.ms,
        matrix_density=density,
        config=dict(
            B=self.B,
            L=self.L,
            G=self.G,
            topK=self.topK,
            adaptive_binning=self.adaptive_binning,
            feature_mode=self.feature_mode,
            execution_mode=self.execution_mode,
        ),
        memory_peak_mb=round(mem.traced_peak_mb, 1),
        memory_rss_mb=round(rss_delta_mb, 1),
        memory_cpp_mb=round(cpp_mem_bytes / 1e6, 2),
        openmp_threads=actual_threads,
        degraded=hasattr(self, "_degraded_reason"),
    )
    self._apply_execution_mode_retention()

    if self.verbose:
        logger.info("  fit complete: %s", self.fit_metadata_.summary())
    return self


def _mine_with_fallback(
    self, y_train: np.ndarray, n_cls: int, K: int, deadline: float | None
) -> list:
    """Mine patterns with graceful degradation on OOM or timeout.

    The ``deadline`` is forwarded into the C++ mining engine as a
    wall-clock ``timeout_s`` budget so the native layer can abort mid-run
    rather than only being checked between attempts.

    Parameters
    ----------
    y_train : np.ndarray
        Training labels passed through to ``_core.mine_patterns``.
    n_cls : int
        Number of classes.
    K : int
        Pattern budget for the first ("full") attempt.
    deadline : float or None
        Absolute ``time.perf_counter()`` value after which only the
        minimal attempt is run; falsy means no time limit.

    Returns
    -------
    list
        Mined patterns, or ``[]`` when every attempt failed with a
        memory error.

    Raises
    ------
    HUGIMLTimeoutError
        When the deadline has passed and the minimal fallback also fails.

    Side effects
    ------------
    Sets ``self._degraded_reason`` when a reduced attempt (or the timeout
    fallback) was used.
    """
    attempts = [
        (K, self.L, self.G, "full"),
        (max(K // 2, 10), self.L, self.G, "K//2"),
        (max(K // 4, 10), 1, self.G, "K//4,L=1"),
        (50, 1, 0.0, "minimal"),
    ]
    for attempt_K, attempt_L, attempt_G, label in attempts:
        # Read the clock once per attempt.  With two separate reads the
        # deadline could pass between the expiry check and the budget
        # computation, clamping the budget to 0.0 — the same value that is
        # passed when no deadline is configured at all.
        now = time.perf_counter()
        if deadline and now >= deadline:
            # Time budget exhausted — skip to minimal attempt immediately.
            minimal_K, minimal_L, minimal_G = 50, 1, 0.0
            self._degraded_reason = (
                f"Time budget exceeded at '{label}'; "
                f"falling back to minimal (K={minimal_K}, L={minimal_L})."
            )
            logger.warning("  fit timeout: %s", self._degraded_reason)
            # Give the minimal attempt a fixed 5-second window; it is
            # cheap and must not run indefinitely on degenerate data.
            try:
                return list(
                    _core.mine_patterns(
                        self.td_,
                        y_train,
                        n_cls,
                        minimal_K,
                        minimal_L,
                        minimal_G,
                        5.0,
                    )
                )
            except Exception as exc:
                raise HUGIMLTimeoutError(
                    f"fit() exceeded max_fit_seconds and the minimal fallback "
                    f"also failed: {exc}"
                ) from exc

        # Compute remaining budget and pass it to the C++ engine so it
        # can abort mid-run rather than running past the wall-clock limit.
        # Strictly positive whenever a deadline is set (now < deadline here).
        remaining_s = (deadline - now) if deadline else 0.0
        try:
            patterns: list = list(
                _core.mine_patterns(
                    self.td_,
                    y_train,
                    n_cls,
                    attempt_K,
                    attempt_L,
                    attempt_G,
                    remaining_s,
                )
            )
            if label != "full" and len(patterns) > 0:
                self._degraded_reason = (
                    f"Recovered with {label}: K={attempt_K}, L={attempt_L}, G={attempt_G}"
                )
            return patterns
        except MemoryError:
            logger.warning("MemoryError during mining (%s), retrying…", label)
            continue
        except Exception as e:
            # Native allocation failures may surface as generic exceptions;
            # anything that does not look memory-related is re-raised.
            if "bad_alloc" in str(e).lower() or "memory" in str(e).lower():
                logger.warning("C++ memory error during mining (%s), retrying…", label)
                continue
            raise
    return []

# ── Prediction ────────────────────────────────────────────────────────────
def predict_proba(self, X_test: Any) -> np.ndarray:
    """Predict class probabilities for X_test.

    When ``max_predict_ms`` is set, DataFrame / 2-D ndarray batches are
    processed in chunks.  Rows not reached before the time budget is
    exceeded receive uniform probabilities and a warning is emitted.
    A 1-D ndarray is a single sample and is always predicted in one
    call, never chunked.

    Parameters
    ----------
    X_test : array-like or DataFrame

    Returns
    -------
    np.ndarray, shape (n_samples, n_classes)
    """
    check_is_fitted(self)
    # Keep the same representation used to fit the downstream original
    # feature block: raw user input before adaptive/fixed-B pre-binning.
    # _build_test_hup applies _handle_test_nan() internally for the HUG
    # pattern matrix only; original_plus_* downstream columns are fitted
    # from raw X_train_original_for_downstream and therefore must transform
    # the raw test input as well.
    X_test_original_for_downstream = X_test
    # ── v1.1.0 adaptive pre-binning ───────────────────────────────────
    if getattr(self, "adaptive_binning", False) and getattr(self, "_bin_edges_", None):
        X_test = self._prebin_for_predict(X_test)
    # ─────────────────────────────────────────────────────────────────
    t0 = time.perf_counter()
    budget_ms = self.max_predict_ms

    # A 1-D ndarray is one sample (other methods in this class reshape it
    # to (1, -1)).  Row-wise chunking would slice it by *feature* and
    # broadcast the single (1, n_classes) result over len(X_test) rows, so
    # such inputs must take the unchunked path.
    is_single_vector = any(
        isinstance(obj, np.ndarray) and obj.ndim == 1
        for obj in (X_test, X_test_original_for_downstream)
    )
    can_chunk = isinstance(X_test, (pd.DataFrame, np.ndarray)) and not is_single_vector

    if budget_ms is None or not can_chunk:
        Z_test = self._build_test_hup(X_test)
        X_downstream = self._make_downstream_features(
            X_test_original_for_downstream, Z_test, fit=False
        )
        X_downstream = self._apply_strict_topk_budget_transform(X_downstream)
        proba = np.asarray(self.model_.predict_proba(X_downstream))
        _mon = getattr(self, "monitor", None)
        if _mon is not None:
            _mon.record(proba, (time.perf_counter() - t0) * 1000)
        return proba

    n = len(X_test)
    n_cls = len(self.classes_)
    chunk_size = max(100, n // 10)
    # Pre-fill with the uniform distribution; chunks overwrite their rows.
    result = np.full((n, n_cls), 1.0 / n_cls, dtype=np.float64)
    completed = 0
    is_df = isinstance(X_test, pd.DataFrame)
    orig_is_df = isinstance(X_test_original_for_downstream, pd.DataFrame)

    for start in range(0, n, chunk_size):
        elapsed_ms = (time.perf_counter() - t0) * 1000
        if elapsed_ms > budget_ms:
            warnings.warn(
                f"Prediction SLA exceeded ({elapsed_ms:.0f}ms > {budget_ms}ms) "
                f"after {completed}/{n} rows. Remaining rows filled with uniform.",
                HUGIMLWarning,
                stacklevel=2,
            )
            break
        end = min(start + chunk_size, n)
        chunk = X_test.iloc[start:end] if is_df else X_test[start:end]  # type: ignore[union-attr]
        orig_chunk = (
            X_test_original_for_downstream.iloc[start:end]
            if orig_is_df
            else X_test_original_for_downstream[start:end]
        )
        Z_chunk = self._build_test_hup(chunk)
        X_downstream_chunk = self._make_downstream_features(orig_chunk, Z_chunk, fit=False)
        X_downstream_chunk = self._apply_strict_topk_budget_transform(X_downstream_chunk)
        result[start:end] = self.model_.predict_proba(X_downstream_chunk)
        completed = end

    _mon = getattr(self, "monitor", None)
    if _mon is not None:
        # Only rows that were actually predicted are reported.
        _mon.record(result[:completed], (time.perf_counter() - t0) * 1000)
    return result
def predict(self, X_test: Any) -> np.ndarray:
    """Predict class labels for X_test.

    The HUG pattern block is built from the (optionally pre-binned)
    input, while the downstream original-feature block is built from the
    raw input exactly as the caller passed it, mirroring how the
    downstream estimator was fitted.

    Parameters
    ----------
    X_test : array-like or DataFrame

    Returns
    -------
    np.ndarray, shape (n_samples,)
    """
    check_is_fitted(self)

    # ── v1.1.0 adaptive pre-binning ───────────────────────────────────
    # Pre-bin only the copy that feeds pattern matching; the raw X_test is
    # what the original_plus_* downstream columns must see.
    pattern_input = X_test
    if getattr(self, "adaptive_binning", False):
        if getattr(self, "_bin_edges_", None):
            pattern_input = self._prebin_for_predict(X_test)
    # ─────────────────────────────────────────────────────────────────

    pattern_block = self._build_test_hup(pattern_input)
    features = self._make_downstream_features(X_test, pattern_block, fit=False)
    features = self._apply_strict_topk_budget_transform(features)
    labels = self.model_.predict(features)
    return np.asarray(labels)
# ── Downstream feature modes ───────────────────────────────────────────── def _copy_input_for_downstream(self, X: Any) -> Any: """Preserve raw input before adaptive/pre-binning for hybrid modes. Downstream original-feature preparation is read-only. For ndarray inputs the mining/pre-binning stages either consume X without mutation or bind a new pre-binned object to the local X_train variable, so a full eager copy here only adds O(n*p) time and memory. DataFrames still get a shallow schema-stable copy because later preprocessing may add/reorder columns. """ if isinstance(X, pd.DataFrame): return X.copy() return np.asarray(X) def _pattern_order_from_label(self, label: str) -> int: """Infer pattern order from a human-readable HUG pattern label. .. deprecated:: This method is retained for backward compatibility only. ``_setup_feature_mode_metadata`` derives pattern order directly from ``PatternEntry.items`` (the C++ item-ID list), which is the authoritative source of pattern length and is not affected by comma characters inside numeric interval notation such as ``age=[29.2, 38.4)``. The fallback parser intentionally counts feature assignments, not comma-separated chunks. Numeric intervals contain commas, so a label like ``age=[29.2, 38.4)`` must remain order-1. A conjunction such as ``age=[29,50), income=[50k,80k)`` is order-2 because it contains two top-level ``feature=...`` assignments. """ import re s = str(label or "").strip() if not s: return 1 # Fast path for native / ndarray labels such as # ``col0=[29.2, 38.4), col1=A``. Count distinct column tokens rather # than commas so interval bounds do not inflate the order. col_matches = re.findall(r"\b(col\d+)\s*=", s) if col_matches: return max(1, len(set(col_matches))) # Human-readable labels are emitted as ``name=value`` assignments, # with conjunctions separated by either commas or explicit boolean # markers (``AND``, ``and`` or ``&``). 
Count assignment starts that # occur at the beginning of the string or after one of those top-level # separators. The feature-name pattern must start with a # letter/underscore, so commas inside numeric intervals, e.g. # ``[29.2, 38.4)``, are not mistaken for new assignments. assignment_matches = re.findall( r"(?:^|,|\s+(?:AND|and|&)\s+)\s*([A-Za-z_][A-Za-z0-9_ .:/\-]*)\s*=", s, ) if assignment_matches: return max(1, len({name.strip() for name in assignment_matches if name.strip()})) # Last-resort fallback for unknown legacy formats: a label with an # explicit boolean conjunction marker is treated as an interaction; # otherwise keep the conservative singleton default. Do not split on # commas here because interval labels contain commas. if re.search(r"\s+(?:AND|and|&)\s+", s): return 2 return 1 def _setup_feature_mode_metadata(self) -> None: """Cache pattern-order masks used by hybrid feature modes. Pattern order (number of features in a pattern) is read directly from ``PatternEntry.items`` — the C++ item-ID list — rather than inferred from the human-readable label string. Label-string parsing (``_pattern_order_from_label``) mis-counts numeric singletons such as ``age=[29.2, 38.4)`` as order-2 because of the comma inside the interval notation, causing ``original_plus_interactions`` to incorrectly include numeric singletons in the downstream feature matrix. Using ``len(pe.items)`` gives the correct structural count: 1 for singletons, 2 for pair conjunctions, regardless of feature type or label format. """ patterns = getattr(self, "patterns_", None) if hasattr(self, "x_train_hup_"): n_hup_cols = int(self.x_train_hup_.shape[1]) elif getattr(self, "_training_pattern_matrix_shape_", None) is not None: n_hup_cols = int(self._training_pattern_matrix_shape_[1]) elif patterns is not None: n_hup_cols = int(len(patterns)) else: n_hup_cols = 0 if patterns is not None and len(patterns) == n_hup_cols: # Primary path: read order from C++ PatternEntry.items directly. 
orders = np.asarray([len(pe.items) for pe in patterns], dtype=int) else: # Fallback: patterns_ unavailable or length mismatch — should not # occur after a completed fit, but guard defensively. features = self.get_hug_features() orders = np.asarray([self._pattern_order_from_label(f) for f in features], dtype=int) if len(orders) != n_hup_cols: orders = np.ones(n_hup_cols, dtype=int) self._pattern_orders_ = orders self._interaction_pattern_mask_ = orders > 1 def _prepare_selected_original_features_for_downstream_transform( self, X: Any, selected_names: list[str] ) -> tuple[np.ndarray, list[str]]: """Materialize only persisted selected original downstream columns at predict time. Fit still prepares the full original block once so scoring/serialization stay unchanged. Prediction should not rebuild all original columns and then apply the fitted TopK mask: in original_plus_* modes the retained original columns are already known from ``_original_selected_feature_names_downstream_``. This helper constructs just those columns, preserving the exact fitted StandardScaler/median-imputation/dummy-column contract. 
""" selected_names = list(selected_names or []) if not selected_names: n_rows = ( len(X) if not isinstance(X, np.ndarray) else (1 if np.asarray(X).ndim == 1 else np.asarray(X).shape[0]) ) return self._empty_dense_block(n_rows), [] selected_raw = [ str(name)[5:] if str(name).startswith("orig:") else str(name) for name in selected_names ] num_cols = list(getattr(self, "_original_numeric_cols_", [])) cat_cols = list(getattr(self, "_original_cat_cols_", [])) dummy_cols = list(getattr(self, "_original_dummy_columns_", [])) num_pos = {str(c): i for i, c in enumerate(num_cols)} dummy_set = {str(c) for c in dummy_cols} selected_numeric = [name for name in selected_raw if name in num_pos] selected_dummy = [name for name in selected_raw if name in dummy_set] train_names = list(getattr(self, "feature_names_in_", []) or []) train_pos = {str(c): i for i, c in enumerate(train_names)} is_df = isinstance(X, pd.DataFrame) arr = None if is_df else np.asarray(X) if arr is not None and arr.ndim == 1: arr = arr.reshape(1, -1) n_rows = len(X) if is_df else int(arr.shape[0]) blocks: list[np.ndarray] = [] block_names: list[str] = [] if selected_numeric: idx_in_num = np.asarray([num_pos[name] for name in selected_numeric], dtype=np.int64) if is_df: X_num_sel = pd.DataFrame(index=X.index) for name in selected_numeric: X_num_sel[name] = X[name] if name in X.columns else np.nan raw_num = ( X_num_sel.apply(pd.to_numeric, errors="coerce") .replace([np.inf, -np.inf], np.nan) .to_numpy(dtype=np.float64, copy=True) ) else: assert arr is not None raw_num = np.empty((n_rows, len(selected_numeric)), dtype=np.float64) for out_j, name in enumerate(selected_numeric): src_j = train_pos.get(name, out_j) if 0 <= src_j < arr.shape[1]: raw_num[:, out_j] = np.asarray(arr[:, src_j], dtype=np.float64) else: raw_num[:, out_j] = np.nan raw_num[~np.isfinite(raw_num)] = np.nan med_arr = getattr(self, "_original_numeric_medians_array_", None) if med_arr is None or len(med_arr) != len(num_cols): med = getattr(self, 
"_original_numeric_medians_", pd.Series(dtype=float)) med_arr = med.reindex(num_cols).fillna(0.0).to_numpy(dtype=np.float64, copy=True) med_sel = np.asarray(med_arr, dtype=np.float64)[idx_in_num] bad = ~np.isfinite(raw_num) if bad.any(): raw_num[bad] = np.take(med_sel, np.nonzero(bad)[1]) scaler = self._original_scaler_ mean = np.asarray(getattr(scaler, "mean_", np.zeros(len(num_cols))), dtype=np.float64)[ idx_in_num ] scale = np.asarray(getattr(scaler, "scale_", np.ones(len(num_cols))), dtype=np.float64)[ idx_in_num ] scale = np.where(scale == 0.0, 1.0, scale) blocks.append(((raw_num - mean) / scale).astype(np.float32, copy=False)) block_names.extend([f"orig:{name}" for name in selected_numeric]) if selected_dummy: if is_df: X_cat = X.reindex(columns=cat_cols) else: assert arr is not None data = {} for name in cat_cols: src_j = train_pos.get(str(name), None) if src_j is not None and 0 <= src_j < arr.shape[1]: data[name] = arr[:, src_j] else: data[name] = np.full(n_rows, np.nan, dtype=object) X_cat = pd.DataFrame(data) X_cat_dum = ( pd.get_dummies(X_cat.astype("string"), dummy_na=True) if len(cat_cols) else pd.DataFrame(index=range(n_rows)) ) X_cat_dum = X_cat_dum.reindex(columns=selected_dummy, fill_value=0) blocks.append(X_cat_dum.to_numpy(dtype=np.float32, copy=False)) block_names.extend([f"orig:{name}" for name in selected_dummy]) # Preserve fitted selected_names order even when numeric and dummy columns # are interleaved. The two blocks above are built by type for speed; this # final gather restores the exact downstream coefficient alignment. 
if not blocks: return self._empty_dense_block(n_rows), [] by_name = {} dense_concat = ( np.hstack(blocks).astype(np.float32, copy=False) if len(blocks) > 1 else blocks[0] ) for j, name in enumerate(block_names): by_name[name] = dense_concat[:, j] missing_selected = [name for name in selected_names if name not in by_name] if missing_selected: raise HUGIMLSchemaError( "Selected original downstream feature(s) are unavailable during transform: " f"{missing_selected[:10]!r}. This usually indicates schema drift or a " "model/metadata mismatch. Refit the model or provide input columns " "matching the training schema." ) out = np.empty((n_rows, len(selected_names)), dtype=np.float32) for j, name in enumerate(selected_names): out[:, j] = by_name[name] return out, list(selected_names) def _prepare_original_features_for_downstream(self, X: Any, fit: bool = False): """Prepare original input features for hybrid downstream estimators. This intentionally does not affect transform(), get_hug_features(), or any pattern diagnostics. It is used only by predict/fit when feature_mode includes original features. """ # Fast all-numeric ndarray fit/transform path. Avoid constructing a # DataFrame and running pandas apply/to_numeric over every column for the # common large-n benchmark path. This preserves the exact fitted # StandardScaler/median-imputation/original feature-name contract. 
if not isinstance(X, pd.DataFrame): arr = np.asarray(X) if arr.ndim == 1: arr = arr.reshape(1, -1) names = getattr(self, "feature_names_in_", None) if names is None or len(names) != arr.shape[1]: names = [f"col{j}" for j in range(arr.shape[1])] cat_mask = getattr(self, "cat_cols_mask_", None) if cat_mask is not None and not bool(np.any(cat_mask)): raw = np.array(arr, dtype=np.float64, copy=True) raw[~np.isfinite(raw)] = np.nan if fit: self._original_numeric_cols_ = list(names) self._original_cat_cols_ = [] med_arr = np.nanmedian(raw, axis=0) if raw.shape[1] else np.empty(0) med_arr = np.where(np.isfinite(med_arr), med_arr, 0.0).astype( np.float64, copy=False ) self._original_numeric_medians_array_ = med_arr.copy() self._original_numeric_medians_ = pd.Series(med_arr, index=list(names)) bad = ~np.isfinite(raw) if bad.any(): raw[bad] = np.take(med_arr, np.nonzero(bad)[1]) self._original_scaler_ = StandardScaler() X_num_arr = ( self._original_scaler_.fit_transform(raw) if raw.shape[1] else np.empty((raw.shape[0], 0)) ) self._original_dummy_columns_ = [] self._original_feature_names_downstream_ = list(names) return X_num_arr.astype(np.float32, copy=False) num_cols = list(getattr(self, "_original_numeric_cols_", [])) if ( num_cols and list(names) == num_cols and not getattr(self, "_original_cat_cols_", []) ): med_arr = getattr(self, "_original_numeric_medians_array_", None) if med_arr is None or len(med_arr) != raw.shape[1]: med = getattr(self, "_original_numeric_medians_", pd.Series(dtype=float)) med_arr = ( med.reindex(num_cols).fillna(0.0).to_numpy(dtype=np.float64, copy=True) ) bad = ~np.isfinite(raw) if bad.any(): raw[bad] = np.take(med_arr, np.nonzero(bad)[1]) X_num_arr = self._original_scaler_.transform(raw) return X_num_arr.astype(np.float32, copy=False) if isinstance(X, pd.DataFrame): X_df = X.copy() else: names = getattr(self, "feature_names_in_", None) arr = np.asarray(X) if arr.ndim == 1: arr = arr.reshape(1, -1) if names is None or len(names) != 
arr.shape[1]: names = [f"col{j}" for j in range(arr.shape[1])] X_df = pd.DataFrame(arr, columns=list(names)) # Stabilize column order against training schema when available. train_names = getattr(self, "feature_names_in_", None) if train_names is not None: for col in train_names: if col not in X_df.columns: X_df[col] = np.nan X_df = X_df[list(train_names)] # Hot predict path: fitted all-numeric original_plus_* models can avoid # pandas apply/fillna/reindex/get_dummies. This preserves the fitted # StandardScaler and median-imputation contract exactly. if not fit: num_cols = list(getattr(self, "_original_numeric_cols_", [])) cat_cols = list(getattr(self, "_original_cat_cols_", [])) dummy_cols = list(getattr(self, "_original_dummy_columns_", [])) if num_cols and not cat_cols and not dummy_cols and list(X_df.columns) == num_cols: try: X_num_arr_raw = X_df.to_numpy(dtype=np.float64, copy=True) med_arr = getattr(self, "_original_numeric_medians_array_", None) if med_arr is None or len(med_arr) != X_num_arr_raw.shape[1]: med = getattr(self, "_original_numeric_medians_", pd.Series(dtype=float)) med_arr = ( med.reindex(num_cols).fillna(0.0).to_numpy(dtype=np.float64, copy=True) ) bad = ~np.isfinite(X_num_arr_raw) if bad.any(): X_num_arr_raw[bad] = np.take(med_arr, np.nonzero(bad)[1]) X_num_arr = self._original_scaler_.transform(X_num_arr_raw) return X_num_arr.astype(np.float32, copy=False) except Exception: pass # Numeric columns are scaled; non-numeric columns are one-hot encoded. 
numeric = X_df.apply(pd.to_numeric, errors="coerce").replace([np.inf, -np.inf], np.nan) numeric_cols = [c for c in X_df.columns if not numeric[c].isna().all()] X_num = numeric[numeric_cols] if numeric_cols else pd.DataFrame(index=X_df.index) X_cat = X_df.drop(columns=numeric_cols, errors="ignore") if fit: self._original_numeric_cols_ = list(X_num.columns) self._original_cat_cols_ = list(X_cat.columns) self._original_numeric_medians_ = X_num.median(numeric_only=True).fillna(0.0) self._original_numeric_medians_array_ = self._original_numeric_medians_.reindex( self._original_numeric_cols_ ).to_numpy(dtype=np.float64, copy=True) X_num_filled = X_num.fillna(self._original_numeric_medians_) self._original_scaler_ = StandardScaler() X_num_arr = ( self._original_scaler_.fit_transform( X_num_filled.to_numpy(dtype=np.float64, copy=False) ) if len(self._original_numeric_cols_) else np.empty((len(X_df), 0)) ) X_cat_dum = ( pd.get_dummies(X_cat.astype("string"), dummy_na=True) if len(self._original_cat_cols_) else pd.DataFrame(index=X_df.index) ) self._original_dummy_columns_ = list(X_cat_dum.columns) else: num_cols = getattr(self, "_original_numeric_cols_", []) med = getattr(self, "_original_numeric_medians_", pd.Series(dtype=float)) X_num = numeric.reindex(columns=num_cols) X_num_filled = X_num.fillna(med).fillna(0.0) if len(num_cols): X_num_arr = self._original_scaler_.transform( X_num_filled.to_numpy(dtype=np.float64, copy=False) ) else: X_num_arr = np.empty((len(X_df), 0)) cat_cols = getattr(self, "_original_cat_cols_", []) X_cat = X_df.reindex(columns=cat_cols) X_cat_dum = ( pd.get_dummies(X_cat.astype("string"), dummy_na=True) if len(cat_cols) else pd.DataFrame(index=X_df.index) ) dummy_cols = getattr(self, "_original_dummy_columns_", []) X_cat_dum = X_cat_dum.reindex(columns=dummy_cols, fill_value=0) X_cat_arr = ( X_cat_dum.to_numpy(dtype=np.float64, copy=False) if X_cat_dum.shape[1] else np.empty((len(X_df), 0)) ) X_base = np.hstack([X_num_arr, X_cat_arr]) if 
X_cat_arr.shape[1] else X_num_arr if fit: self._original_feature_names_downstream_ = list( getattr(self, "_original_numeric_cols_", []) ) + list(getattr(self, "_original_dummy_columns_", [])) return X_base.astype(np.float32, copy=False) def _as_dense_float32(self, X: Any) -> np.ndarray: """Return a dense float32 2-D array without changing estimator semantics.""" arr = X.toarray() if issparse(X) else np.asarray(X) if arr.ndim == 1: arr = arr.reshape(-1, 1) return arr.astype(np.float32, copy=False) def _empty_dense_block(self, n_rows: int) -> np.ndarray: return np.empty((int(n_rows), 0), dtype=np.float32) def _original_topk_budget_enabled(self) -> bool: """Whether original features should be pre-budgeted before downstream fit.""" return self.topK is not None and int(self.topK) >= 0 def _select_original_topk_fit( self, X_base: Any, y: Any, names: list[str] ) -> tuple[Any, list[str]]: """Select at most topK original downstream columns and persist the mask. The mask is applied before concatenating originals with pattern blocks. This makes original_plus_* non-strict mode symmetric with mined and augmented features: originals contribute at most topK columns when a finite topK budget is configured. Strict mode then performs the global topK pass over this already-budgeted candidate set. 
""" n_cols = len(names) self._original_feature_names_downstream_full_ = list(names) self._original_feature_scores_downstream_ = np.zeros(n_cols, dtype=np.float64) self._original_feature_mask_downstream_ = np.ones(n_cols, dtype=bool) self._original_selected_feature_names_downstream_ = list(names) if n_cols == 0 or not self._original_topk_budget_enabled(): return X_base, list(names) budget = min(max(1, int(self.topK)), n_cols) if budget >= n_cols: native_scores = getattr(self, "_native_original_feature_scores_downstream_", None) if native_scores is not None and len(native_scores) == n_cols: self._original_feature_scores_downstream_ = np.asarray( native_scores, dtype=np.float64 ) return X_base, list(names) native_names = list(getattr(self, "_native_original_feature_names_downstream_", []) or []) native_scores = getattr(self, "_native_original_feature_scores_downstream_", None) if ( native_names == list(names) and native_scores is not None and len(native_scores) == n_cols ): scores = np.asarray(native_scores, dtype=np.float64) order = np.lexsort((np.arange(n_cols), -scores)) keep_idx = np.sort(order[:budget]) mask = np.zeros(n_cols, dtype=bool) mask[keep_idx] = True else: # Non-fused or schema-mismatch path: still native, but necessarily # uses the dense downstream block because no preparation-stage score # metadata is available for this fit. 
scores, mask = self._strict_topk_dense_column_scores(X_base, y, names, top_k=budget) selected_names = [name for name, keep in zip(names, mask) if keep] self._original_feature_scores_downstream_ = scores self._original_feature_mask_downstream_ = mask self._original_selected_feature_names_downstream_ = list(selected_names) return X_base[:, mask], selected_names def _select_original_topk_transform( self, X_base: Any, names: list[str] ) -> tuple[Any, list[str]]: """Apply the persisted original-feature prefilter at transform time.""" mask = getattr(self, "_original_feature_mask_downstream_", None) selected_names = getattr(self, "_original_selected_feature_names_downstream_", None) if mask is None: return X_base, list(names) mask_arr = np.asarray(mask, dtype=bool) if mask_arr.size != len(names): return X_base, list(names) if bool(np.all(mask_arr)): return X_base, list(names if selected_names is None else selected_names) return X_base[:, mask_arr], list(names if selected_names is None else selected_names) def _selected_original_downstream_names(self) -> list[str]: names = getattr(self, "_original_selected_feature_names_downstream_", None) if names is not None: return list(names) return [f"orig:{name}" for name in getattr(self, "_original_feature_names_downstream_", [])] def _strict_topk_dense_column_scores( self, X: Any, y: Any, names: list[str], top_k: int = -1 ) -> tuple[np.ndarray, np.ndarray]: """Score dense downstream columns and return ``(scores, mask)``. Prefer the native dense TopK helper when the v1.1.9 extension exposes it, but fall back to the same Python IG scoring used by the sparse path when the Python package is run against a v1.1.8 native wheel. This preserves correctness for hybrid feature modes with ``topk_budget_strict=True``. 
""" X_arr = np.ascontiguousarray(self._as_dense_float32(X), dtype=np.float32) n_cols = int(X_arr.shape[1]) if n_cols == 0: return np.zeros(0, dtype=np.float64), np.zeros(0, dtype=bool) y_codes, _ = pd.factorize(np.asarray(y), sort=True) y_codes = np.ascontiguousarray(y_codes.astype(np.int64, copy=False)) discrete_mask_bool = self._strict_topk_discrete_mask(names).astype(bool, copy=False) max_bins = max( 8, int( getattr(self, "B", 8) if getattr(self, "B", 8) and getattr(self, "B", 8) > 0 else 16 ), ) if _CORE_AVAILABLE and hasattr(_core, "strict_topk_filter_dense"): discrete_mask = np.ascontiguousarray(discrete_mask_bool.astype(np.uint8, copy=False)) scores, mask = _core.strict_topk_filter_dense( X_arr, y_codes, discrete_mask, int(top_k), int(max_bins) ) return np.asarray(scores, dtype=np.float64), np.asarray(mask, dtype=bool) n_classes = int(np.max(y_codes)) + 1 if y_codes.size else 0 scores = np.zeros(n_cols, dtype=np.float64) if n_classes > 1: for j in range(n_cols): col = np.asarray(X_arr[:, j], dtype=np.float64) if bool(discrete_mask_bool[j]): vals = np.where(col > 0.5, 1, 0).astype(np.int64, copy=False) else: vals = _continuous_to_quantile_codes(col, max_bins=max_bins) scores[j] = _information_gain_from_codes(vals, y_codes, n_classes) budget = int(top_k) mask = np.ones(n_cols, dtype=bool) if budget >= 0 and budget < n_cols: budget = max(1, budget) order = np.lexsort((np.arange(n_cols), -scores)) keep_idx = np.sort(order[:budget]) mask = np.zeros(n_cols, dtype=bool) mask[keep_idx] = True return scores, mask def _combine_dense_blocks(self, blocks: list[Any], n_rows: int) -> np.ndarray: """Combine selected downstream blocks as one dense float32 matrix.""" dense_blocks = [] for block in blocks: if block is None or int(getattr(block, "shape", (n_rows, 0))[1]) == 0: continue dense_blocks.append(self._as_dense_float32(block)) if not dense_blocks: return self._empty_dense_block(n_rows) if len(dense_blocks) == 1: return dense_blocks[0] return 
np.hstack(dense_blocks).astype(np.float32, copy=False) def _dense_downstream_width_threshold(self) -> int: """Width threshold for materializing hybrid downstream matrices as dense. Hybrid feature modes contain dense original columns and sparse pattern columns. Dense output is fastest and smallest for small selected widths because LR sees a compact float32 ndarray. For large selected widths, densifying sparse pattern columns wastes memory; keep the output CSR. The threshold is user-configurable via the sklearn parameter ``dense_downstream_max_width``. Set it to 0 to keep hybrid matrices CSR except for the empty-width degenerate case. Older loaded models that lack the public parameter can still fall back to the private compatibility attribute or the historical default of 200. """ try: value = getattr( self, "dense_downstream_max_width", getattr(self, "_dense_downstream_max_width_", 200), ) if isinstance(value, bool): return 200 return max(0, int(value)) except Exception: return 200 def _should_use_dense_downstream(self, total_width: int) -> bool: mode = getattr(self, "feature_mode", "patterns_only") if mode == "patterns_only": return False threshold = self._dense_downstream_width_threshold() return int(total_width) <= int(threshold) def _combine_downstream_blocks(self, blocks: list[Any], n_rows: int): """Combine downstream blocks using a memory-aware output format. * patterns_only is handled earlier and remains CSR. * hybrid modes use dense float32 for small/moderate widths, avoiding sparse->dense churn before LR. * hybrid modes use CSR once width is large enough that densifying sparse pattern/augmented blocks would dominate memory. 
""" live_blocks = [ block for block in blocks if block is not None and int(getattr(block, "shape", (n_rows, 0))[1]) > 0 ] total_width = sum(int(block.shape[1]) for block in live_blocks) if total_width == 0: return ( self._empty_dense_block(n_rows) if self._should_use_dense_downstream(0) else csr_matrix((int(n_rows), 0), dtype=np.float32) ) if self._should_use_dense_downstream(total_width): return self._combine_dense_blocks(live_blocks, n_rows) sparse_blocks = [] for block in live_blocks: if issparse(block): sparse_blocks.append(block.astype(np.float32, copy=False).tocsr()) else: sparse_blocks.append(csr_matrix(np.asarray(block, dtype=np.float32))) if len(sparse_blocks) == 1: return sparse_blocks[0] return hstack(sparse_blocks, format="csr", dtype=np.float32) def _select_strict_topk_from_blocks_fit( self, blocks: list[tuple[str, Any, list[str]]], y: Any, ) -> tuple[list[Any], list[str]]: """Apply strict global TopK before concatenating downstream feature blocks. This keeps strict mode as a compute budget: dense original columns are scored directly, sparse pattern columns are scored in sparse form, and only selected columns are materialized for the downstream estimator. The persisted full mask and scores retain the same public/serialization contract as the prior post-concatenation implementation. 
""" full_names: list[str] = [] full_scores_parts: list[np.ndarray] = [] selected_blocks: list[Any] = [] for _, block, names in blocks: full_names.extend(list(names)) n_cols = len(full_names) self._downstream_feature_names_full_ = list(full_names) self._strict_topk_applied_during_construction_ = False self._strict_topk_feature_scores_ = np.zeros(n_cols, dtype=np.float64) self._strict_topk_feature_mask_ = np.ones(n_cols, dtype=bool) self._strict_topk_selected_feature_names_ = list(full_names) if ( not bool(getattr(self, "topk_budget_strict", False)) or self.topK is None or int(self.topK) < 0 or n_cols == 0 or int(self.topK) >= n_cols ): return [block for _, block, _ in blocks], list(full_names) for kind, block, names in blocks: if len(names) == 0: full_scores_parts.append(np.zeros(0, dtype=np.float64)) elif kind == "dense": full_scores_parts.append( self._strict_topk_dense_column_scores(block, y, names, top_k=-1)[0] ) else: full_scores_parts.append(self._strict_topk_column_scores(block, y, names)) scores = ( np.concatenate(full_scores_parts).astype(np.float64, copy=False) if full_scores_parts else np.zeros(0, dtype=np.float64) ) budget = min(max(1, int(self.topK)), n_cols) order = np.lexsort((np.arange(n_cols), -scores)) keep_idx = np.sort(order[:budget]) mask = np.zeros(n_cols, dtype=bool) mask[keep_idx] = True offset = 0 selected_names: list[str] = [] for kind, block, names in blocks: width = len(names) block_mask = mask[offset : offset + width] if width and bool(np.any(block_mask)): selected_names.extend([name for name, keep in zip(names, block_mask) if keep]) selected_blocks.append(block[:, block_mask]) offset += width self._strict_topk_feature_scores_ = scores self._strict_topk_feature_mask_ = mask self._strict_topk_selected_feature_names_ = list(selected_names) self._strict_topk_applied_during_construction_ = True return selected_blocks, selected_names def _select_strict_topk_from_blocks_transform( self, blocks: list[tuple[str, Any, list[str]]] ) -> list[Any]: 
"""Select persisted strict TopK columns from logical blocks before concat.""" mask = getattr(self, "_strict_topk_feature_mask_", None) if mask is None: return [block for _, block, _ in blocks] mask_arr = np.asarray(mask, dtype=bool) full_width = sum(len(names) for _, _, names in blocks) if mask_arr.size != full_width or bool(np.all(mask_arr)): return [block for _, block, _ in blocks] selected_blocks: list[Any] = [] offset = 0 for _, block, names in blocks: width = len(names) block_mask = mask_arr[offset : offset + width] if width and bool(np.any(block_mask)): selected_blocks.append(block[:, block_mask]) offset += width return selected_blocks def _make_downstream_features(self, X_original: Any, Z_patterns: csr_matrix, fit: bool = False): """Build the estimator input matrix for the configured feature_mode.""" mode = getattr(self, "feature_mode", "patterns_only") Z = Z_patterns if issparse(Z_patterns) else csr_matrix(Z_patterns) Z_aug = self._make_augmented_pair_features(X_original, fit=fit) n_rows = int(Z.shape[0]) if mode == "patterns_only": return hstack([Z, Z_aug], format="csr") if Z_aug.shape[1] else Z if not fit: selected_original_names = getattr( self, "_original_selected_feature_names_downstream_", None ) full_original_names = [ f"orig:{name}" for name in getattr(self, "_original_feature_names_downstream_", []) ] mask = getattr(self, "_original_feature_mask_downstream_", None) use_selected_originals = ( selected_original_names is not None and mask is not None and len(selected_original_names) <= len(full_original_names) ) if use_selected_originals: X_base, original_names = ( self._prepare_selected_original_features_for_downstream_transform( X_original, list(selected_original_names) ) ) else: X_base = self._prepare_original_features_for_downstream(X_original, fit=False) X_base, original_names = self._select_original_topk_transform( X_base, full_original_names ) else: X_base = self._prepare_original_features_for_downstream(X_original, fit=True) 
original_names_full = [ f"orig:{name}" for name in getattr(self, "_original_feature_names_downstream_", []) ] X_base, original_names = self._select_original_topk_fit( X_base, self._current_y_for_downstream_topk_, original_names_full ) pattern_names = [f"pattern:{name}" for name in self.get_hug_features()] aug_names = [ f"augmented_pair:{t['name']}" for t in getattr(self, "augmented_pair_transforms_", []) ] if mode == "original_plus_patterns": blocks = [("dense", X_base, original_names), ("sparse", Z, pattern_names)] if Z_aug.shape[1]: blocks.append(("sparse", Z_aug, aug_names)) if fit and bool(getattr(self, "topk_budget_strict", False)): selected_blocks, _ = self._select_strict_topk_from_blocks_fit( blocks, self._current_y_for_downstream_topk_ ) return self._combine_downstream_blocks(selected_blocks, n_rows) if bool(getattr(self, "topk_budget_strict", False)): selected_blocks = self._select_strict_topk_from_blocks_transform(blocks) return self._combine_downstream_blocks(selected_blocks, n_rows) return self._combine_downstream_blocks([X_base, Z, Z_aug], n_rows) if mode == "original_plus_interactions": mask = getattr(self, "_interaction_pattern_mask_", None) if mask is None: self._setup_feature_mode_metadata() mask = self._interaction_pattern_mask_ Z_sel = Z[:, mask] selected_pattern_names = [name for name, keep in zip(pattern_names, mask) if keep] blocks = [("dense", X_base, original_names), ("sparse", Z_sel, selected_pattern_names)] if Z_aug.shape[1]: blocks.append(("sparse", Z_aug, aug_names)) if fit and bool(getattr(self, "topk_budget_strict", False)): selected_blocks, _ = self._select_strict_topk_from_blocks_fit( blocks, self._current_y_for_downstream_topk_ ) return self._combine_downstream_blocks(selected_blocks, n_rows) if bool(getattr(self, "topk_budget_strict", False)): selected_blocks = self._select_strict_topk_from_blocks_transform(blocks) return self._combine_downstream_blocks(selected_blocks, n_rows) return self._combine_downstream_blocks([X_base, Z_sel, 
Z_aug], n_rows) raise HUGIMLParamError(f"Unknown feature_mode={mode!r}.") def _get_downstream_feature_names_full(self) -> list[str]: """Names for the unfiltered downstream feature matrix.""" mode = getattr(self, "feature_mode", "patterns_only") pattern_names = list(self.get_hug_features()) aug_names = [ f"augmented_pair:{t['name']}" for t in getattr(self, "augmented_pair_transforms_", []) ] if mode == "patterns_only": return [f"pattern:{name}" for name in pattern_names] + aug_names original_names = self._selected_original_downstream_names() if mode == "original_plus_patterns": return original_names + [f"pattern:{name}" for name in pattern_names] + aug_names if mode == "original_plus_interactions": mask = getattr( self, "_interaction_pattern_mask_", np.ones(len(pattern_names), dtype=bool) ) selected = [name for name, keep in zip(pattern_names, mask) if keep] return original_names + [f"pattern:{name}" for name in selected] + aug_names return [f"pattern:{name}" for name in pattern_names] def _get_downstream_feature_names(self) -> list[str]: """Names aligned with coefficients of the downstream estimator.""" names = self._get_downstream_feature_names_full() mask = getattr(self, "_strict_topk_feature_mask_", None) if mask is None: return names return [name for name, keep in zip(names, np.asarray(mask, dtype=bool)) if keep] def _is_discrete_downstream_feature(self, name: str) -> bool: return name.startswith("pattern:") or ( name.startswith("orig:") and name in getattr(self, "_strict_topk_dummy_names_", set()) ) def _strict_topk_discrete_mask(self, names: list[str]) -> np.ndarray: """Boolean mask of downstream columns that should be IG-scored as discrete.""" dummy_names = {f"orig:{c}" for c in getattr(self, "_original_dummy_columns_", [])} self._strict_topk_dummy_names_ = dummy_names return np.asarray( [name.startswith("pattern:") or name in dummy_names for name in names], dtype=np.uint8, ) def _strict_topk_column_scores(self, X: csr_matrix, y: Any, names: list[str]) -> 
np.ndarray: """Compute comparable IG scores for strict global topK filtering. The native path scores CSC columns directly. The Python fallback is kept only for source-tree development before the extension has been rebuilt. """ y_codes, _ = pd.factorize(np.asarray(y), sort=True) n_classes = int(np.max(y_codes)) + 1 if y_codes.size else 0 if n_classes <= 1: return np.zeros(X.shape[1], dtype=np.float64) X_csc = X.tocsc() if issparse(X) else csr_matrix(X).tocsc() discrete_mask = self._strict_topk_discrete_mask(names) max_bins = max( 8, int( getattr(self, "B", 8) if getattr(self, "B", 8) and getattr(self, "B", 8) > 0 else 16 ), ) if _CORE_AVAILABLE and hasattr(_core, "strict_topk_filter_csc"): scores, _ = _core.strict_topk_filter_csc( np.asarray(X_csc.data, dtype=np.float32), np.asarray(X_csc.indices, dtype=np.int32), np.asarray(X_csc.indptr, dtype=np.int32), int(X_csc.shape[0]), int(X_csc.shape[1]), np.asarray(y_codes, dtype=np.int64), discrete_mask, -1, int(max_bins), ) return np.asarray(scores, dtype=np.float64) scores = np.zeros(X.shape[1], dtype=np.float64) for j in range(X.shape[1]): col = np.asarray(X_csc[:, j].toarray()).ravel() if bool(discrete_mask[j]): vals = np.where(col > 0.5, 1, 0).astype(np.int64, copy=False) else: vals = _continuous_to_quantile_codes(col, max_bins=max_bins) scores[j] = _information_gain_from_codes(vals, y_codes, n_classes) return scores def _apply_strict_topk_budget_fit(self, X: csr_matrix, y: Any) -> csr_matrix: """Optionally apply a single native global IG topK budget over all downstream features.""" n_cols = int(X.shape[1]) names = self._get_downstream_feature_names_full() existing_mask = getattr(self, "_strict_topk_feature_mask_", None) if bool(getattr(self, "_strict_topk_applied_during_construction_", False)): return X if ( bool(getattr(self, "topk_budget_strict", False)) and existing_mask is not None and len(existing_mask) != n_cols ): # Strict TopK was already applied during block-wise downstream # construction. 
Preserve the full-length persisted mask/scores and # return the selected estimator matrix unchanged. return X self._downstream_feature_names_full_ = list(names) self._strict_topk_feature_scores_ = np.zeros(n_cols, dtype=np.float64) self._strict_topk_feature_mask_ = np.ones(n_cols, dtype=bool) self._strict_topk_selected_feature_names_ = list(names) if ( not bool(getattr(self, "topk_budget_strict", False)) or self.topK is None or int(self.topK) < 0 or n_cols == 0 or int(self.topK) >= n_cols ): return X budget = min(max(1, int(self.topK)), n_cols) y_codes, _ = pd.factorize(np.asarray(y), sort=True) X_csc = X.tocsc() if issparse(X) else csr_matrix(X).tocsc() discrete_mask = self._strict_topk_discrete_mask(names) max_bins = max( 8, int( getattr(self, "B", 8) if getattr(self, "B", 8) and getattr(self, "B", 8) > 0 else 16 ), ) if _CORE_AVAILABLE and hasattr(_core, "strict_topk_filter_csc"): scores, mask_native = _core.strict_topk_filter_csc( np.asarray(X_csc.data, dtype=np.float32), np.asarray(X_csc.indices, dtype=np.int32), np.asarray(X_csc.indptr, dtype=np.int32), int(X_csc.shape[0]), int(X_csc.shape[1]), np.asarray(y_codes, dtype=np.int64), discrete_mask, int(budget), int(max_bins), ) scores = np.asarray(scores, dtype=np.float64) mask = np.asarray(mask_native, dtype=bool) else: scores = self._strict_topk_column_scores(X, y, names) order = np.lexsort((np.arange(n_cols), -scores)) keep_idx = np.sort(order[:budget]) mask = np.zeros(n_cols, dtype=bool) mask[keep_idx] = True self._strict_topk_feature_scores_ = scores self._strict_topk_feature_mask_ = mask self._strict_topk_selected_feature_names_ = [ name for name, keep in zip(names, mask) if keep ] return X[:, mask] def _apply_strict_topk_budget_transform(self, X: csr_matrix) -> csr_matrix: if bool(getattr(self, "_strict_topk_applied_during_construction_", False)): return X mask = getattr(self, "_strict_topk_feature_mask_", None) if mask is None: return X mask_arr = np.asarray(mask, dtype=bool) if mask_arr.size != 
X.shape[1] or bool(np.all(mask_arr)): return X return X[:, mask_arr] def _numeric_feature_names_for_augmented_pairs(self) -> list[str]: names = list(getattr(self, "feature_names_in_", []) or []) cat_mask = getattr(self, "cat_cols_mask_", np.zeros(len(names), dtype=bool)) return [name for name, is_cat in zip(names, cat_mask) if not bool(is_cat)] def _setup_augmented_pair_transforms( self, X_original: Any, y: Any | None = None, fit: bool = False ) -> None: """Create internal augmented_pair_transforms when L>1. This reuses native adaptive-binning IG metadata and does not expose a public hyperparameter. Augmented pair transforms are not fed into HUGIML mining; they are appended only before the downstream estimator. """ enabled = bool(self.augmented_pair_transforms) if not fit or self.L <= 1 or not enabled or X_original is None: self._augmented_pair_block_ = None self.augmented_pair_transforms_ = [] self.augmented_pair_selected_features_ = [] self.augmented_pair_transforms_enabled_ = False return if not getattr(self, "adaptive_binning", False): warnings.warn( "augmented_pair_transforms require adaptive_binning=True because they are selected from adaptive-binning IG metadata; no augmented pair features will be added.", HUGIMLWarning, stacklevel=2, ) self._augmented_pair_block_ = None self.augmented_pair_transforms_ = [] self.augmented_pair_selected_features_ = [] self.augmented_pair_transforms_enabled_ = False self.augmented_pair_config_ = { "enabled": False, "reason": "adaptive_binning_required", "max_features": int(self.augmented_pair_max_features), "budget": int(self.topK) if self.topK is not None and self.topK >= 0 else None, "num_candidates": 0, "num_retained": 0, } return if not getattr(self, "ig_scores_", None): warnings.warn( "augmented_pair_transforms were requested but no adaptive-binning IG scores are available; no augmented pair features will be added.", HUGIMLWarning, stacklevel=2, ) self._augmented_pair_block_ = None self.augmented_pair_transforms_ = [] 
self.augmented_pair_selected_features_ = [] self.augmented_pair_transforms_enabled_ = False self.augmented_pair_config_ = { "enabled": False, "reason": "missing_ig_scores", "max_features": int(self.augmented_pair_max_features), "budget": int(self.topK) if self.topK is not None and self.topK >= 0 else None, "num_candidates": 0, "num_retained": 0, } return pair_budget = None if bool(getattr(self, "topk_budget_strict", False)) else self.topK block = NativeAugmentedPairTransformBlock( max_features=self.augmented_pair_max_features, budget_topK=pair_budget, min_source_ig=self.G, ) block.fit( X_original, y, getattr(self, "ig_scores_", {}) or {}, getattr(self, "_bin_edges_", {}) or {}, self._numeric_feature_names_for_augmented_pairs(), budget_topK=pair_budget, min_source_ig=self.G, full_feature_names=list(getattr(self, "feature_names_in_", []) or []), ) self._augmented_pair_block_ = block self.augmented_pair_transforms_ = list(block.augmented_pair_transforms_) self.augmented_pair_selected_features_ = list(block.selected_ig_features_) self.augmented_pair_transforms_enabled_ = bool(self.augmented_pair_transforms_) self.augmented_pair_config_ = { "enabled": self.augmented_pair_transforms_enabled_, "max_features": int(self.augmented_pair_max_features), "budget": int(self.topK) if self.topK is not None and self.topK >= 0 else None, "budget_source": "global_strict_topK" if bool(getattr(self, "topk_budget_strict", False)) else "topK", "ops": ["product", "absolute_difference", "sum", "signed_difference"], "score": "adaptive_binned_ig", "min_source_ig": float( getattr(block, "min_source_ig_", max(1e-12, float(self.G or 0.0))) ), "num_candidates": int(getattr(block, "candidate_count_", 0)), "num_retained": len(self.augmented_pair_transforms_), } def _make_augmented_pair_features(self, X_original: Any, fit: bool = False): if self.L <= 1 or not bool(self.augmented_pair_transforms) or X_original is None: n_rows = 0 if X_original is None else len(X_original) return csr_matrix((n_rows, 
0), dtype=np.float32) block = getattr(self, "_augmented_pair_block_", None) if block is None: return csr_matrix((len(X_original), 0), dtype=np.float32) return block.transform(X_original)
def get_augmented_pair_transforms(self) -> list[dict[str, Any]]:
    """Return augmented pair transforms used by the downstream estimator.

    Each catalog entry includes the raw pair formula, source-feature IG
    provenance, candidate coverage, unavailable-pair policy, and the
    standardization parameters used before the downstream estimator sees
    the feature. Candidate IG is scored on rows where both source values
    are observed. For selected features, rows where the pair value cannot
    be computed receive the pair feature's training reference value before
    standardization, yielding a neutral standardized value.

    Every entry is returned as a shallow copy, so mutating the result does
    not modify the stored catalog. An empty list is returned when no
    catalog has been stored.
    """
    catalog = getattr(self, "augmented_pair_transforms_", [])
    return list(map(dict, catalog))
def get_augmented_pair_standardization(self) -> pd.DataFrame:
    """Return standardization metadata for augmented pair features.

    The returned rows are aligned to ``get_augmented_pair_transforms()``
    and make the raw-to-estimator transformation explicit.  The column set
    is fixed, so an empty catalog yields an empty frame that still carries
    every column instead of a frame without columns.

    ``raw_formula`` falls back to the entry's ``formula`` key when no
    ``raw_formula`` key is present; every other column is ``None`` for
    entries that lack the corresponding key.
    """
    columns = [
        "name",
        "operation",
        "inputs",
        "raw_formula",
        "standardization_mean",
        "standardization_scale",
        "standardized_formula",
        "reference_raw_value",
        "pair_missing_policy",
        "eligible_count",
        "eligible_rate",
        "missing_pair_rate",
        "source_observed_medians",
        "transform_ig",
    ]
    rows: list[dict[str, Any]] = []
    for item in self.get_augmented_pair_transforms():
        row = {key: item.get(key) for key in columns}
        row["raw_formula"] = item.get("raw_formula", item.get("formula"))
        rows.append(row)
    return pd.DataFrame(rows, columns=columns)
@staticmethod
def _format_source_observed_medians(source_observed_medians: Any) -> str:
    """Render per-source medians as comma-separated ``key=value`` pairs.

    Values that convert to ``float`` are shown with six significant digits;
    anything else is shown as-is.  A missing, empty, or non-dict argument
    yields the text ``"not available"``.
    """
    if not (isinstance(source_observed_medians, dict) and source_observed_medians):
        return "not available"

    def _render(key: Any, value: Any) -> str:
        try:
            return f"{key}={float(value):.6g}"
        except (TypeError, ValueError):
            return f"{key}={value}"

    return ", ".join(_render(key, value) for key, value in source_observed_medians.items())


@staticmethod
def _augmented_pair_effect_text(
    *,
    raw_formula: str,
    operation: str,
    coefficient_raw_scale: float,
    standardization_mean: float,
    standardization_scale: float,
    source_observed_medians: Any,
    pair_missing_policy: str,
    eligible_rate: float,
    missing_pair_rate: float,
) -> dict[str, Any]:
    """Build the narrative fields describing one augmented-pair effect.

    Parameters
    ----------
    raw_formula : str
        Raw pair formula as shown to the reader.
    operation : str
        Pair operation name; ``product``, ``absolute_difference``, ``sum``
        and ``signed_difference`` get dedicated wording, anything else gets
        generic raw-unit wording.
    coefficient_raw_scale : float
        Raw-scale coefficient; a non-finite value selects the
        "effect not available" variant of the text.
    standardization_mean, standardization_scale : float
        Standardization parameters quoted in the text.
    source_observed_medians : Any
        Per-source medians, rendered for diagnostics only.
    pair_missing_policy : str
        Accepted for interface stability; not read when composing the text.
    eligible_rate, missing_pair_rate : float
        Coverage rates; quoted only when finite.

    Returns
    -------
    dict
        Keys ``decision_direction``, ``risk_increases_when``,
        ``unit_effect_interpretation``, three ``*_description`` entries,
        ``raw_scale_note`` and ``raw_interpretation``.
    """
    if np.isfinite(eligible_rate):
        eligible_text = (
            "Candidate scoring used rows where both source values were observed"
            f" (eligible_rate={eligible_rate:.3g})."
        )
    else:
        eligible_text = "Candidate scoring used rows where both source values were observed."

    if np.isfinite(missing_pair_rate):
        missing_text = f"training rows where the pair was unavailable: {missing_pair_rate:.3g}."
    else:
        missing_text = "the unavailable-pair rate is not available."

    reference_note = (
        f"The reference raw value {standardization_mean:.6g} is the training-cohort mean "
        f"of the observed {raw_formula} pair term after applying the selected pair operation. "
        "It is not a domain-specific baseline."
    )
    source_median_text = HUGIMLClassifierNative._format_source_observed_medians(
        source_observed_medians
    )
    missing_policy_note = (
        "If a selected pair cannot be computed for a row because one or both source values are missing, "
        "the augmented-pair feature is set to its training reference raw value before standardization. "
        "That gives the pair term a neutral standardized value of 0 for that row. "
        "This policy applies only to continuous augmented-pair features; HUGIML pattern features keep "
        "their native missing-value handling. "
        f"For diagnostics, source feature medians observed in training were: {source_median_text}."
    )
    estimator_sentence = (
        f"The downstream estimator uses ({raw_formula} - {standardization_mean:.6g}) "
        f"/ {standardization_scale:.6g}."
    )
    # Descriptions that are identical in both variants of the result.
    shared_descriptions = {
        "reference_raw_value_description": "training_cohort_mean_of_observed_raw_pair_value",
        "source_observed_medians_description": "per-source-feature observed medians for diagnostics only; not used to construct pair values",
        "pair_missing_policy_description": "unavailable pair values are set to the pair reference raw value before standardization",
    }

    if not np.isfinite(coefficient_raw_scale):
        return {
            "decision_direction": "effect_not_available",
            "risk_increases_when": "not_available",
            "unit_effect_interpretation": "Raw-scale log-odds effect is not available for this downstream estimator.",
            **shared_descriptions,
            "raw_scale_note": f"{reference_note} {eligible_text}",
            "raw_interpretation": (
                f"{estimator_sentence} {reference_note} {eligible_text} "
                f"For selected-feature construction, {missing_policy_note}"
            ),
        }

    # Sign of the coefficient drives both the direction label and the
    # suffix of the "risk increases when" label.
    if coefficient_raw_scale > 0:
        direction = "higher_raw_value_increases_score"
        direction_text = f"Higher {raw_formula} increases the model score."
        trend = "increases"
    elif coefficient_raw_scale < 0:
        direction = "higher_raw_value_decreases_score"
        direction_text = f"Higher {raw_formula} decreases the model score."
        trend = "decreases"
    else:
        direction = "raw_value_has_zero_linear_effect"
        direction_text = f"Higher {raw_formula} does not change the linear model score."
        trend = None

    coef_text = f"{coefficient_raw_scale:.6g}"
    # operation -> (risk label stem, unit-effect sentence, raw-scale note prefix)
    wording = {
        "absolute_difference": (
            "absolute_difference",
            f"Each +1 increase in the absolute difference term changes the log-odds by {coef_text}.",
            "The raw-unit effect is expressed on the absolute-difference scale. ",
        ),
        "product": (
            "product_value",
            f"A +1 change in the product term changes the log-odds by {coef_text}. "
            "For a product feature, changing one source variable does not have a fixed "
            "marginal effect; it depends on the current value of the other source variable.",
            "The raw-unit effect is expressed on the product-term scale, not as a fixed "
            "one-unit effect of either individual source feature. ",
        ),
        "sum": (
            "sum_value",
            f"Each +1 increase in the sum term changes the log-odds by {coef_text}. "
            "The same coefficient applies to a one-unit increase in either source feature "
            "while the other source feature is kept constant.",
            "The raw-unit effect is expressed on the pair sum scale. ",
        ),
        "signed_difference": (
            "left_minus_right",
            f"Each +1 increase in the signed difference term changes the log-odds by {coef_text}. "
            "Increasing the left source feature raises this term, while increasing the right "
            "source feature lowers it.",
            "The raw-unit effect is expressed on the signed left-minus-right difference scale. ",
        ),
    }
    generic = (
        "raw_value",
        f"Each +1 raw-unit increase changes the log-odds by {coef_text}.",
        "",
    )
    stem, unit_text, note_prefix = wording.get(operation, generic)
    risk_when = "not_applicable" if trend is None else f"{stem}_{trend}"
    raw_scale_note = note_prefix + reference_note

    return {
        "decision_direction": direction,
        "risk_increases_when": risk_when,
        "unit_effect_interpretation": unit_text,
        **shared_descriptions,
        "raw_scale_note": f"{raw_scale_note} {eligible_text}",
        "raw_interpretation": (
            f"{direction_text} {unit_text} {estimator_sentence} "
            f"{reference_note} {eligible_text} For selected-feature construction, {missing_policy_note} "
            f"Among training rows, {missing_text}"
        ),
    }


def _augmented_pair_effect_rows(self) -> list[dict[str, Any]]:
    """Return augmented-pair effect rows in raw and standardized units.

    Standardized coefficients are looked up from ``feature_importances()``
    under the name ``augmented_pair:<name>``; when that call raises
    ``AttributeError`` every coefficient is reported as NaN.  The raw-scale
    coefficient is the standardized one divided by the standardization
    scale, or NaN when either is non-finite or the scale is zero.
    """
    check_is_fitted(self)
    try:
        importances = self.feature_importances()
        coef_lookup = dict(zip(importances["feature"], importances["coefficient"]))
    except AttributeError:
        coef_lookup = {}

    def _row(entry: dict[str, Any]) -> dict[str, Any]:
        name = str(entry.get("name"))
        feature = f"augmented_pair:{name}"
        coef_std = float(coef_lookup.get(feature, np.nan))
        mean = float(entry.get("standardization_mean", np.nan))
        scale = float(entry.get("standardization_scale", np.nan))
        convertible = np.isfinite(coef_std) and np.isfinite(scale) and scale != 0.0
        coef_raw = coef_std / scale if convertible else np.nan
        operation = str(entry.get("operation", ""))
        raw_formula = str(entry.get("raw_formula", entry.get("formula", name)))
        text = self._augmented_pair_effect_text(
            raw_formula=raw_formula,
            operation=operation,
            coefficient_raw_scale=coef_raw,
            standardization_mean=mean,
            standardization_scale=scale,
            source_observed_medians=entry.get("source_observed_medians"),
            pair_missing_policy=str(
                entry.get("pair_missing_policy", "reference_value_for_unavailable_pair")
            ),
            eligible_rate=float(entry.get("eligible_rate", np.nan)),
            missing_pair_rate=float(entry.get("missing_pair_rate", np.nan)),
        )
        return {
            "feature": feature,
            "name": name,
            "operation": operation,
            "inputs": entry.get("inputs"),
            "raw_formula": raw_formula,
            "standardized_formula": entry.get("standardized_formula"),
            "standardization_mean": mean,
            "standardization_scale": scale,
            "reference_raw_value": mean,
            "reference_raw_value_description": text["reference_raw_value_description"],
            "coefficient_standardized": coef_std,
            "one_std_effect_on_log_odds": coef_std,
            "coefficient_raw_scale": coef_raw,
            "one_raw_unit_effect_on_log_odds": coef_raw,
            "decision_direction": text["decision_direction"],
            "risk_increases_when": text["risk_increases_when"],
            "unit_effect_interpretation": text["unit_effect_interpretation"],
            "raw_scale_note": text["raw_scale_note"],
            "raw_interpretation": text["raw_interpretation"],
            "pair_missing_policy": entry.get("pair_missing_policy"),
            "pair_missing_policy_description": text["pair_missing_policy_description"],
            "eligible_count": entry.get("eligible_count"),
            "eligible_rate": entry.get("eligible_rate"),
            "missing_pair_rate": entry.get("missing_pair_rate"),
            "source_observed_medians": entry.get("source_observed_medians"),
            "source_observed_medians_description": text[
                "source_observed_medians_description"
            ],
            "transform_ig": entry.get("transform_ig"),
        }

    return [_row(entry) for entry in self.get_augmented_pair_transforms()]
def explain_augmented_pair_effects(self) -> pd.DataFrame:
    """Tabulate augmented-pair effects in standardized and raw units.

    The downstream estimator is fit on standardized augmented-pair values.
    Each standardized coefficient is converted back to the raw pair scale,
    and the accompanying text states that the reference value is the
    training-cohort mean of the observed pair term rather than a
    domain-specific baseline.

    Candidate scoring uses rows where both source values are observed.  For
    selected features, rows where the pair cannot be computed receive the
    pair feature's training reference raw value before standardization,
    giving a neutral standardized value for that pair term.  HUGIML pattern
    features keep their native missing-value handling.

    For logistic-regression downstream models the coefficient columns are
    log-odds effects.  Product-term effects are expressed on the product
    scale: changing one input has no fixed marginal effect because it
    depends on the current value of the other input.

    Returns
    -------
    pd.DataFrame
        One row per augmented pair feature.
    """
    effect_rows = self._augmented_pair_effect_rows()
    return pd.DataFrame(effect_rows)
def transform(self, X: Any) -> csr_matrix:
    """Return the binary HUG pattern matrix for X.

    Column j corresponds to mined pattern j; entry (i, j) is 1 when every
    item of pattern j appears in row i.

    Parameters
    ----------
    X : array-like or DataFrame

    Returns
    -------
    csr_matrix, shape (n_samples, n_patterns)
    """
    check_is_fitted(self)
    # v1.1.0 adaptive pre-binning: only applied when the option is on and
    # fitted bin edges are present.
    needs_prebinning = getattr(self, "adaptive_binning", False) and getattr(
        self, "_bin_edges_", None
    )
    if needs_prebinning:
        X = self._prebin_for_predict(X)
    return self._build_test_hup(X)
def _build_test_hup(self, X_test: Any) -> csr_matrix:
    """Build the sparse binary pattern matrix for test data.

    This follows the original v1.1.x single-pass path: health check,
    non-finite handling, schema validation, conversion to a numeric array,
    then native matrix assembly with a pure-Python fallback.

    Parameters
    ----------
    X_test : array-like or DataFrame
        Test rows to encode.

    Returns
    -------
    csr_matrix of float32, shape (n_rows, n_patterns)
        Entries are 1.0 where a pattern matches a row.
    """
    self._check_health()
    # In original_plus_patterns mode, a fitted model may legitimately have
    # zero mined patterns. Return an empty pattern matrix and let
    # _make_downstream_features use the original feature block.
    if (
        len(getattr(self, "patterns_", [])) == 0
        and getattr(self, "feature_mode", "patterns_only") != "patterns_only"
    ):
        return csr_matrix((len(X_test), 0), dtype=np.float32)
    # ── v1.1.0 non-finite handling ────────────────────────────────────
    # Without adaptive binning the test data goes through _handle_test_nan,
    # which also supplies the categorical mask; otherwise the fitted mask
    # is used unchanged.
    if not getattr(self, "adaptive_binning", False):
        X_test, _cat_mask = self._handle_test_nan(X_test)
    else:
        _cat_mask = getattr(self, "cat_cols_mask_", None)
    # ─────────────────────────────────────────────────────────────────
    self._validate_test_input(X_test)
    X_num, X_cat_raw = self._to_float_array(X_test, _cat_mask)
    # Try the ``ensure_all_finite`` keyword first and retry with
    # ``force_all_finite`` when it is rejected with TypeError (presumably a
    # scikit-learn version difference — confirm against supported versions).
    try:
        X_num = check_array(X_num, dtype=None, ensure_all_finite=False)
    except TypeError:
        X_num = check_array(X_num, dtype=None, force_all_finite=False)
    n = X_num.shape[0]
    # Pass None to the native code when no column carries raw categorical data.
    X_cat_arg = X_cat_raw if any(v is not None for v in X_cat_raw) else None
    # Single-pass path. Prefer native CSR output to avoid copying COO
    # row/column arrays into Python and then asking scipy to sort/compress
    # them again. build_test_matrix remains a compatibility fallback.
    n_pats = len(self.patterns_)
    if getattr(self, "_native_available_", True):
        try:
            if hasattr(_core, "build_test_matrix_csr"):
                indptr, indices = _core.build_test_matrix_csr(
                    X_num,
                    self.td_,
                    X_cat_arg,
                    self.patterns_,
                )
                data = np.ones(len(indices), dtype=np.float32)
                return csr_matrix((data, indices, indptr), shape=(n, n_pats), dtype=np.float32)
            rows, cols = _core.build_test_matrix(
                X_num,
                self.td_,
                X_cat_arg,
                self.patterns_,
            )
            data = np.ones(len(rows), dtype=np.float32)
            return csr_matrix((data, (rows, cols)), shape=(n, n_pats), dtype=np.float32)
        except Exception:
            # Deliberately broad: any native failure is logged at debug
            # level and the Python implementation below takes over.
            logger.debug(
                "Native build_test_matrix failed; falling back to Python path.",
                exc_info=True,
            )
    return self._build_test_hup_fallback(X_num, X_cat_raw, n, n_pats)


def _build_test_hup_fallback(
    self,
    X_num: np.ndarray,
    X_cat_raw: list,
    n: int,
    n_pats: int,
) -> csr_matrix:
    """Pure-Python fallback for deserialized models without C++ extension.

    Each row is turned into a set of item ids using the binning metadata
    stored on ``self.td_``; a pattern matches a row when all of its items
    are in that set.

    Parameters
    ----------
    X_num : np.ndarray
        Numeric test array, indexed as ``X_num[row, column]``.
    X_cat_raw : list
        Per-column raw categorical values, or ``None`` for a column
        without categorical data.
    n : int
        Number of rows to encode.
    n_pats : int
        Number of columns of the returned matrix.

    Returns
    -------
    csr_matrix of float32, shape (n, n_pats)
    """
    td = self.td_
    p = X_num.shape[1]
    cpp_bn2id = td._cpp_bn2id
    cpp_stride = td._cpp_bkey_stride
    cpp_all_edges = td._cpp_all_edges
    cpp_nb_col = td._cpp_nb_col
    cpp_col_min = td._cpp_col_min
    cpp_col_range = td._cpp_col_range
    cpp_is_cat = td._cpp_is_cat
    cpp_is_int = td._cpp_is_int
    cpp_is_precoded = getattr(td, "_cpp_is_precoded", [])
    # If the wrapper was deserialized without _cpp_is_precoded, reconstruct it
    # from the classifier's compact adaptive precoded feature set.
    if not cpp_is_precoded:
        precoded_features = getattr(self, "_adaptive_precoded_features_", None)
        if precoded_features is None and getattr(self, "_adaptive_code_label_map_", {}):
            precoded_features = set(getattr(self, "_bin_edges_", {}))
        if precoded_features:
            feat_names = getattr(self, "feature_names_in_", None) or []
            cpp_is_precoded = [name in precoded_features for name in feat_names]
    cpp_cat_cats = td._cpp_cat_categories
    # Per categorical column: category label -> 0-based code (position in
    # the stored category list).
    label2code: list[dict[object, int] | None] = [None] * p
    for j in range(p):
        if j < len(cpp_is_cat) and cpp_is_cat[j]:
            if j < len(cpp_cat_cats) and cpp_cat_cats[j]:
                label2code[j] = {v: i for i, v in enumerate(cpp_cat_cats[j])}

    def bkey(bi: int, j: int) -> int:
        # Combine a 1-based bin index and a column index into one lookup key.
        return int(bi * cpp_stride + j)

    test_trans_sets = []
    for r in range(n):
        items: set = set()
        for j in range(p):
            if j < len(cpp_is_cat) and cpp_is_cat[j]:
                # Categorical column: missing or unseen values yield no item.
                if X_cat_raw[j] is None:
                    continue
                v = X_cat_raw[j][r]
                if v is None or (isinstance(v, float) and math.isnan(v)):
                    continue
                _lc = label2code[j]
                if _lc is None:
                    continue
                lc: dict[object, int] = _lc
                code = lc.get(v)
                if code is None:
                    continue
                bi = code + 1
            elif j < len(cpp_is_precoded) and cpp_is_precoded[j]:
                # Pre-coded column: X_num[r, j] IS the 0-indexed bin code.
                # bi = code + 1 directly — no scaling, no upper_bound.
                code_val = X_num[r, j]
                if not math.isfinite(code_val):
                    continue
                nb = cpp_nb_col[j] if j < len(cpp_nb_col) else 0
                bi = max(1, min(int(code_val) + 1, nb))
            else:
                edges = cpp_all_edges[j]
                if edges is None or len(edges) < 2:
                    continue
                nb = cpp_nb_col[j]
                raw = X_num[r, j]
                # Non-finite values must generate no item — same contract
                # as every other column type and the C++ build_test_matrix.
                if not math.isfinite(raw):
                    continue
                # Integer columns are compared against the edges as-is;
                # other columns are first shifted and scaled by the stored
                # column minimum and range.
                if j < len(cpp_is_int) and cpp_is_int[j]:
                    val = raw
                else:
                    val = (raw - cpp_col_min[j]) / cpp_col_range[j]
                # Only interior edges separate bins; the result is clamped
                # to the valid 1..nb range.
                inner = edges[1:-1] if isinstance(edges, np.ndarray) else np.array(edges[1:-1])
                bi = int(np.searchsorted(inner, val, side="right")) + 1
                bi = max(1, min(bi, nb))
            bk = bkey(bi, j)
            iid = cpp_bn2id.get(bk)
            if iid is not None:
                items.add(iid)
        test_trans_sets.append(frozenset(items))
    # A pattern matches a row when its item set is a subset of the row's items.
    rows_v, cols_v = [], []
    for pi, pe in enumerate(self.patterns_):
        pat_items = frozenset(pe.items)
        for tid, ts in enumerate(test_trans_sets):
            if pat_items.issubset(ts):
                rows_v.append(tid)
                cols_v.append(pi)
    data = np.ones(len(rows_v), dtype=np.float32)
    return csr_matrix((data, (rows_v, cols_v)), shape=(n, n_pats), dtype=np.float32)


def _check_health(self) -> None:
    """Raise ``HUGIMLPredictionError`` when fitted state needed for prediction is missing.

    Checks, in order: ``patterns_`` exists; it is non-empty when
    ``feature_mode`` is ``patterns_only``; ``model_`` exists; ``td_``
    exists and is not ``None``.
    """
    check_is_fitted(self)
    if not hasattr(self, "patterns_"):
        raise HUGIMLPredictionError("Pattern state missing — fit() may have failed.")
    if (
        len(self.patterns_) == 0
        and getattr(self, "feature_mode", "patterns_only") == "patterns_only"
    ):
        raise HUGIMLPredictionError("Model has no patterns — fit() may have failed.")
    if not hasattr(self, "model_"):
        raise HUGIMLPredictionError("Downstream model missing — fit() incomplete.")
    if not hasattr(self, "td_") or self.td_ is None:
        raise HUGIMLPredictionError("Transaction data missing — model state corrupt.")


def _validate_test_input(self, X_test: Any) -> None:
    """Validate test-time input against training schema.

    Raises
    ------
    ValueError
        For sparse input, or for a 1D non-DataFrame input.
    HUGIMLSchemaError
        When the feature count differs from ``n_features_in_`` or, for a
        DataFrame, the column names differ from ``feature_names_in_``.

    Warns
    -----
    HUGIMLDtypeDriftWarning
        A column that was categorical in training has a numeric dtype.
    HUGIMLRangeWarning
        A numeric column's observed range falls more than half a training
        span outside the training range.
    """
    from scipy.sparse import issparse as _issparse

    if _issparse(X_test):
        raise ValueError(
            "HUGIMLClassifierNative does not support sparse input. "
            "Convert to a dense array via X.toarray() first."
        )
    is_df = isinstance(X_test, pd.DataFrame)
    arr = None
    if not is_df:
        arr = np.asarray(X_test)
        if arr.ndim == 1:
            raise ValueError(
                f"HUGIMLClassifierNative expects a 2D array, got 1D array of shape {arr.shape}."
            )
    n_test_features = (
        len(X_test.columns) if is_df else arr.shape[1]  # type: ignore[union-attr]
    )
    expected = getattr(self, "n_features_in_", None)
    if expected is not None and n_test_features != expected:
        raise HUGIMLSchemaError(
            f"X has {n_test_features} features, but the model was fitted "
            f"with {expected} features."
        )
    expected_names = getattr(self, "feature_names_in_", None)
    if is_df and expected_names is not None:
        test_names = [str(c) for c in X_test.columns]
        if test_names != expected_names:
            # Report missing/unexpected names; equal sets mean only the
            # order differs.
            missing = set(expected_names) - set(test_names)
            extra = set(test_names) - set(expected_names)
            parts = []
            if missing:
                parts.append(f"missing: {sorted(missing)}")
            if extra:
                parts.append(f"unexpected: {sorted(extra)}")
            if not missing and not extra:
                parts.append("columns in different order")
            raise HUGIMLSchemaError(
                "Column mismatch between training and test data. " + "; ".join(parts)
            )
    cat_mask = getattr(self, "cat_cols_mask_", None)
    # Dtype drift: warn (do not fail) when a training-time categorical
    # column now arrives with a numeric dtype.
    if is_df and cat_mask is not None and np.any(cat_mask):
        for j, is_cat in enumerate(cat_mask):
            if j >= n_test_features:
                break
            col = X_test.iloc[:, j]
            if is_cat and pd.api.types.is_numeric_dtype(col):
                warnings.warn(
                    f"Column '{X_test.columns[j]}' was categorical during "
                    f"training but has numeric dtype ({col.dtype}) in test data.",
                    HUGIMLDtypeDriftWarning,
                    stacklevel=4,
                )
    # Range drift: warning-only comparison of each numeric column's
    # observed min/max with the training range taken from the bin edges.
    if is_df and cat_mask is not None:
        td = self.td_
        cpp_all_edges = getattr(td, "_cpp_all_edges", None)
        if cpp_all_edges is not None:
            try:
                numeric_idx = [
                    j
                    for j in range(min(n_test_features, len(cat_mask), len(cpp_all_edges)))
                    if not cat_mask[j]
                    and cpp_all_edges[j] is not None
                    and len(cpp_all_edges[j]) >= 2
                ]
                if numeric_idx:
                    train_min = np.asarray(
                        [float(cpp_all_edges[j][0]) for j in numeric_idx], dtype=float
                    )
                    train_max = np.asarray(
                        [float(cpp_all_edges[j][-1]) for j in numeric_idx], dtype=float
                    )
                    cpp_col_min = getattr(td, "_cpp_col_min", None)
                    cpp_col_range = getattr(td, "_cpp_col_range", None)
                    if cpp_col_min is not None and cpp_col_range is not None:
                        cm = np.asarray(
                            [float(cpp_col_min[j]) for j in numeric_idx], dtype=float
                        )
                        cr = np.asarray(
                            [float(cpp_col_range[j]) for j in numeric_idx], dtype=float
                        )
                        # Map edges back to the original scale where a
                        # usable minimum and range are stored.
                        # NOTE(review): the Python binning fallback compares
                        # integer columns against the edges without
                        # scaling; confirm the stored min/range make this
                        # mapping a no-op for those columns.
                        ok = np.isfinite(cr) & (cr > 0.0) & np.isfinite(cm)
                        train_min[ok] = train_min[ok] * cr[ok] + cm[ok]
                        train_max[ok] = train_max[ok] * cr[ok] + cm[ok]
                    train_span = train_max - train_min
                    valid = train_span > 0
                    if np.any(valid):
                        vals = X_test.iloc[:, numeric_idx].to_numpy(
                            dtype=np.float64, copy=False
                        )
                        # Non-finite test values are ignored when taking
                        # the per-column min/max.
                        test_min = np.nanmin(np.where(np.isfinite(vals), vals, np.nan), axis=0)
                        test_max = np.nanmax(np.where(np.isfinite(vals), vals, np.nan), axis=0)
                        # Tolerance is half the training span on each side.
                        drift = (
                            valid
                            & np.isfinite(test_min)
                            & np.isfinite(test_max)
                            & (
                                (test_min < train_min - train_span * 0.5)
                                | (test_max > train_max + train_span * 0.5)
                            )
                        )
                        for pos in np.flatnonzero(drift):
                            j = numeric_idx[int(pos)]
                            warnings.warn(
                                f"Column '{X_test.columns[j]}' has values "
                                f"[{float(test_min[pos]):.4g}, {float(test_max[pos]):.4g}] outside training "
                                f"range [{float(train_min[pos]):.4g}, {float(train_max[pos]):.4g}].",
                                HUGIMLRangeWarning,
                                stacklevel=4,
                            )
            except Exception:
                # Preserve prediction behaviour if warning-only drift checks
                # cannot be vectorized for mixed/object inputs.
                # NOTE(review): this also swallows HUGIMLRangeWarning when
                # warnings are configured to raise as errors.
                pass

# ── Monitoring and drift ──────────────────────────────────────────────────
def enable_monitoring(self, window_size: int = 1000) -> HUGIMLClassifierNative:
    """Attach a fresh ``PredictionMonitor`` and return ``self``.

    Parameters
    ----------
    window_size : int
        Forwarded to ``PredictionMonitor``.

    Returns
    -------
    HUGIMLClassifierNative
        This estimator; the monitor is available as ``self.monitor``.
    """
    monitor = PredictionMonitor(window_size=window_size)
    self.monitor = monitor
    return self
def disable_monitoring(self) -> HUGIMLClassifierNative:
    """Detach any prediction monitor and return ``self``.

    ``self.monitor`` is set to ``None``.
    """
    setattr(self, "monitor", None)
    return self
def detect_drift(
    self,
    X_test: Any,
    y_test: np.ndarray | None = None,
    threshold: float = 0.1,
) -> str:
    """Run multi-method drift detection and return a human-readable report.

    Uses PSI + KL divergence.  When ``y_test`` is provided, label
    distribution drift is checked as well.

    Notes
    -----
    Drift metrics are computed on the numeric array retained by the mining
    path.  Fixed-B numeric columns that contained NaN/Inf during training
    are converted to the categorical bin-label path so missingness is
    handled consistently at fit/predict time; those columns are therefore
    not represented as continuous numeric drift baselines.  PSI/KL alerts
    for such columns should be interpreted through
    pattern/feature-importance diagnostics rather than through
    ``detect_drift()``.

    Parameters
    ----------
    X_test : array-like or DataFrame
    y_test : array-like, optional
    threshold : float

    Returns
    -------
    str
        The detector's report, or a fixed notice when no baseline is stored.

    Raises
    ------
    RuntimeError
        In production mode when no drift baseline is stored.
    """
    check_is_fitted(self)
    detector = getattr(self, "_drift_det", None)
    if detector is None:
        if self._is_production_mode():
            raise RuntimeError(self._audit_artifact_message("Drift-detection baseline"))
        return "Drift detection unavailable (no baseline stored)."

    cat_mask = getattr(self, "cat_cols_mask_", np.zeros(0, dtype=bool))
    X_num, _ = self._to_float_array(X_test, cat_mask)
    y_arr = None if y_test is None else np.asarray(y_test)
    return str(detector.detect(X_num, y_test=y_arr, threshold=threshold))
def get_drift_psi(self, X_test: Any) -> dict:
    """Return per-feature PSI values as a dict.

    See ``detect_drift()`` for the fixed-B missing-numeric limitation:
    columns that were routed to categorical bin labels because they
    contained NaN/Inf during training do not have meaningful continuous PSI
    baselines.

    Returns an empty dict when no drift baseline is stored, except in
    production mode where ``RuntimeError`` is raised instead.
    """
    check_is_fitted(self)
    detector = getattr(self, "_drift_det", None)
    if detector is not None:
        cat_mask = getattr(self, "cat_cols_mask_", np.zeros(0, dtype=bool))
        X_num, _ = self._to_float_array(X_test, cat_mask)
        return detector.compute_psi(X_num)
    if self._is_production_mode():
        raise RuntimeError(self._audit_artifact_message("Drift PSI baseline"))
    return {}
def cross_validate_monitored(
    self,
    X: Any,
    y: Any,
    cv: Any = None,
    scoring: str = "roc_auc",
) -> dict:
    """Cross-validation with per-fold monitoring and drift detection.

    Every fold fits a fresh estimator of the same class with this
    estimator's parameters, scores it on the held-out rows, then enables
    monitoring, runs ``predict_proba`` on the held-out rows and, when a
    drift baseline exists, computes per-feature PSI.

    Parameters
    ----------
    X : pd.DataFrame or ndarray
    y : array-like
    cv : int or CV splitter (default: StratifiedKFold(5))
        An int is turned into a shuffled ``StratifiedKFold`` with
        ``random_state=42``.
    scoring : str
        Scorer name resolved through ``sklearn.metrics.get_scorer``.

    Returns
    -------
    dict with keys: test_scores, fit_times_ms, fold_monitors,
    fold_drift, fold_metadata
        Each value is a list with one entry per fold.
    """
    from sklearn.metrics import get_scorer
    from sklearn.model_selection import StratifiedKFold

    y = np.asarray(y)
    if cv is None:
        cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    elif isinstance(cv, int):
        cv = StratifiedKFold(n_splits=cv, shuffle=True, random_state=42)
    scorer = get_scorer(scoring)
    results: dict = {
        "test_scores": [],
        "fit_times_ms": [],
        "fold_monitors": [],
        "fold_drift": [],
        "fold_metadata": [],
    }
    # parallelise CV folds.
    # Each fold is fully independent (separate clone, separate fit).
    # prefer="threads" avoids pickling the classifier and its C++ state;
    # the GIL is released during C++ mining so threads still scale.
    # Falls back to serial when n_jobs=1 (the default).
    base_params = {
        k: v for k, v in self.get_params().items() if k not in ("allCols", "origColumns")
    }
    clf_cls = self.__class__

    def _fit_fold(train_idx, test_idx):
        if isinstance(X, pd.DataFrame):
            X_tr, X_te = X.iloc[train_idx], X.iloc[test_idx]
        else:
            X_tr, X_te = X[train_idx], X[test_idx]
        y_tr, y_te = y[train_idx], y[test_idx]
        fold_clf = clf_cls(**base_params)
        t0 = time.perf_counter()
        fold_clf.fit(X_tr, y_tr)
        fit_ms = (time.perf_counter() - t0) * 1000
        score = scorer(fold_clf, X_te, y_te)
        fold_clf.enable_monitoring()
        fold_clf.predict_proba(X_te)
        fold_mon = fold_clf.monitor
        mon_stats = fold_mon.stats if fold_mon is not None else {}
        psi: dict = {}
        if getattr(fold_clf, "_drift_det", None) is not None:
            cat_mask = getattr(fold_clf, "cat_cols_mask_", np.zeros(0, dtype=bool))
            X_te_num, _ = fold_clf._to_float_array(X_te, cat_mask)
            psi = fold_clf._drift_det.compute_psi(X_te_num)
        return score, fit_ms, fold_clf.fit_metadata_, mon_stats, psi

    def _run_serial():
        return [_fit_fold(tr, te) for tr, te in cv.split(X, y)]

    try:
        from joblib import Parallel
        from joblib import delayed as _delayed
    except ImportError:
        # joblib is optional here; without it the folds simply run in order.
        fold_outputs = _run_serial()
    else:
        try:
            fold_outputs = Parallel(n_jobs=getattr(self, "n_jobs", 1), prefer="threads")(
                _delayed(_fit_fold)(tr, te) for tr, te in cv.split(X, y)
            )
        except Exception:
            # Keep the best-effort serial retry, but leave a trace: a silent
            # retry hides the original failure and doubles the run time.
            logger.warning(
                "Parallel cross-validation failed; retrying folds serially.",
                exc_info=True,
            )
            fold_outputs = _run_serial()

    for score, fit_ms, metadata, mon_stats, psi in fold_outputs:
        results["test_scores"].append(score)
        results["fit_times_ms"].append(fit_ms)
        results["fold_metadata"].append(metadata)
        results["fold_monitors"].append(mon_stats)
        results["fold_drift"].append(psi)
    return results
# ── Explanation methods ───────────────────────────────────────────────────
def get_hug_features(self) -> list[str]:
    """Return a human-readable label for each mined HUG pattern.

    Singleton patterns use the format ``feature=[lo,hi)`` for adaptive
    numerical columns (e.g. ``age=[35,50)``) and ``feature=value`` for
    categorical columns (e.g. ``gender=F``).  Compound patterns (L > 1)
    are comma-separated, e.g. ``age=[35,50), gender=F``.

    When ``adaptive_binning=True`` and the integer-code path was used, C++
    stores bin labels as ``feature=[k,k+1]`` (integer range).  These are
    transparently remapped to the original-scale ``[lo,hi)`` labels via
    ``_adaptive_code_label_map_`` so that the output is identical in
    appearance to the string-path output.

    Production mode
    ---------------
    This method remains available in ``execution_mode='production'``
    because it only needs retained pattern labels.  ``get_pattern_info()``
    is intentionally audit-only because it additionally needs the retained
    training pattern matrix to compute support.

    Returns
    -------
    list of str
    """
    check_is_fitted(self)
    item_map = self.td_.item_map
    # _adaptive_code_label_map_ is set by _apply_adaptive_binning when the
    # integer-code path is used.  Empty dict (default) → no remapping needed.
    label_remap = getattr(self, "_adaptive_code_label_map_", {})

    labels: list[str] = []
    for pattern in self.patterns_:
        resolved = []
        for item_id in pattern.items:
            # Unknown item ids fall back to their string form.
            raw_label = item_map.get(item_id, str(item_id))
            resolved.append(label_remap.get(raw_label, raw_label))
        labels.append(", ".join(resolved))
    return labels
def get_transformed_shape(self) -> tuple[int, int]:
    """Return (n_samples, n_patterns) for the training pattern matrix.

    The retained matrix ``x_train_hup_`` is used when present.  In
    production mode the matrix itself is not retained, so the shape cached
    in ``_training_pattern_matrix_shape_`` is used instead.

    Raises
    ------
    RuntimeError
        When neither the matrix nor a cached shape is available.
    """
    check_is_fitted(self)
    if hasattr(self, "x_train_hup_"):
        shape = self.x_train_hup_.shape
    else:
        shape = getattr(self, "_training_pattern_matrix_shape_", None)
    if shape is None:
        raise RuntimeError(self._audit_artifact_message("Training pattern matrix shape"))
    n_samples, n_patterns = int(shape[0]), int(shape[1])
    return n_samples, n_patterns
def get_pattern_info(self) -> pd.DataFrame:
    """Summary DataFrame with one row per mined HUG pattern.

    Columns: pattern, utility, information_gain, support.

    This is an audit/governance table.  Unlike ``get_hug_features()``, it
    requires the retained training pattern matrix to compute support and
    therefore raises a clear error in ``execution_mode='production'``.

    Returns
    -------
    pd.DataFrame
        ``utility`` and ``information_gain`` are rounded to 6 decimals,
        ``support`` (share of training rows matching the pattern) to 4.
    """
    check_is_fitted(self)
    self._require_audit_artifact("Pattern support and pattern-info audit table", "x_train_hup_")
    matrix = self.x_train_hup_
    # Same zero-row guard as _pattern_support_lookup.
    n_train = max(int(matrix.shape[0]), 1)
    features = self.get_hug_features()
    # One column sum for the whole matrix instead of slicing one column per
    # pattern inside the loop (slow on row-compressed sparse matrices).
    match_counts = np.asarray(matrix.sum(axis=0), dtype=np.float64).ravel()
    records: list[dict[str, object]] = []
    for i, pe in enumerate(self.patterns_):
        support = float(match_counts[i]) / n_train
        records.append(
            {
                "pattern": features[i],
                "utility": round(pe.utility, 6),
                "information_gain": round(pe.ig, 6),
                "support": round(support, 4),
            }
        )
    return pd.DataFrame(records)
def _downstream_feature_display_name(self, name: str) -> str: """Return a compact display label for a downstream feature name.""" for prefix in ("orig:", "pattern:", "augmented_pair:"): if name.startswith(prefix): return name[len(prefix) :] return name def _downstream_feature_type(self, name: str) -> str: """Classify a downstream feature name by its explicit namespace.""" if name.startswith("orig:"): return "original" if name.startswith("augmented_pair:"): return "augmented_pair" return "pattern" def _pattern_support_lookup(self) -> dict[str, float]: """Return training support by both raw and namespaced pattern label. Production-mode models intentionally drop the training HUG matrix. ``feature_importances()`` should prefer cached support metadata when it exists; this lookup is only a best-effort recomputation path for audit models or legacy objects that still retain ``x_train_hup_``. """ if not hasattr(self, "x_train_hup_"): return {} n_train = int(self.x_train_hup_.shape[0]) if n_train <= 0: return {} labels = self.get_hug_features() support = np.asarray(self.x_train_hup_.sum(axis=0)).ravel() / max(n_train, 1) lookup: dict[str, float] = {} for label, value in zip(labels, support): val = float(value) lookup[label] = val lookup[f"pattern:{label}"] = val return lookup
def get_downstream_features(self) -> list[str]:
    """Return names aligned with the downstream estimator input columns.

    Every name carries a namespace prefix so feature provenance is
    explicit: ``orig:`` for original features, ``pattern:`` for mined HUG
    patterns, and ``augmented_pair:`` for augmented pair transforms.

    When ``topk_budget_strict=True``, the returned list is already filtered
    to the columns retained by the fitted strict TopK mask.

    Returns
    -------
    list of str
        A new list on every call.
    """
    check_is_fitted(self)
    names = self._get_downstream_feature_names()
    return list(names)
def _downstream_feature_counts(self) -> dict[str, int]: """Return counts by downstream feature family for the fitted estimator.""" names = list( getattr(self, "_downstream_feature_names_", []) or self._get_downstream_feature_names() ) counts = { "original": sum(1 for name in names if str(name).startswith("orig:")), "pattern": sum(1 for name in names if str(name).startswith("pattern:")), "augmented_pair": sum(1 for name in names if str(name).startswith("augmented_pair:")), } counts["total"] = len(names) return counts
def get_model_composition(self) -> dict[str, Any]:
    """Return downstream feature composition and relevant fitted configuration.

    The composition describes the actual feature families entering the
    downstream estimator after feature-mode construction and optional
    strict TopK filtering.

    Returns
    -------
    dict
        Feature mode, TopK settings, the ``augmented_pair_transforms``
        setting as a bool, a copy of ``augmented_pair_config_``, input and
        pattern counts, and per-family downstream feature counts.
    """
    check_is_fitted(self)
    counts = self._downstream_feature_counts()
    aug_config = dict(getattr(self, "augmented_pair_config_", {}) or {})

    composition: dict[str, Any] = {}
    composition["feature_mode"] = getattr(self, "feature_mode", "patterns_only")
    composition["topK"] = getattr(self, "topK", None)
    composition["topk_budget_strict"] = bool(getattr(self, "topk_budget_strict", False))
    composition["augmented_pair_transforms_enabled"] = bool(
        getattr(self, "augmented_pair_transforms", False)
    )
    composition["augmented_pair_config"] = aug_config
    composition["n_input_features"] = int(getattr(self, "n_features_in_", 0))
    composition["n_patterns_mined"] = int(len(getattr(self, "patterns_", [])))
    composition["n_downstream_features"] = counts["total"]
    composition["downstream_feature_counts"] = counts
    return composition
def _cache_downstream_feature_metadata(self) -> None:
    """Snapshot per-column metadata for the fitted downstream feature matrix.

    Must be called before production retention, because production mode
    drops the training matrices. The method is deliberately best-effort:
    pattern support may be unavailable for cached/tuned candidates, yet the
    non-missing rate and variance can still be derived from the downstream
    matrix while it exists.

    Sets ``_downstream_feature_names_``, ``_downstream_pattern_support_``,
    ``_downstream_non_missing_rate_`` and ``_downstream_variance_``.
    """
    features = self._get_downstream_feature_names()
    self._downstream_feature_names_ = list(features)
    n_cols = len(features)

    # Support defaults to NaN; only pattern columns are filled in below.
    self._downstream_pattern_support_ = np.full(n_cols, np.nan, dtype=np.float64)
    try:
        support_by_name = self._pattern_support_lookup()
    except Exception:
        # Best-effort: a missing lookup leaves every support value as NaN.
        support_by_name = {}
    for col, feat in enumerate(features):
        if self._downstream_feature_type(feat) != "pattern":
            continue
        shown_as = self._downstream_feature_display_name(feat)
        # Prefer the namespaced name, then fall back to the display name.
        self._downstream_pattern_support_[col] = support_by_name.get(
            feat, support_by_name.get(shown_as, np.nan)
        )

    matrix = getattr(self, "x_train_downstream_", None)
    matrix_is_aligned = (
        matrix is not None and n_cols == getattr(matrix, "shape", (0, 0))[1]
    )
    if not matrix_is_aligned:
        self._downstream_non_missing_rate_ = np.full(n_cols, np.nan, dtype=np.float64)
        self._downstream_variance_ = np.full(n_cols, np.nan, dtype=np.float64)
        return

    dense = matrix.toarray() if issparse(matrix) else np.asarray(matrix)
    is_finite = np.isfinite(dense)
    self._downstream_non_missing_rate_ = is_finite.mean(axis=0).astype(np.float64)
    # Non-finite cells are masked to NaN so they do not affect the variance.
    masked = np.where(is_finite, dense, np.nan)
    self._downstream_variance_ = np.nanvar(masked, axis=0).astype(np.float64)
def feature_importances(self) -> pd.DataFrame:
    """Map downstream estimator coefficients to final feature names.

    Returns a DataFrame sorted by absolute coefficient magnitude. Feature
    names are aligned to the downstream estimator after feature-mode and
    optional strict TopK filtering have been applied. The ``feature_type``
    column distinguishes original features, mined HUG patterns, and
    augmented pair transforms. ``pattern_support`` is populated only for
    mined HUG patterns; original and augmented-pair features use
    ``support_type='not_applicable'`` and ``pattern_support=NaN``.

    When ``coef_`` has more than one row, the rows are averaged into a
    single coefficient per feature.

    In production mode without retained training matrices a
    ``HUGIMLWarning`` is emitted, every row carries the audit note in its
    ``audit_note`` column, and the note is also stored in
    ``result.attrs["audit_note"]``.

    Raises
    ------
    AttributeError
        When the downstream estimator does not expose ``coef_`` (e.g.
        non-linear models).
    RuntimeError
        When the number of downstream feature names differs from the number
        of coefficients.
    """
    check_is_fitted(self)
    production_without_training_artifacts = self._is_production_mode() and not hasattr(
        self, "x_train_downstream_"
    )
    audit_note = (
        self._audit_artifact_message("Training matrices and drift-baseline audit artifacts")
        if production_without_training_artifacts
        else ""
    )
    if production_without_training_artifacts:
        warnings.warn(
            audit_note,
            HUGIMLWarning,
            stacklevel=2,
        )

    clf_step = self.model_.named_steps.get("clf")
    if not hasattr(clf_step, "coef_"):
        raise AttributeError(
            "feature_importances requires the downstream estimator "
            "to expose coef_ (e.g. LogisticRegression)."
        )
    raw_coef = clf_step.coef_
    # Multi-row coefficient matrices are collapsed to one value per feature.
    coef = (
        raw_coef.mean(axis=0)
        if raw_coef.ndim == 2 and raw_coef.shape[0] > 1
        else raw_coef.ravel()
    )
    features = self._get_downstream_feature_names()
    if len(features) != len(coef):
        raise RuntimeError(
            "Downstream feature names are not aligned with estimator coefficients: "
            f"{len(features)} names for {len(coef)} coefficients."
        )

    # Prefer the support cached at fit time; fall back to a live lookup when
    # the cache is missing or no longer matches the feature list.
    cached_pattern_support = getattr(self, "_downstream_pattern_support_", None)
    if cached_pattern_support is not None and len(cached_pattern_support) != len(features):
        cached_pattern_support = None
    support_lookup = self._pattern_support_lookup() if cached_pattern_support is None else {}

    # Strict TopK scores are keyed by the unfiltered ("full") feature names.
    strict_scores = getattr(self, "_strict_topk_feature_scores_", None)
    strict_score_lookup: dict[str, float] = {}
    if strict_scores is not None:
        full_names = getattr(self, "_downstream_feature_names_full_", None)
        if full_names is None or len(full_names) == 0:
            full_names = self._get_downstream_feature_names_full()
        strict_score_lookup = {
            name: float(score)
            for name, score in zip(full_names, np.asarray(strict_scores).ravel())
        }

    aug_lookup = {
        f"augmented_pair:{item.get('name')}": item
        for item in getattr(self, "augmented_pair_transforms_", [])
    }

    # Per-column statistics: use the cache when aligned, else recompute from
    # the retained training matrix, else report NaN.
    non_missing_rates = getattr(self, "_downstream_non_missing_rate_", None)
    variances = getattr(self, "_downstream_variance_", None)
    if (
        non_missing_rates is None
        or variances is None
        or len(non_missing_rates) != len(features)
        or len(variances) != len(features)
    ):
        X_meta = getattr(self, "x_train_downstream_", None)
        if X_meta is not None and len(features) == getattr(X_meta, "shape", (0, 0))[1]:
            X_arr = X_meta.toarray() if issparse(X_meta) else np.asarray(X_meta)
            finite_mask = np.isfinite(X_arr)
            non_missing_rates = finite_mask.mean(axis=0)
            variances = np.nanvar(np.where(finite_mask, X_arr, np.nan), axis=0)
        else:
            non_missing_rates = np.full(len(features), np.nan)
            variances = np.full(len(features), np.nan)

    rows: list[dict[str, object]] = []
    for idx, (feat, c) in enumerate(zip(features, coef)):
        feature_type = self._downstream_feature_type(feat)
        display_name = self._downstream_feature_display_name(feat)
        if feature_type == "pattern":
            if cached_pattern_support is not None:
                pattern_support = float(cached_pattern_support[idx])
            else:
                pattern_support = support_lookup.get(
                    feat, support_lookup.get(display_name, np.nan)
                )
            support_type = "pattern_support"
        else:
            pattern_support = np.nan
            support_type = "not_applicable"
        support_value = (
            round(float(pattern_support), 4) if np.isfinite(pattern_support) else np.nan
        )

        aug_meta = aug_lookup.get(feat, {}) if feature_type == "augmented_pair" else {}
        std_mean = aug_meta.get("standardization_mean", np.nan)
        std_scale = aug_meta.get("standardization_scale", np.nan)
        std_scale_float = float(std_scale) if np.isfinite(std_scale) else np.nan
        # Raw-scale coefficient = standardized coefficient / scale; defined
        # only for augmented pairs with a finite, non-zero scale.
        coef_raw = (
            float(c) / std_scale_float
            if feature_type == "augmented_pair"
            and np.isfinite(std_scale_float)
            and std_scale_float != 0.0
            else np.nan
        )
        raw_formula = aug_meta.get("raw_formula", np.nan)
        if feature_type == "augmented_pair":
            aug_text = self._augmented_pair_effect_text(
                raw_formula=str(raw_formula),
                operation=str(aug_meta.get("operation", "")),
                coefficient_raw_scale=coef_raw,
                standardization_mean=float(std_mean) if np.isfinite(std_mean) else np.nan,
                standardization_scale=std_scale_float,
                source_observed_medians=aug_meta.get("source_observed_medians", np.nan),
                pair_missing_policy=str(
                    aug_meta.get("pair_missing_policy", "reference_value_for_unavailable_pair")
                ),
                eligible_rate=float(aug_meta.get("eligible_rate", np.nan)),
                missing_pair_rate=float(aug_meta.get("missing_pair_rate", np.nan)),
            )
            decision_direction = aug_text["decision_direction"]
            risk_increases_when = aug_text["risk_increases_when"]
            unit_effect_interpretation = aug_text["unit_effect_interpretation"]
            reference_raw_value_description = aug_text["reference_raw_value_description"]
            source_observed_medians_description = aug_text[
                "source_observed_medians_description"
            ]
            pair_missing_policy_description = aug_text["pair_missing_policy_description"]
            raw_scale_note = aug_text["raw_scale_note"]
            raw_interpretation = aug_text["raw_interpretation"]
        else:
            decision_direction = np.nan
            risk_increases_when = np.nan
            unit_effect_interpretation = np.nan
            reference_raw_value_description = np.nan
            source_observed_medians_description = np.nan
            pair_missing_policy_description = np.nan
            raw_scale_note = np.nan
            raw_interpretation = np.nan

        rows.append(
            {
                "pattern": display_name,
                "feature": feat,
                "display_name": display_name,
                "feature_type": feature_type,
                "coefficient": round(float(c), 6),
                "abs_coefficient": round(abs(float(c)), 6),
                "pattern_support": support_value,
                "support": support_value,
                "support_type": support_type,
                "non_missing_rate": round(float(non_missing_rates[idx]), 6),
                "variance": round(float(variances[idx]), 6),
                "strict_topk_score": round(float(strict_score_lookup.get(feat, np.nan)), 6),
                "standardization_mean": std_mean,
                "standardization_scale": std_scale,
                "operation": aug_meta.get("operation", np.nan),
                "inputs": aug_meta.get("inputs", np.nan),
                "raw_formula": raw_formula,
                "standardized_formula": aug_meta.get("standardized_formula", np.nan),
                "pair_missing_policy": aug_meta.get("pair_missing_policy", np.nan),
                "eligible_count": aug_meta.get("eligible_count", np.nan),
                "eligible_rate": aug_meta.get("eligible_rate", np.nan),
                "missing_pair_rate": aug_meta.get("missing_pair_rate", np.nan),
                "source_observed_medians": aug_meta.get("source_observed_medians", np.nan),
                "transform_ig": aug_meta.get("transform_ig", np.nan),
                "coefficient_standardized": round(float(c), 6)
                if feature_type == "augmented_pair"
                else np.nan,
                "one_std_effect_on_log_odds": round(float(c), 6)
                if feature_type == "augmented_pair"
                else np.nan,
                "coefficient_raw_scale": round(float(coef_raw), 12)
                if np.isfinite(coef_raw)
                else np.nan,
                "one_raw_unit_effect_on_log_odds": round(float(coef_raw), 12)
                if np.isfinite(coef_raw)
                else np.nan,
                "reference_raw_value": std_mean if feature_type == "augmented_pair" else np.nan,
                "reference_raw_value_description": reference_raw_value_description,
                "decision_direction": decision_direction,
                "risk_increases_when": risk_increases_when,
                "unit_effect_interpretation": unit_effect_interpretation,
                "raw_scale_note": raw_scale_note,
                "raw_interpretation": raw_interpretation,
                "source_observed_medians_description": source_observed_medians_description,
                "pair_missing_policy_description": pair_missing_policy_description,
                "audit_note": (audit_note if production_without_training_artifacts else ""),
            }
        )

    result = (
        pd.DataFrame(rows)
        .sort_values("abs_coefficient", ascending=False)
        .reset_index(drop=True)
    )
    if production_without_training_artifacts:
        result.attrs["audit_note"] = audit_note
    # Return the frame itself. Re-wrapping it in pd.DataFrame(...) builds a new
    # object whose ``attrs`` start empty, which would drop the audit note.
    return result
# ── v1.1.0 Adaptive-binning diagnostic plots ───────────────────────────── # These methods are available on any fitted HUGIMLClassifierNative instance # when adaptive_binning=True. HUGIMLAdaptive inherits them automatically # as a subclass. Both require matplotlib (optional dependency).
def plot_bin_profiles(self, figsize: tuple | None = None):
    """Draw a bar chart of the chosen B for each feature (adaptive binning only).

    Bar colour shows where the chosen B sits within the candidate range:
    blue for the lowest third, green for the middle third, red above that.
    When ``b_candidates`` is unset or empty the range ``[2, 15]`` is used.

    Returns
    -------
    (fig, ax)

    Raises
    ------
    RuntimeError
        When called on a non-adaptive or unfitted model.
    ImportError
        When matplotlib is not installed.
    """
    self._check_adaptive_fitted("plot_bin_profiles")
    self._require_mpl()
    import matplotlib.pyplot as plt

    feature_names = list(self.per_feature_b_.keys())
    chosen_bins = [self.per_feature_b_[name] for name in feature_names]

    candidate_bins = self.b_candidates or [2, 15]
    coarsest, finest = min(candidate_bins), max(candidate_bins)
    lower_cut = coarsest + (finest - coarsest) / 3
    upper_cut = coarsest + 2 * (finest - coarsest) / 3

    def _bar_colour(b):
        # Thirds of the candidate range: blue, green, red.
        if b <= lower_cut:
            return "#2166ac"
        if b <= upper_cut:
            return "#1a9641"
        return "#d7191c"

    bar_colours = [_bar_colour(b) for b in chosen_bins]
    positions = range(len(feature_names))
    default_size = (max(7, len(feature_names) * 0.5 + 2), 4)

    fig, ax = plt.subplots(figsize=figsize or default_size)
    ax.bar(positions, chosen_bins, color=bar_colours, edgecolor="white", linewidth=0.5)
    ax.set_xticks(positions)
    ax.set_xticklabels(feature_names, rotation=45, ha="right", fontsize=8)
    ax.set_ylabel("Chosen B_j", fontsize=10)
    ax.set_title(
        f"Adaptive binning — chosen B per feature "
        f"(threshold={self.min_marginal_gain_ratio:.0%})",
        fontsize=11,
    )
    # Annotate every bar with its value just above the top edge.
    for pos, b in enumerate(chosen_bins):
        ax.text(pos, b + 0.05, str(b), ha="center", fontsize=8)
    fig.tight_layout()
    return fig, ax
def ig_heatmap(self, figsize: tuple | None = None):
    """Render the IG score at each (feature, B) grid point (adaptive binning only).

    Rows are features in sorted order and columns are the sorted union of all
    evaluated B values. Grid points without a recorded score are drawn as
    0.0. The chosen B of each feature is outlined with a black box.

    Returns
    -------
    (fig, ax)

    Raises
    ------
    RuntimeError
        When called on a non-adaptive or unfitted model, or when
        ``ig_scores_`` is empty.
    ImportError
        When matplotlib is not installed.
    """
    self._check_adaptive_fitted("ig_heatmap")
    if not getattr(self, "ig_scores_", None):
        raise RuntimeError("ig_scores_ is empty — call fit() first.")
    self._require_mpl()
    import matplotlib.pyplot as plt

    row_features = sorted(self.ig_scores_)
    evaluated_bs: set = set()
    for per_b_scores in self.ig_scores_.values():
        evaluated_bs.update(per_b_scores)
    bs = sorted(evaluated_bs)

    score_rows = []
    for feature in row_features:
        feature_scores = self.ig_scores_[feature]
        score_rows.append([feature_scores.get(b, 0.0) for b in bs])
    grid = np.array(score_rows)

    default_size = (max(6, len(bs) * 0.9), max(4, len(row_features) * 0.45))
    fig, ax = plt.subplots(figsize=figsize or default_size)
    im = ax.imshow(grid, aspect="auto", cmap="YlOrRd")
    ax.set_xticks(range(len(bs)))
    ax.set_xticklabels([str(b) for b in bs], fontsize=9)
    ax.set_yticks(range(len(row_features)))
    ax.set_yticklabels(row_features, fontsize=8)
    ax.set_xlabel("B candidates", fontsize=10)
    ax.set_title("IG score per (feature, B) — box = chosen B", fontsize=11)

    for row, feature in enumerate(row_features):
        chosen = self.per_feature_b_.get(feature)
        if not chosen or chosen not in bs:
            continue
        col = bs.index(chosen)
        # Cells are centred on integer coordinates, hence the half-cell offset.
        outline = plt.Rectangle(
            (col - 0.5, row - 0.5),
            1,
            1,
            fill=False,
            edgecolor="black",
            linewidth=2,
        )
        ax.add_patch(outline)

    plt.colorbar(im, ax=ax, label="Information gain")
    fig.tight_layout()
    return fig, ax
def _check_adaptive_fitted(self, method_name: str) -> None:
    """Raise a clear error when an adaptive-only method is called incorrectly.

    Parameters
    ----------
    method_name
        Name of the calling public method; used only in the error message.

    Raises
    ------
    RuntimeError
        When ``adaptive_binning`` is falsy or ``per_feature_b_`` is missing
        or empty.
    """
    check_is_fitted(self)
    if not getattr(self, "adaptive_binning", False):
        raise RuntimeError(
            f"{method_name}() is only available when adaptive_binning=True. "
            f"Re-fit with HUGIMLClassifier(adaptive_binning=True, ...) "
            f"or use HUGIMLAdaptive."
        )
    if not getattr(self, "per_feature_b_", None):
        raise RuntimeError(f"{method_name}() requires per_feature_b_ — call fit() first.")


@staticmethod
def _require_mpl() -> None:
    """Raise ImportError with install instructions when matplotlib is missing."""
    try:
        import matplotlib  # noqa: F401
    except ImportError as exc:
        # Chain explicitly so the traceback reads as a deliberate re-raise and
        # the underlying import failure stays available via ``__cause__``.
        raise ImportError(
            "matplotlib is required for diagnostic plots. "
            "Install with: pip install matplotlib "
            "or: pip install 'hugiml-core[plots]'"
        ) from exc


# ── End v1.1.0 adaptive-binning diagnostic plots ──────────────────────────


def _summary_shape_text(self, matrix_attr: str, cached_shape_attr: str) -> str:
    """Return a stable summary shape for audit or production-retained models.

    Resolution order: the live matrix's shape, then the cached shape tuple
    (annotated as not retained), then a production-mode placeholder, and
    finally ``"unavailable"``.

    Parameters
    ----------
    matrix_attr
        Attribute name of the training matrix on ``self``.
    cached_shape_attr
        Attribute name of the cached shape used when the matrix is absent.
    """
    matrix = getattr(self, matrix_attr, None)
    if matrix is not None and hasattr(matrix, "shape"):
        return str(tuple(int(v) for v in matrix.shape))
    cached = getattr(self, cached_shape_attr, None)
    if cached is not None:
        return (
            f"{tuple(int(v) for v in cached)} (training matrix not retained in production mode)"
        )
    if self._is_production_mode():
        return "not retained in production mode"
    return "unavailable"
def model_summary(self) -> str:
    """Human-readable model summary including top patterns.

    The summary lists the configuration, training-set sizes, downstream
    feature composition, matrix shapes, per-stage fit timings and the ten
    features with the largest absolute coefficient. When
    ``feature_importances()`` raises ``AttributeError`` the importance
    section is replaced by a "not available" note. Sections on NaN handling
    and adaptive binning are appended when the corresponding fitted
    attributes are present.

    Returns
    -------
    str
        The summary lines joined by newlines.
    """
    check_is_fitted(self)
    composition = self.get_model_composition()
    counts = composition.get("downstream_feature_counts", {})
    lines = [
        "HUGIMLClassifier — Model Summary",
        "=" * 50,
        f"Config: B={self.B}, L={self.L}, G={self.G}",
        f"Feature mode: {getattr(self, 'feature_mode', 'patterns_only')}",
        f"Training: {self.fit_metadata_.n_samples} samples, "
        f"{self.fit_metadata_.n_features} features, "
        f"{self.fit_metadata_.n_classes} classes",
        f"Patterns: {self.fit_metadata_.n_patterns} "
        f"({self.fit_metadata_.n_compound} compound)",
        f"Augmented pairs: {counts.get('augmented_pair', 0)} retained",
        f"Downstream composition: original={counts.get('original', 0)}, "
        f"patterns={counts.get('pattern', 0)}, "
        f"augmented_pair={counts.get('augmented_pair', 0)}, "
        f"total={counts.get('total', 0)}",
        f"Matrix: {self._summary_shape_text('x_train_hup_', '_training_pattern_matrix_shape_')} "
        f"(density={self.fit_metadata_.matrix_density:.4f})",
        f"Downstream: {self._summary_shape_text('x_train_downstream_', '_training_downstream_matrix_shape_')}",
        f"Fit time: {self.fit_metadata_.total_fit_ms:.0f} ms",
        "",
        "Stage breakdown (ms):",
    ]
    for stage, ms in self.fit_metadata_.stage_times_ms.items():
        lines.append(f" {stage:<25} {ms:>8.1f}")

    try:
        imp = self.feature_importances().head(10)
        has_non_pattern = bool((imp.get("feature_type", "pattern") != "pattern").any())
        has_augmented = bool((imp.get("feature_type", "pattern") == "augmented_pair").any())
        section = (
            "Top 10 downstream features by importance:"
            if has_non_pattern
            else "Top 10 patterns by importance:"
        )
        lines += ["", section]
        if has_augmented:
            lines.append(
                " (includes augmented pair transforms; use "
                "explain_augmented_pair_effects() for raw-scale interpretation)"
            )
        for _, row in imp.iterrows():
            support_text = (
                f"pattern_support={row['pattern_support']:.3f}"
                if row.get("support_type") == "pattern_support"
                else "pattern_support=n/a"
            )
            lines.append(
                f" [{row.get('feature_type', 'pattern')}] "
                f"{row['pattern']:<40} "
                f"coef={row['coefficient']:>+8.4f} "
                f"{support_text}"
            )
    except AttributeError:
        # feature_importances() raises AttributeError when coef_ is absent.
        lines += ["", "Top downstream features by importance:"]
        lines.append(" (not available — non-LR downstream estimator)")

    # ── v1.1.0 adaptive binning section ──────────────────────────────
    if getattr(self, "_missing_col_edges_", None):
        lines += [
            "",
            f"NaN handling: {len(self._missing_col_edges_)} numerical column(s) "
            f"pre-binned (NaN/Inf generates no transaction item at train or test time).",
        ]
    if self.adaptive_binning and getattr(self, "per_feature_b_", None):
        lines += ["", "Adaptive binning — chosen B per feature:"]
        # Largest chosen B first.
        for feat, b in sorted(self.per_feature_b_.items(), key=lambda kv: -kv[1]):
            edges = self._bin_edges_.get(feat, [])
            # Separate the first and last edge; without a delimiter the two
            # numbers run together (e.g. "[0.52.3]") and cannot be read back.
            rng = (
                f" [{float(edges[0]):.4g}, {float(edges[-1]):.4g}]"
                if len(edges) >= 2
                else ""
            )
            lines.append(f" {feat:<35} B={b:<3}{rng}")
    # ─────────────────────────────────────────────────────────────────
    return "\n".join(lines)
# =============================================================================
# Exact cached grid tuning helper
# =============================================================================


def _hugiml_auc_score_for_fast_grid(y_true: Any, proba: np.ndarray, classes: np.ndarray) -> float:
    """Internal validation AUC scorer used by fast_grid_tune().

    Two-column probabilities are scored as binary AUC on the second column;
    anything else is scored as macro-averaged one-vs-rest AUC. ``classes`` is
    accepted for signature compatibility and is not used.

    Raises
    ------
    HUGIMLValidationError
        When ``proba`` is not two-dimensional.
    """
    from sklearn.metrics import roc_auc_score

    y_arr = np.asarray(y_true)
    if proba.ndim != 2:
        raise HUGIMLValidationError("predict_proba must return a 2D array.")
    if proba.shape[1] == 2:
        return float(roc_auc_score(y_arr, proba[:, 1]))
    return float(roc_auc_score(y_arr, proba, multi_class="ovr", average="macro"))


def _hugiml_expand_grid_for_fast_tune(param_grid: dict[str, list] | None) -> list[dict[str, Any]]:
    """Expand a compact sklearn-style parameter grid for fast HUGIML tuning.

    ``None`` selects ``HUGIMLClassifier.default_param_grid()``. Scalar values
    (including strings and bytes) are treated as single-element lists. The
    result is the Cartesian product, one dict per candidate.

    Raises
    ------
    HUGIMLParamError
        When the grid is not a non-empty dict or a parameter has no values.
    """
    from itertools import product

    grid = HUGIMLClassifier.default_param_grid() if param_grid is None else param_grid
    if not isinstance(grid, dict) or not grid:
        raise HUGIMLParamError("param_grid must be a non-empty dict of parameter lists.")
    keys = list(grid.keys())
    values = []
    for key in keys:
        val = grid[key]
        # Strings are iterable but must be kept as a single candidate value.
        if isinstance(val, (str, bytes)) or not hasattr(val, "__iter__"):
            val = [val]
        val = list(val)
        if not val:
            raise HUGIMLParamError(f"param_grid[{key!r}] must contain at least one value.")
        values.append(val)
    return [dict(zip(keys, vals)) for vals in product(*values)]


def _hugiml_validate_fast_tune_grid(candidates: list[dict[str, Any]]) -> dict[str, list]:
    """Validate that a grid is safe for exact cached tuning.

    The fast path is exact when adaptive binning is enabled and only mining/
    representation dimensions vary: G, L, topK, and feature_mode. Because G is
    part of the native mining call, candidates are cached in separate fixed-G
    groups. B may appear and even vary, but is ignored while
    adaptive_binning=True because per-feature binning supplies the effective
    discretisation.

    Returns
    -------
    dict
        Sorted distinct ``L_values``, ``topK_values``, ``feature_modes`` and
        ``G_values`` (also exposed under the legacy key ``G``).

    Raises
    ------
    HUGIMLParamError
        When the candidate list is empty, an unsupported key varies, any
        candidate disables adaptive binning, or a value is out of range.
    """
    if not candidates:
        raise HUGIMLParamError("No grid candidates supplied.")
    # A key "varies" when its repr differs between at least two candidates.
    varying = {
        key
        for key in set().union(*(set(c.keys()) for c in candidates))
        if len({repr(c.get(key, None)) for c in candidates}) > 1
    }
    allowed_varying = {"B", "G", "L", "topK", "feature_mode"}
    disallowed = sorted(varying - allowed_varying)
    if disallowed:
        raise HUGIMLParamError(
            "fast_grid_tune requires only G, L, topK, and feature_mode to vary "
            f"(B is ignored under adaptive_binning=True). Varying unsupported keys: {disallowed}."
        )
    adaptive_values = {bool(c.get("adaptive_binning", True)) for c in candidates}
    if adaptive_values != {True}:
        raise HUGIMLParamError("fast_grid_tune requires adaptive_binning=True for every candidate.")
    g_values = sorted({float(c.get("G", 1e-2)) for c in candidates})
    L_values = sorted({int(c.get("L", 1)) for c in candidates})
    topk_values = sorted({int(c.get("topK", 30)) for c in candidates})
    feature_modes = sorted({str(c.get("feature_mode", "patterns_only")) for c in candidates})
    if any(k <= 0 for k in topk_values):
        raise HUGIMLParamError(
            "fast_grid_tune currently supports positive integer topK values only."
        )
    if any(L < 1 for L in L_values):
        raise HUGIMLParamError("fast_grid_tune currently supports L >= 1 only.")
    allowed_modes = {"patterns_only", "original_plus_patterns", "original_plus_interactions"}
    bad_modes = sorted(set(feature_modes) - allowed_modes)
    if bad_modes:
        raise HUGIMLParamError(f"Unsupported feature_mode values for fast_grid_tune: {bad_modes}.")
    return {
        "L_values": L_values,
        "topK_values": topk_values,
        "feature_modes": feature_modes,
        "G_values": g_values,
        # Backward-compatible key used by older callers/tests.
        "G": g_values,
    }


def _hugiml_shallow_candidate_from_base(base: HUGIMLClassifierNative) -> HUGIMLClassifierNative:
    """Create a candidate that shares immutable cached mining artefacts with base.

    The candidate is built from ``base.get_params(deep=False)``; each listed
    fitted attribute that exists on ``base`` is assigned by reference, not
    copied.
    """
    cand = base.__class__(**base.get_params(deep=False))
    share_attrs = [
        "cat_cols_mask_",
        "is_int_mask_",
        "feature_names_in_",
        "_bin_edges_",
        "_missing_col_edges_",
        "_adaptive_code_label_map_",
        "_adaptive_precoded_features_",
        "per_feature_b_",
        "ig_scores_",
        "td_",
        "raw_patterns_",
        "classes_",
        "n_features_in_",
        "_native_available_",
    ]
    for attr in share_attrs:
        if hasattr(base, attr):
            setattr(cand, attr, getattr(base, attr))
    return cand


def _hugiml_prepare_candidate_from_cached_base(
    base: HUGIMLClassifierNative,
    X_train_original: Any,
    y_train: Any,
    L_value: int,
    topK_value: int,
    feature_mode: str,
    execution_mode: str = "audit",
) -> HUGIMLClassifierNative:
    """Build and fit one exact candidate from a max-topK cached base model.

    The candidate reuses the mining artefacts of ``base``, rebuilds the
    pattern matrix for the requested ``L``/``topK``, constructs downstream
    features for ``feature_mode`` and fits a fresh downstream estimator.

    Raises
    ------
    RuntimeError
        When ``base`` does not retain ``x_train_hup_``.
    HUGIMLMiningError
        When no patterns remain and ``feature_mode`` is ``patterns_only``.
    """
    cand = _hugiml_shallow_candidate_from_base(base)
    cand.L = int(L_value)
    cand.topK = int(topK_value)
    cand.feature_mode = str(feature_mode)
    cand.execution_mode = str(execution_mode)

    raw_patterns = list(getattr(base, "raw_patterns_", []))[: int(topK_value)]
    n_train = len(y_train)
    base_hup = getattr(base, "x_train_hup_", None)
    if base_hup is None:
        raise RuntimeError(
            "fast_grid_tune requires cached training pattern matrices. "
            "The cache model was created in production mode or otherwise does not retain "
            "x_train_hup_; run tuning with execution_mode='audit'."
        )

    if int(L_value) == 1:
        cand.patterns_ = raw_patterns
        # Fused L=1 path returns columns in raw_patterns_ order; slicing is exact.
        cand.x_train_hup_ = base_hup[:, : len(cand.patterns_)]
    else:
        # Use the wrapped ``_td`` object when td_ has one, else td_ itself.
        native_td = getattr(getattr(base, "td_", None), "_td", getattr(base, "td_", None))
        # Temporarily point the candidate at the native handle for the
        # deduplication call, then restore what was there before.
        old_td = getattr(cand, "td_", None)
        cand.td_ = native_td
        cand.patterns_, cached_coo = cand._deduplicate_patterns_by_coverage(raw_patterns, n_train)
        cand.td_ = old_td
        if cached_coo is not None:
            rows, cols = cached_coo
        elif len(cand.patterns_) > 0:
            rows, cols = _core.build_train_matrix(native_td, cand.patterns_)
        else:
            rows = cols = np.zeros(0, dtype=np.int32)
        data = np.ones(len(rows), dtype=np.float32)
        cand.x_train_hup_ = csr_matrix(
            (data, (rows, cols)), shape=(n_train, len(cand.patterns_)), dtype=np.float32
        )

    if len(cand.patterns_) == 0 and cand.feature_mode == "patterns_only":
        raise HUGIMLMiningError(
            "No HUG patterns found for cached candidate. Try reducing G, increasing topK, "
            "or using original_plus_patterns."
        )

    cand._setup_feature_mode_metadata()
    cand._setup_augmented_pair_transforms(X_train_original, y_train, fit=True)
    # The labels are exposed on the candidate only while downstream features
    # are being built, and removed again even if construction fails.
    cand._current_y_for_downstream_topk_ = y_train
    try:
        X_down = cand._make_downstream_features(X_train_original, cand.x_train_hup_, fit=True)
    finally:
        if hasattr(cand, "_current_y_for_downstream_topk_"):
            delattr(cand, "_current_y_for_downstream_topk_")
    X_down = cand._apply_strict_topk_budget_fit(X_down, y_train)
    cand.x_train_downstream_ = X_down
    cand._cache_downstream_feature_metadata()
    cand.model_ = Pipeline([("clf", cand._make_estimator(len(cand.classes_)))])
    cand.model_.fit(X_down, y_train)
    cand._apply_execution_mode_retention()
    # Intentionally avoid drift baseline and rich metadata during tuning. The
    # returned best_model is immediately usable for prediction; call fit() on the
    # selected params if full production metadata/drift baseline is required.
    return cand


def _hugiml_fast_grid_tune(
    cls,
    X_train: Any,
    y_train: Any,
    X_val: Any,
    y_val: Any,
    param_grid: dict[str, list] | None = None,
    *,
    base_params: dict[str, Any] | None = None,
    scoring: str = "roc_auc",
    refit_full: bool = False,
    return_results: bool = True,
) -> dict[str, Any]:
    """Exact cached tuner for the compact adaptive HUGIML grid.

    Requirements
    ------------
    - adaptive_binning=True for every candidate.
    - G may vary; the tuner partitions candidates into fixed-G cache groups.
    - Only G, L, topK, and feature_mode vary. B may appear in the grid but is
      ignored for cache partitioning because adaptive binning chooses
      per-feature bins and fit() passes sentinel B=2 to the native transaction
      builder.
    - max_fit_seconds must be None to guarantee equivalence to the ordinary
      grid loop; timeout/degradation can make cached mining fits differ from
      standalone candidates.

    A grid that omits G, L, topK or feature_mode takes the value from
    ``base_params`` when it is supplied there, and only then falls back to
    the built-in defaults (1e-2, 1, 30, ``patterns_only``). This matches the
    parameter merge used by the ordinary grid loop and by ``refit_full``.

    Returns a dict with best_model, best_params, best_score, cv_results, and
    cache timings. Uses the same scorer as the ordinary grid path for all
    supported scoring values. During tuning it skips drift-baseline and rich
    final metadata; set refit_full=True to refit the selected model with
    normal fit().

    Raises
    ------
    HUGIMLParamError
        For an invalid grid, a non-None ``max_fit_seconds`` or an unknown
        ``execution_mode``.
    HUGIMLValidationError
        When every candidate fails.
    """
    t_start = time.perf_counter()
    candidates = _hugiml_expand_grid_for_fast_tune(param_grid)
    params0 = dict(base_params or {})

    # Tuned dimensions supplied through base_params act as fallbacks for
    # candidates that omit them. Previously only G honoured base_params, so a
    # base L/topK/feature_mode was silently replaced by a hard-coded default
    # during scoring while refit_full still used the base value.
    inherited = {
        key: params0[key]
        for key in ("G", "L", "topK", "feature_mode")
        if params0.get(key) is not None
    }

    def _resolve(candidate: dict[str, Any]) -> tuple[float, int, int, str]:
        """Return the effective (G, L, topK, feature_mode) of one candidate."""
        return (
            float(candidate.get("G", inherited.get("G", 1e-2))),
            int(candidate.get("L", inherited.get("L", 1))),
            int(candidate.get("topK", inherited.get("topK", 30))),
            str(candidate.get("feature_mode", inherited.get("feature_mode", "patterns_only"))),
        )

    # Validate the values that will actually be used, including inherited ones.
    grid_info = _hugiml_validate_fast_tune_grid([{**inherited, **c} for c in candidates])

    params0.setdefault("adaptive_binning", True)
    params0.setdefault("use_hotpath", True)
    # Do not set a single global G here; G is part of mining and is fixed per
    # cache group below. A caller-supplied base G is used only for candidates
    # that omit G from the grid.
    params0.setdefault("G", grid_info["G_values"][0])
    if params0.get("max_fit_seconds", None) is not None:
        raise HUGIMLParamError(
            "fast_grid_tune requires max_fit_seconds=None for exact equivalence."
        )
    requested_execution_mode = str(params0.get("execution_mode", "audit"))
    if requested_execution_mode not in {"audit", "production"}:
        raise HUGIMLParamError(
            "execution_mode must be either 'audit' or 'production'. "
            f"Got {requested_execution_mode!r}."
        )

    y_train_arr = cls._safe_cast_y(y_train)
    y_val_arr = np.asarray(y_val)
    X_train_original = cls(**params0)._copy_input_for_downstream(X_train)

    # Correctness note: topK is NOT derived by mining max(topK) once and slicing.
    # Empirically, the native miner can return additional valid patterns when a
    # larger topK is requested, so a smaller standalone topK run is not always
    # equivalent to a prefix of the larger run. To guarantee identical validation
    # scores to the ordinary grid loop, cache one mining fit per (G, L, topK)
    # group and reuse that cache only across feature_mode candidates. Within
    # each cache fit, fit() already sorts raw_patterns_ by descending utility with
    # tuple(items) tie-breaking before downstream construction.
    base_by_G_L_topK: dict[tuple[float, int, int], HUGIMLClassifierNative] = {}
    cache_fit_seconds: dict[str, float] = {}
    needed_cache_keys = sorted({_resolve(c)[:3] for c in candidates})
    for G_value, L_value, topK_value in needed_cache_keys:
        base_fit_params = dict(params0)
        base_fit_params.update(
            {
                "adaptive_binning": True,
                "L": int(L_value),
                "topK": int(topK_value),
                # Use the richest ordinary mode so raw input is preserved and
                # empty-pattern fallbacks do not fail while building the cache.
                "feature_mode": "original_plus_patterns",
                "G": float(G_value),
                # Cached tuning needs training matrices. Even if callers pass
                # production in base_params, the internal cache must retain audit
                # artifacts; final refit below can still use caller params.
                "execution_mode": "audit",
            }
        )
        # B may be present in the original grid, but it is intentionally ignored
        # under adaptive_binning=True.
        t_fit = time.perf_counter()
        base = cls(**base_fit_params)
        # Flag set for the duration of fit() and removed afterwards.
        base._fast_tune_cache_only = True
        base.fit(X_train, y_train_arr)
        base.__dict__.pop("_fast_tune_cache_only", None)
        cache_fit_seconds[f"G={float(G_value):.12g},L={int(L_value)},topK={int(topK_value)}"] = (
            time.perf_counter() - t_fit
        )
        base_by_G_L_topK[(float(G_value), int(L_value), int(topK_value))] = base

    rows: list[dict[str, Any]] = []
    best_score = -np.inf
    best_model: HUGIMLClassifierNative | None = None
    best_params: dict[str, Any] | None = None
    for candidate_params in candidates:
        G_value, L_value, topK_value, feature_mode = _resolve(candidate_params)
        t_cand = time.perf_counter()
        status = "ok"
        err = None
        score = np.nan
        model = None
        try:
            model = _hugiml_prepare_candidate_from_cached_base(
                base_by_G_L_topK[(G_value, L_value, topK_value)],
                X_train_original,
                y_train_arr,
                L_value,
                topK_value,
                feature_mode,
                requested_execution_mode,
            )
            score = _hugiml_score_model_for_tune(model, X_val, y_val_arr, scoring)
            if np.isfinite(score) and score > best_score:
                best_score = float(score)
                best_model = model
                best_params = dict(candidate_params)
                best_params["adaptive_binning"] = True
                best_params["G"] = G_value
                best_params.setdefault("execution_mode", requested_execution_mode)
        except (
            Exception
        ) as exc:  # keep failed candidates visible like GridSearchCV error_score=np.nan
            status = "failed"
            err = f"{type(exc).__name__}: {exc}"
        rows.append(
            {
                "params": dict(candidate_params),
                "L": L_value,
                "topK": topK_value,
                "feature_mode": feature_mode,
                "G": G_value,
                "mean_test_score": score,
                "status": status,
                "error": err,
                "elapsed_seconds": time.perf_counter() - t_cand,
            }
        )

    if best_model is None or best_params is None:
        raise HUGIMLValidationError("All fast_grid_tune candidates failed.")

    if refit_full:
        refit_params = dict(params0)
        refit_params.update(best_params)
        # Keep user-supplied B if present; adaptive_binning ignores it for transaction B.
        best_model = cls(**refit_params).fit(X_train, y_train_arr)

    result = {
        "best_model": best_model,
        "best_params": best_params,
        "best_score": float(best_score),
        "cv_results": rows if return_results else None,
        "cache_fit_seconds_by_G_L_topK": cache_fit_seconds,
        "cache_topK_strategy": "exact_per_G_L_topK_utility_ordered",
        "elapsed_seconds": time.perf_counter() - t_start,
        "method": "exact_cached_adaptive_grid",
        "scoring": str(scoring),
    }
    return result
@dataclasses.dataclass
class HUGIMLTuneResult:
    """Outcome of ``HUGIMLClassifier.tune()``.

    The fields cover the small GridSearchCV-like subset that quick HUGIML
    tuning needs, which keeps the API lightweight. Read-only aliases without
    the trailing underscore are provided so that dict-style notebook code
    keeps working.
    """

    # Selected estimator, its parameters and its score.
    best_estimator_: HUGIMLClassifierNative
    best_params_: dict[str, Any]
    best_score_: float
    # Per-candidate results; the container type is left to the producer.
    results_: Any
    # How the search ran.
    fast_path_used_: bool
    elapsed_seconds_: float
    n_splits_: int
    scoring: str
    # The splits used, as pairs of index arrays.
    cv_splits_: list[tuple[np.ndarray, np.ndarray]]
    shuffle: bool
    random_state: int | None

    # Backward-compatible aliases for dict-style code in notebooks.
    @property
    def best_model(self) -> HUGIMLClassifierNative:
        """Alias of ``best_estimator_``."""
        return self.best_estimator_

    @property
    def best_params(self) -> dict[str, Any]:
        """Alias of ``best_params_``."""
        return self.best_params_

    @property
    def best_score(self) -> float:
        """Alias of ``best_score_``."""
        return self.best_score_

    @property
    def cv_results(self) -> Any:
        """Alias of ``results_``."""
        return self.results_

    @property
    def cv_splits(self) -> list[tuple[np.ndarray, np.ndarray]]:
        """Alias of ``cv_splits_``."""
        return self.cv_splits_
def _hugiml_params_key(params: dict[str, Any]) -> tuple[tuple[str, str], ...]:
    """Stable hashable key for parameter dictionaries used in tuning results.

    Keys are stringified and values are ``repr``-ed so that unhashable values
    can still take part in the key; pairs are sorted so insertion order of
    ``params`` does not matter.
    """
    return tuple(sorted((str(k), repr(v)) for k, v in dict(params).items()))


# Scoring names understood by ``_hugiml_score_model_for_tune`` (compared after
# lower-casing).  'auc' and 'f1_binary' are accepted as aliases of 'roc_auc'
# and 'f1' respectively.
_HUGIML_TUNE_SCORING_NAMES = frozenset(
    {
        "roc_auc",
        "auc",
        "accuracy",
        "balanced_accuracy",
        "f1",
        "f1_binary",
        "f1_macro",
        "f1_weighted",
    }
)

_HUGIML_TUNE_SCORING_ERROR = (
    "Unsupported scoring value. Supported: 'roc_auc', 'accuracy', "
    "'balanced_accuracy', 'f1', 'f1_macro', 'f1_weighted'."
)


def _hugiml_normalize_tune_scoring(scoring: Any) -> str:
    """Return the lower-cased scoring name, or raise if it is unsupported.

    Raises
    ------
    HUGIMLParamError
        If ``scoring`` is not one of the names in
        ``_HUGIML_TUNE_SCORING_NAMES``.
    """
    scoring_norm = str(scoring).lower()
    if scoring_norm not in _HUGIML_TUNE_SCORING_NAMES:
        raise HUGIMLParamError(_HUGIML_TUNE_SCORING_ERROR)
    return scoring_norm


def _hugiml_score_model_for_tune(
    model: HUGIMLClassifierNative,
    X_val: Any,
    y_val: Any,
    scoring: str,
) -> float:
    """Score one fitted model for HUGIMLClassifier.tune().

    Parameters
    ----------
    model : fitted estimator
        Must provide ``predict`` and, for AUC scoring, ``predict_proba`` and
        ``classes_``.
    X_val, y_val : validation features and labels.
    scoring : str
        Metric name (case-insensitive); see ``_HUGIML_TUNE_SCORING_NAMES``.

    Returns
    -------
    float
        The validation score.

    Raises
    ------
    HUGIMLParamError
        If ``scoring`` is unsupported.  The check runs before any prediction
        so that an invalid metric name never costs a ``predict`` call.
    """
    scoring_norm = _hugiml_normalize_tune_scoring(scoring)

    from sklearn.metrics import accuracy_score, balanced_accuracy_score, f1_score

    if scoring_norm in {"roc_auc", "auc"}:
        proba = model.predict_proba(X_val)
        return _hugiml_auc_score_for_fast_grid(y_val, proba, model.classes_)

    pred = model.predict(X_val)
    if scoring_norm == "accuracy":
        return float(accuracy_score(y_val, pred))
    if scoring_norm == "balanced_accuracy":
        return float(balanced_accuracy_score(y_val, pred))
    if scoring_norm in {"f1", "f1_binary"}:
        return float(f1_score(y_val, pred))
    if scoring_norm == "f1_macro":
        return float(f1_score(y_val, pred, average="macro"))
    if scoring_norm == "f1_weighted":
        return float(f1_score(y_val, pred, average="weighted"))
    # Defensive: only reachable if a name is added to the supported set
    # without a matching branch above.
    raise HUGIMLParamError(_HUGIML_TUNE_SCORING_ERROR)


def _hugiml_standard_grid_tune_one_split(
    cls,
    X_train: Any,
    y_train: Any,
    X_val: Any,
    y_val: Any,
    candidates: list[dict[str, Any]],
    base_params: dict[str, Any],
    scoring: str,
) -> dict[str, Any]:
    """Ordinary per-candidate grid evaluation for grids not eligible for fast path.

    Every candidate is fitted from scratch on the training split and scored on
    the validation split.  A candidate that raises is recorded with
    ``status="failed"`` and a NaN score instead of aborting the whole split.

    Returns
    -------
    dict
        Keys: ``best_model``, ``best_params``, ``best_score``, ``cv_results``
        (one row per candidate), ``elapsed_seconds`` (sum of per-candidate
        times) and ``method`` (always ``"ordinary_grid"``).

    Raises
    ------
    HUGIMLParamError
        If ``scoring`` is unsupported.
    HUGIMLValidationError
        If no candidate produced a finite score.
    """
    # Validate the metric once, up front.  Otherwise the per-candidate
    # ``except Exception`` below would swallow the parameter error and every
    # candidate would be fitted only to be reported as "failed".
    _hugiml_normalize_tune_scoring(scoring)

    rows: list[dict[str, Any]] = []
    best_score = -np.inf
    best_model: HUGIMLClassifierNative | None = None
    best_params: dict[str, Any] | None = None
    y_train_arr = cls._safe_cast_y(y_train)

    for candidate_params in candidates:
        t_cand = time.perf_counter()
        params = dict(base_params)
        params.update(candidate_params)
        status = "ok"
        err = None
        score = np.nan
        model = None
        try:
            model = cls(**params).fit(X_train, y_train_arr)
            score = _hugiml_score_model_for_tune(model, X_val, y_val, scoring)
            if np.isfinite(score) and score > best_score:
                best_score = float(score)
                best_model = model
                best_params = dict(candidate_params)
        except Exception as exc:
            # Keep failed candidates visible in the results rather than
            # aborting the remaining grid.
            status = "failed"
            err = f"{type(exc).__name__}: {exc}"
        rows.append(
            {
                "params": dict(candidate_params),
                "L": candidate_params.get("L", params.get("L")),
                "topK": candidate_params.get("topK", params.get("topK")),
                "feature_mode": candidate_params.get("feature_mode", params.get("feature_mode")),
                "mean_test_score": score,
                "status": status,
                "error": err,
                "elapsed_seconds": time.perf_counter() - t_cand,
            }
        )

    if best_model is None or best_params is None:
        raise HUGIMLValidationError("All tune candidates failed on a validation split.")

    return {
        "best_model": best_model,
        "best_params": best_params,
        "best_score": float(best_score),
        "cv_results": rows,
        "elapsed_seconds": sum(float(r["elapsed_seconds"]) for r in rows),
        "method": "ordinary_grid",
    }


def _hugiml_tune(
    cls,
    X: Any,
    y: Any,
    *,
    cv: int | Any = 5,
    scoring: str = "roc_auc",
    param_grid: dict[str, list] | None = None,
    refit: bool = True,
    base_params: dict[str, Any] | None = None,
    random_state: int | None = 42,
    shuffle: bool = True,
    cv_splits: list[tuple[Any, Any]] | None = None,
    use_fast_path: bool = True,
    return_dataframe: bool = True,
) -> HUGIMLTuneResult:
    """Tune HUGIML on full X, y using stratified CV and optional fast-grid caching.

    This is the main public convenience API for quick HUGIML model selection.
    The regular constructor remains a single-configuration estimator; this
    method owns grid search, cross-validation, aggregation, and optional refit.

    Parameters
    ----------
    X, y : array-like or DataFrame/Series
        Full training data.
    cv : int or splitter, default=5
        Number of stratified folds, or any sklearn-compatible splitter with
        split(X, y). Integer cv (including NumPy integers) uses
        StratifiedKFold.
    scoring : {'roc_auc', 'accuracy', 'balanced_accuracy', 'f1', 'f1_macro', 'f1_weighted'}
        Validation metric. 'roc_auc' supports binary and multiclass OVR macro AUC.
    param_grid : dict or None
        sklearn-style grid. None uses HUGIMLClassifier.default_param_grid().
    refit : bool, default=True
        If True, refit the best configuration on the full X, y with normal fit().
        If False, the returned estimator is fitted on the first split's
        training rows only.
    base_params : dict or None
        Constructor parameters shared by every candidate.
    random_state : int or None, default=42
        Random seed for StratifiedKFold when cv is an integer.
    shuffle : bool, default=True
        Whether StratifiedKFold shuffles before splitting.
    cv_splits : list of (train_idx, val_idx) or None, default=None
        Exact fold indices to use. When supplied, cv, shuffle, and random_state
        are ignored for split generation, and the same indices are returned in
        ``result.cv_splits_`` for reuse by other models.
    use_fast_path : bool, default=True
        Use exact cached fast-grid evaluation when the grid qualifies; otherwise
        fall back to ordinary per-candidate evaluation.
    return_dataframe : bool, default=True
        Return ``results_`` as a pandas DataFrame when pandas is available.

    Returns
    -------
    HUGIMLTuneResult
        GridSearchCV-like result object with ``best_estimator_``,
        ``best_params_``, ``best_score_``, ``results_``, ``fast_path_used_``,
        ``elapsed_seconds_``, and ``n_splits_``.

    Raises
    ------
    HUGIMLParamError
        If ``scoring`` is unsupported, an integer ``cv`` is below 2, a
        ``cv_splits`` entry is malformed (non-1D, empty, out of range, or
        overlapping), or no splits are produced.
    HUGIMLValidationError
        If no tuning rows were produced or every candidate failed on every
        split.

    Notes
    -----
    A candidate's ``mean_test_score`` is the mean over the splits on which it
    produced a finite score; ``n_successful_splits`` and ``status`` in the
    results show whether some splits failed.
    """
    import numbers

    from sklearn.model_selection import StratifiedKFold

    t_start = time.perf_counter()

    # Fail fast on a bad metric name.  Without this check the error would be
    # raised inside the per-candidate try/except blocks, swallowed there, and
    # surface only after fitting the whole grid as a misleading
    # "all candidates failed" error.
    _hugiml_normalize_tune_scoring(scoring)

    base_params0 = dict(base_params or {})
    candidates = _hugiml_expand_grid_for_fast_tune(param_grid)
    y_arr = cls._safe_cast_y(y)
    n_samples = len(y_arr)

    # ------------------------------------------------------------------
    # Resolve the (train_idx, val_idx) pairs.
    # ------------------------------------------------------------------
    if cv_splits is not None:
        splits = []
        for split_idx, (train_idx, val_idx) in enumerate(cv_splits, start=1):
            tr = np.asarray(train_idx, dtype=np.int64)
            va = np.asarray(val_idx, dtype=np.int64)
            if tr.ndim != 1 or va.ndim != 1:
                raise HUGIMLParamError(
                    "Each cv_splits entry must contain 1D train and validation indices."
                )
            if tr.size == 0 or va.size == 0:
                raise HUGIMLParamError(
                    f"cv_splits entry {split_idx} has an empty train or validation index."
                )
            if (
                np.any(tr < 0)
                or np.any(va < 0)
                or np.any(tr >= n_samples)
                or np.any(va >= n_samples)
            ):
                raise HUGIMLParamError(
                    f"cv_splits entry {split_idx} contains indices outside [0, n_samples)."
                )
            if np.intersect1d(tr, va).size > 0:
                raise HUGIMLParamError(
                    f"cv_splits entry {split_idx} has overlapping train and validation indices."
                )
            splits.append((tr.copy(), va.copy()))
    elif isinstance(cv, numbers.Integral):
        # numbers.Integral (rather than int) also accepts NumPy integer
        # scalars such as np.int64(5).
        if cv < 2:
            raise HUGIMLParamError("cv must be >= 2 when provided as an integer.")
        splitter = StratifiedKFold(
            n_splits=int(cv), shuffle=bool(shuffle), random_state=random_state
        )
        splits = [
            (np.asarray(tr, dtype=np.int64), np.asarray(va, dtype=np.int64))
            for tr, va in splitter.split(X, y_arr)
        ]
    else:
        splits = [
            (np.asarray(tr, dtype=np.int64), np.asarray(va, dtype=np.int64))
            for tr, va in cv.split(X, y_arr)
        ]
    if not splits:
        raise HUGIMLParamError("cv produced no splits.")

    def _take_rows(obj: Any, idx: np.ndarray) -> Any:
        # Positional row selection for pandas objects and array-likes alike.
        if hasattr(obj, "iloc"):
            return obj.iloc[idx]
        return np.asarray(obj)[idx]

    # ------------------------------------------------------------------
    # Decide whether the cached fast path may be used.
    # ------------------------------------------------------------------
    fast_path_allowed = False
    if use_fast_path:
        try:
            _hugiml_validate_fast_tune_grid(candidates)
            # The cached fast path is exact only for adaptive-binning grids with
            # no fit-time timeout/degradation. Candidate grids that explicitly
            # set adaptive_binning=False are rejected above; this additional
            # guard covers the common case where adaptive_binning or
            # max_fit_seconds is supplied only through base_params.
            if (
                bool(base_params0.get("adaptive_binning", True))
                and base_params0.get("max_fit_seconds", None) is None
            ):
                fast_path_allowed = True
        except Exception:
            # A grid that does not validate simply uses the ordinary path.
            fast_path_allowed = False

    # ------------------------------------------------------------------
    # Evaluate every candidate on every split.
    # ------------------------------------------------------------------
    fold_rows: list[dict[str, Any]] = []
    fold_methods: list[str] = []
    for fold_idx, (train_idx, val_idx) in enumerate(splits, start=1):
        X_train = _take_rows(X, train_idx)
        X_val = _take_rows(X, val_idx)
        y_train = y_arr[train_idx]
        y_val = y_arr[val_idx]

        split_result = None
        if fast_path_allowed:
            try:
                split_result = cls.fast_grid_tune(
                    X_train,
                    y_train,
                    X_val,
                    y_val,
                    param_grid=param_grid,
                    base_params=base_params0,
                    scoring=scoring,
                    refit_full=False,
                    return_results=True,
                )
            except Exception:
                # Preserve correctness over speed: an unexpected cached-path
                # failure for one fold should fall back to the ordinary
                # per-candidate evaluation rather than aborting tuning.  The
                # fast path stays disabled for the remaining folds.
                fast_path_allowed = False
                split_result = None
        if split_result is None:
            split_result = _hugiml_standard_grid_tune_one_split(
                cls,
                X_train,
                y_train,
                X_val,
                y_val,
                candidates,
                base_params0,
                scoring,
            )

        fold_methods.append(str(split_result.get("method", "unknown")))
        for row in split_result.get("cv_results") or []:
            params = dict(row.get("params", {}))
            fold_rows.append(
                {
                    "fold": fold_idx,
                    "params_key": _hugiml_params_key(params),
                    "params": params,
                    "L": row.get("L", params.get("L")),
                    "topK": row.get("topK", params.get("topK")),
                    "feature_mode": row.get("feature_mode", params.get("feature_mode")),
                    "split_test_score": row.get("mean_test_score", np.nan),
                    "status": row.get("status", "ok"),
                    "error": row.get("error"),
                    "elapsed_seconds": row.get("elapsed_seconds", np.nan),
                }
            )

    if not fold_rows:
        raise HUGIMLValidationError("No tuning results were produced.")

    # ------------------------------------------------------------------
    # Aggregate per-fold rows into one summary row per candidate.
    # ------------------------------------------------------------------
    grouped: dict[tuple[tuple[str, str], ...], list[dict[str, Any]]] = {}
    for row in fold_rows:
        grouped.setdefault(row["params_key"], []).append(row)

    summary_rows: list[dict[str, Any]] = []
    for key, rows_for_key in grouped.items():
        scores = np.asarray([float(r["split_test_score"]) for r in rows_for_key], dtype=float)
        # Failed folds carry NaN scores; statistics use the finite ones only.
        finite = scores[np.isfinite(scores)]
        first_params = dict(rows_for_key[0]["params"])
        summary_rows.append(
            {
                "params": first_params,
                "L": first_params.get("L"),
                "topK": first_params.get("topK"),
                "feature_mode": first_params.get("feature_mode"),
                "mean_test_score": float(np.mean(finite)) if finite.size else np.nan,
                "std_test_score": float(np.std(finite, ddof=0)) if finite.size else np.nan,
                "n_successful_splits": int(finite.size),
                "n_splits": int(len(splits)),
                "mean_elapsed_seconds": float(
                    np.nanmean([r["elapsed_seconds"] for r in rows_for_key])
                ),
                "status": "ok" if finite.size == len(splits) else "partial_or_failed",
            }
        )

    # Best (highest) mean first; candidates without a finite mean sort last.
    # ``repr(params)`` makes the order deterministic for tied scores.
    summary_rows.sort(
        key=lambda r: (
            -float(r["mean_test_score"]) if np.isfinite(r["mean_test_score"]) else np.inf,
            repr(r["params"]),
        )
    )
    if not summary_rows or not np.isfinite(summary_rows[0]["mean_test_score"]):
        raise HUGIMLValidationError("All tune candidates failed across CV splits.")
    for rank, row in enumerate(summary_rows, start=1):
        row["rank_test_score"] = rank

    best_params = dict(base_params0)
    best_params.update(dict(summary_rows[0]["params"]))
    best_score = float(summary_rows[0]["mean_test_score"])

    if refit:
        best_estimator = cls(**best_params).fit(X, y_arr)
    else:
        # Return a fitted estimator from the first fold for convenience. It is
        # valid for immediate inspection/prediction on that fold's fitted state,
        # but refit=True is recommended for production use.
        first_train_idx = splits[0][0]
        best_estimator = cls(**best_params).fit(
            _take_rows(X, first_train_idx), y_arr[first_train_idx]
        )

    if return_dataframe:
        try:
            results_obj = pd.DataFrame(summary_rows)
        except Exception:
            # Fall back to the plain list of dicts if a DataFrame cannot be built.
            results_obj = summary_rows
    else:
        results_obj = summary_rows

    return HUGIMLTuneResult(
        best_estimator_=best_estimator,
        best_params_=best_params,
        best_score_=best_score,
        results_=results_obj,
        # True only when every fold was actually served by the cached path.
        fast_path_used_=bool(
            fast_path_allowed and all(m == "exact_cached_adaptive_grid" for m in fold_methods)
        ),
        elapsed_seconds_=time.perf_counter() - t_start,
        n_splits_=int(len(splits)),
        scoring=str(scoring),
        cv_splits_=[(tr.copy(), va.copy()) for tr, va in splits],
        shuffle=bool(shuffle),
        random_state=random_state,
    )


HUGIMLClassifierNative.fast_grid_tune = classmethod(_hugiml_fast_grid_tune)
HUGIMLClassifierNative.tune = classmethod(_hugiml_tune)

# Backward-compatible public class name. HUGIMLClassifierNative remains
# available for existing code; HUGIMLClassifier is the cleaner end-user entry
# point and intentionally shares the exact implementation and serialization
# contract.
HUGIMLClassifier = HUGIMLClassifierNative

__all__ = [
    "HUGIMLClassifier",
    "HUGIMLClassifierNative",
    "FitMetadata",
    "HUGIMLTuneResult",
]