Source code for hugiml.explainability

# Copyright 2026 Srikumar Krishnamoorthy
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Enterprise explainability for HUGIMLClassifierNative.

Provides SHAP interoperability, feature lineage tracking, explanation
stability metrics, and audit artifact generation.  The core HUG patterns
are human-readable by design; this module adds depth for downstream
governance and audit workflows.
"""

from __future__ import annotations

import json
import logging
import warnings
from dataclasses import asdict, dataclass, field
from typing import Any

import numpy as np

logger = logging.getLogger(__name__)

__all__ = [
    "ExplainabilityReport",
    "FeatureLineage",
    "ExplanationStabilityMetrics",
    "HUGPatternExplainer",
    "shap_values_from_pattern_matrix",
]


# =============================================================================
# Data containers
# =============================================================================


@dataclass
class FeatureLineage:
    """Lineage record tying one input feature to what was derived from it.

    Attributes
    ----------
    feature_name : str
        Name of the original feature in the training DataFrame.
    feature_type : str
        One of 'integer', 'float', 'categorical'.
    derived_patterns : list of str
        Human-readable labels of the HUG patterns that mention this feature.
    pattern_indices : list of int
        Position of each entry of ``derived_patterns`` in the pattern list.
    derived_augmented_pairs : list of str
        Names of augmented-pair features built from this source feature.
    total_importance : float
        Sum of absolute downstream coefficients over the original, HUG
        pattern, and augmented-pair features linked to this source feature.
    pattern_importance : float
        Share of ``total_importance`` coming from HUG patterns only.
    augmented_pair_importance : float
        Share of ``total_importance`` coming from augmented pairs only.
    original_feature_importance : float
        Direct contribution of the original feature when original features
        are part of the downstream estimator.
    """

    # Identity of the source feature.
    feature_name: str
    feature_type: str

    # Downstream features derived from it (fresh containers per instance).
    derived_patterns: list[str] = field(default_factory=list)
    pattern_indices: list[int] = field(default_factory=list)
    derived_augmented_pairs: list[str] = field(default_factory=list)

    # Importance totals, all starting at zero.
    total_importance: float = 0.0
    pattern_importance: float = 0.0
    augmented_pair_importance: float = 0.0
    original_feature_importance: float = 0.0
@dataclass
class ExplanationStabilityMetrics:
    """Stability summary for pattern-based explanations.

    The scalar fields describe mined HUG patterns only.  Whenever original
    or augmented-pair downstream features exist as well, their stability is
    reported separately, keyed by feature type, in ``by_feature_type`` so
    that derived-feature stability is never blended into the stability of
    the human-readable pattern rules.
    """

    # Headline (pattern-only) metrics.
    jaccard_similarity: float = 0.0
    rank_correlation: float = 0.0
    pattern_overlap_count: int = 0
    n_patterns_a: int = 0
    n_patterns_b: int = 0

    # Per-feature-type metric dictionaries (a fresh dict per instance).
    by_feature_type: dict[str, dict[str, float | int]] = field(default_factory=dict)
@dataclass
class ExplainabilityReport:
    """Full explainability report for a fitted classifier instance.

    Contains pattern importances, feature lineage, and stability metrics.
    Serializable to strictly valid JSON for audit workflows.
    """

    model_id: str
    n_patterns: int
    n_features: int
    top_patterns: list[dict[str, Any]] = field(default_factory=list)
    feature_lineage: list[dict[str, Any]] = field(default_factory=list)
    model_composition: dict[str, Any] = field(default_factory=dict)
    augmented_pair_effects: list[dict[str, Any]] = field(default_factory=list)
    stability: dict[str, Any] | None = None
    shap_available: bool = False

    @staticmethod
    def _json_safe(value: Any) -> Any:
        """Recursively convert ``value`` into something ``json.dumps`` renders validly.

        NumPy arrays become lists, NumPy scalars become Python scalars, and
        non-finite floats become ``None`` (JSON ``null``).  Dictionary keys
        are left untouched.
        """
        if isinstance(value, np.ndarray):
            value = value.tolist()
        if isinstance(value, np.generic):
            value = value.item()
        if isinstance(value, float) and not np.isfinite(value):
            # json.dumps would emit the bare tokens NaN / Infinity, which are
            # not part of the JSON grammar and break strict parsers.
            return None
        if isinstance(value, dict):
            return {key: ExplainabilityReport._json_safe(item) for key, item in value.items()}
        if isinstance(value, (list, tuple)):
            return [ExplainabilityReport._json_safe(item) for item in value]
        return value

    def to_json(self, indent: int = 2) -> str:
        """Serialize the report to a JSON string.

        Parameters
        ----------
        indent : int
            Indentation width passed to ``json.dumps``.

        Returns
        -------
        str
            JSON document.  Non-finite floats are written as ``null``; any
            remaining value JSON cannot represent is stringified.
        """
        payload = self._json_safe(asdict(self))
        return json.dumps(payload, indent=indent, default=str)

    def save(self, path: str) -> None:
        """Write the report, as produced by :meth:`to_json`, to ``path`` (UTF-8)."""
        with open(path, "w", encoding="utf-8") as fh:
            fh.write(self.to_json())
# =============================================================================
# Main explainer
# =============================================================================
class HUGPatternExplainer:
    """Enterprise explainability layer over a fitted HUGIMLClassifierNative.

    Extracts feature lineage, computes explanation stability, and assembles
    audit reports.  Lineage and report generation read the already-mined HUG
    patterns; only :meth:`explanation_stability` re-fits (copies of) the
    classifier.

    Parameters
    ----------
    classifier : HUGIMLClassifierNative
        A fitted classifier instance.

    Raises
    ------
    RuntimeError
        If ``classifier`` has no ``patterns_`` attribute.
    """

    # Fitted-state attributes stripped from classifier copies before re-fitting.
    _FITTED_ATTRS = ("patterns_", "td_", "model_", "x_train_hup_", "fit_metadata_")

    # Columns copied from feature_importances() into each report record.
    _GOVERNANCE_COLUMNS = (
        "pattern",
        "feature",
        "display_name",
        "feature_type",
        "coefficient",
        "abs_coefficient",
        "pattern_support",
        "support_type",
        "non_missing_rate",
        "variance",
        "strict_topk_score",
        "raw_formula",
        "standardized_formula",
        "standardization_mean",
        "standardization_scale",
        "reference_raw_value",
        "reference_raw_value_description",
        "pair_missing_policy",
        "pair_missing_policy_description",
        "eligible_count",
        "eligible_rate",
        "missing_pair_rate",
        "coefficient_standardized",
        "one_std_effect_on_log_odds",
        "coefficient_raw_scale",
        "one_raw_unit_effect_on_log_odds",
        "decision_direction",
        "risk_increases_when",
        "unit_effect_interpretation",
        "raw_scale_note",
        "raw_interpretation",
        "source_observed_medians",
        "source_observed_medians_description",
        "transform_ig",
    )

    def __init__(self, classifier: Any) -> None:
        self._clf = classifier
        self._validate_fitted()

    def _validate_fitted(self) -> None:
        """Raise ``RuntimeError`` unless the wrapped classifier has ``patterns_``."""
        if not hasattr(self._clf, "patterns_"):
            raise RuntimeError("Classifier must be fitted before creating an explainer.")

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _input_feature_names(clf: Any) -> list[str]:
        """Return ``clf.feature_names_in_`` as a list of str ([] when absent).

        The explicit ``None`` check avoids truth-testing the attribute, which
        raises ``ValueError`` when it is a NumPy array.
        """
        raw = getattr(clf, "feature_names_in_", None)
        if raw is None:
            return []
        return [str(name) for name in raw]

    @staticmethod
    def _mask_or_zeros(clf: Any, attr: str, size: int) -> Any:
        """Return the boolean mask stored on ``clf`` or an all-False mask."""
        mask = getattr(clf, attr, None)
        return np.zeros(size, dtype=bool) if mask is None else mask

    @staticmethod
    def _json_value(value: Any) -> Any:
        """Convert NumPy containers/scalars and non-finite floats for JSON."""
        if isinstance(value, np.ndarray):
            value = value.tolist()
        if isinstance(value, np.generic):
            value = value.item()
        if isinstance(value, float) and not np.isfinite(value):
            return None
        if isinstance(value, (list, tuple)):
            return [HUGPatternExplainer._json_value(v) for v in value]
        if isinstance(value, dict):
            return {str(k): HUGPatternExplainer._json_value(v) for k, v in value.items()}
        return value

    @staticmethod
    def _feature_type_labels(frame: Any) -> set:
        """Return the distinct feature types present in an importance frame.

        A frame without a ``feature_type`` column is treated as containing
        only ``"pattern"`` rows.
        """
        if "feature_type" in frame.columns:
            return set(frame["feature_type"].tolist())
        return {"pattern"} if len(frame) else set()

    @staticmethod
    def _type_mask(frame: Any, feature_type: str) -> Any:
        """Return a boolean row mask selecting ``feature_type`` rows of ``frame``."""
        if "feature_type" in frame.columns:
            return frame["feature_type"] == feature_type
        # No type column: every row counts as a "pattern" row.
        return np.full(len(frame), feature_type == "pattern", dtype=bool)

    @staticmethod
    def _stability_for_type(
        frame_a: Any, frame_b: Any, feature_type: str, top_n: int
    ) -> dict[str, float | int]:
        """Compare the top ``top_n`` rows of one feature type across two frames.

        Rows are assumed to be ordered by decreasing importance (TODO confirm
        against ``feature_importances()``); ranks are row positions within the
        filtered frame, so a non-reset index cannot distort them.
        """
        fa = frame_a[HUGPatternExplainer._type_mask(frame_a, feature_type)].head(top_n)
        fb = frame_b[HUGPatternExplainer._type_mask(frame_b, feature_type)].head(top_n)
        key_col = "feature" if "feature" in fa.columns and "feature" in fb.columns else "pattern"

        # Position of the first occurrence of each key == its rank.
        rank_a: dict[Any, int] = {}
        for pos, key in enumerate(fa[key_col].tolist()):
            rank_a.setdefault(key, pos)
        rank_b: dict[Any, int] = {}
        for pos, key in enumerate(fb[key_col].tolist()):
            rank_b.setdefault(key, pos)

        set_a, set_b = set(rank_a), set(rank_b)
        overlap = set_a & set_b
        union = set_a | set_b
        jaccard = len(overlap) / len(union) if union else 0.0

        rank_corr = 0.0
        if len(overlap) >= 3:  # Spearman is not meaningful on fewer points
            shared = sorted(overlap, key=rank_a.__getitem__)
            try:
                from scipy.stats import spearmanr

                corr, _ = spearmanr(
                    [rank_a[item] for item in shared],
                    [rank_b[item] for item in shared],
                )
                rank_corr = float(corr) if np.isfinite(corr) else 0.0
            except Exception:
                logger.debug("spearmanr rank correlation failed.", exc_info=True)

        return {
            "jaccard_similarity": round(jaccard, 4),
            "rank_correlation": round(rank_corr, 4),
            "overlap_count": len(overlap),
            "n_features_a": len(set_a),
            "n_features_b": len(set_b),
        }

    # ------------------------------------------------------------------
    # Feature lineage
    # ------------------------------------------------------------------

    def feature_lineage(self) -> list[FeatureLineage]:
        """Build feature lineage mapping each input feature to its patterns.

        Returns
        -------
        list of FeatureLineage
            One entry per distinct original input feature, in input order.
        """
        clf = self._clf
        feature_names = self._input_feature_names(clf)
        cat_mask = self._mask_or_zeros(clf, "cat_cols_mask_", len(feature_names))
        int_mask = self._mask_or_zeros(clf, "is_int_mask_", len(feature_names))
        pattern_labels = clf.get_hug_features()

        # Get downstream importances when available.  Pattern entries are
        # linked by HUG pattern label; augmented-pair entries are linked through
        # their source input metadata; direct original features are linked by
        # their ``orig:<name>`` namespace.
        pattern_importance: dict[str, float] = {}
        original_importance: dict[str, float] = {}
        augmented_importance: dict[str, float] = {}
        augmented_display: dict[str, str] = {}
        try:
            imp = clf.feature_importances()
            for _, row in imp.iterrows():
                ftype = str(row.get("feature_type", "pattern"))
                feature_key = str(row.get("feature", row.get("pattern", "")))
                display = str(row.get("display_name", row.get("pattern", feature_key)))
                importance = float(row.get("abs_coefficient", 0.0))
                if ftype == "pattern":
                    pattern_importance[str(row.get("pattern", display))] = importance
                elif ftype == "original" and feature_key.startswith("orig:"):
                    original_importance[feature_key[len("orig:") :]] = importance
                elif ftype == "augmented_pair":
                    augmented_importance[feature_key] = importance
                    augmented_display[feature_key] = display
        except Exception:
            # Best effort: lineage is still useful without importance scores.
            logger.debug("feature_importances() unavailable for lineage scoring.", exc_info=True)

        lineage: dict[str, FeatureLineage] = {}
        for j, fname in enumerate(feature_names):
            if fname in lineage:
                continue  # duplicated column name: the first occurrence wins
            if j < len(cat_mask) and cat_mask[j]:
                ftype = "categorical"
            elif j < len(int_mask) and int_mask[j]:
                ftype = "integer"
            else:
                ftype = "float"
            lineage[fname] = FeatureLineage(feature_name=fname, feature_type=ftype)

        for fname, importance in original_importance.items():
            if fname in lineage:
                lineage[fname].original_feature_importance += importance
                lineage[fname].total_importance += importance

        for pat_idx, label in enumerate(pattern_labels):
            importance = pattern_importance.get(label, 0.0)
            # A label is a ", "-joined list of items; items carrying "=" start
            # with the source feature name.
            for part in label.split(", "):
                if "=" not in part:
                    continue
                fname = part.split("=")[0]
                if fname in lineage:
                    entry = lineage[fname]
                    entry.derived_patterns.append(label)
                    entry.pattern_indices.append(pat_idx)
                    entry.pattern_importance += importance
                    entry.total_importance += importance

        try:
            pair_catalog = clf.get_augmented_pair_transforms()
        except Exception:
            logger.debug(
                "get_augmented_pair_transforms() unavailable for lineage.", exc_info=True
            )
            pair_catalog = []

        for item in pair_catalog:
            name = str(item.get("name", ""))
            feature_key = f"augmented_pair:{name}"
            importance = augmented_importance.get(feature_key, 0.0)
            if importance == 0.0:
                continue  # pair not used (or zero-weighted) downstream
            display = augmented_display.get(feature_key, str(item.get("raw_formula", name)))
            for src in item.get("inputs", []) or []:
                fname = str(src)
                if fname in lineage:
                    entry = lineage[fname]
                    entry.derived_augmented_pairs.append(display)
                    entry.augmented_pair_importance += importance
                    entry.total_importance += importance

        return list(lineage.values())

    # ------------------------------------------------------------------
    # Explanation stability
    # ------------------------------------------------------------------

    def explanation_stability(
        self,
        X_a: np.ndarray,
        y_a: np.ndarray,
        X_b: np.ndarray,
        y_b: np.ndarray,
        top_n: int = 20,
    ) -> ExplanationStabilityMetrics:
        """Measure explanation stability across two data splits.

        Fits two deep copies of the classifier on split A and split B.  The
        headline metrics compare only mined HUG patterns.  Additional metrics
        are returned by feature type so original features, HUG patterns, and
        augmented-pair transforms are not mixed into a single stability score.

        Parameters
        ----------
        X_a, y_a : split A data
        X_b, y_b : split B data
        top_n : int
            How many top features of each feature type to compare.  The cut
            is applied after filtering by type, so highly ranked features of
            one type cannot crowd out another type.

        Returns
        -------
        ExplanationStabilityMetrics
            All-default metrics when ``feature_importances()`` fails on either
            re-fitted copy.
        """
        import copy

        clf_a = copy.deepcopy(self._clf)
        clf_b = copy.deepcopy(self._clf)

        # Reset fitted state and re-fit on each split
        for clone in (clf_a, clf_b):
            for attr in self._FITTED_ATTRS:
                if hasattr(clone, attr):
                    delattr(clone, attr)

        clf_a.fit(X_a, y_a)
        clf_b.fit(X_b, y_b)

        try:
            imp_a = clf_a.feature_importances()
            imp_b = clf_b.feature_importances()
        except Exception:
            logger.debug(
                "explanation_stability(): could not compute feature_importances on split.",
                exc_info=True,
            )
            return ExplanationStabilityMetrics()

        feature_types = sorted(
            self._feature_type_labels(imp_a) | self._feature_type_labels(imp_b)
        )
        by_type = {
            ft: self._stability_for_type(imp_a, imp_b, ft, top_n) for ft in feature_types
        }
        pattern_metrics = by_type.get(
            "pattern",
            {
                "jaccard_similarity": 0.0,
                "rank_correlation": 0.0,
                "overlap_count": 0,
                "n_features_a": 0,
                "n_features_b": 0,
            },
        )
        return ExplanationStabilityMetrics(
            jaccard_similarity=float(pattern_metrics["jaccard_similarity"]),
            rank_correlation=float(pattern_metrics["rank_correlation"]),
            pattern_overlap_count=int(pattern_metrics["overlap_count"]),
            n_patterns_a=int(pattern_metrics["n_features_a"]),
            n_patterns_b=int(pattern_metrics["n_features_b"]),
            by_feature_type=by_type,
        )

    # ------------------------------------------------------------------
    # Full report
    # ------------------------------------------------------------------

    def generate_report(
        self,
        model_id: str = "hugiml_model",
        top_n: int = 20,
    ) -> ExplainabilityReport:
        """Generate a complete explainability report.

        Parameters
        ----------
        model_id : str
            Identifier for this model instance.
        top_n : int
            Number of top patterns to include.

        Returns
        -------
        ExplainabilityReport
            ``stability`` is left at its default; this method does not re-fit.
        """
        clf = self._clf
        to_json = self._json_value

        top_patterns: list[dict[str, Any]] = []
        try:
            imp = clf.feature_importances().head(top_n)
            for _, row in imp.iterrows():
                record = {
                    col: to_json(row.get(col)) for col in self._GOVERNANCE_COLUMNS if col in row
                }
                record.setdefault("pattern", to_json(row.get("pattern")))
                record.setdefault("feature", to_json(row.get("feature", row.get("pattern"))))
                record.setdefault("feature_type", to_json(row.get("feature_type", "pattern")))
                top_patterns.append(record)
        except Exception:
            logger.debug(
                "feature_importances() unavailable in generate_report; "
                "falling back to get_pattern_info().",
                exc_info=True,
            )
            # Drop any partially built records so the fallback is not mixed in.
            top_patterns = []
            info = clf.get_pattern_info().head(top_n)
            for _, row in info.iterrows():
                top_patterns.append(
                    {
                        "pattern": row["pattern"],
                        "utility": to_json(float(row["utility"])),
                        "information_gain": to_json(float(row["information_gain"])),
                        "pattern_support": to_json(float(row["support"])),
                        "support_type": "pattern_support",
                    }
                )

        lineage_dicts = [
            {
                "feature_name": fl.feature_name,
                "feature_type": fl.feature_type,
                "n_patterns": len(fl.derived_patterns),
                "n_augmented_pairs": len(fl.derived_augmented_pairs),
                "total_importance": round(fl.total_importance, 6),
                "pattern_importance": round(fl.pattern_importance, 6),
                "augmented_pair_importance": round(fl.augmented_pair_importance, 6),
                "original_feature_importance": round(fl.original_feature_importance, 6),
                # Only a short sample is embedded to keep the report compact.
                "derived_patterns": fl.derived_patterns[:5],
                "derived_augmented_pairs": fl.derived_augmented_pairs[:5],
            }
            for fl in self.feature_lineage()
        ]

        try:
            model_composition = clf.get_model_composition()
        except Exception:
            logger.debug(
                "get_model_composition() unavailable in generate_report.", exc_info=True
            )
            model_composition = {
                "feature_mode": getattr(clf, "feature_mode", None),
                "topk_budget_strict": getattr(clf, "topk_budget_strict", None),
                "n_patterns_mined": len(getattr(clf, "patterns_", [])),
            }

        augmented_pair_effects: list[dict[str, Any]] = []
        try:
            effects = clf.explain_augmented_pair_effects()
            for _, row in effects.iterrows():
                augmented_pair_effects.append({str(k): to_json(v) for k, v in row.items()})
        except Exception:
            logger.debug(
                "explain_augmented_pair_effects() unavailable in generate_report.", exc_info=True
            )

        return ExplainabilityReport(
            model_id=model_id,
            n_patterns=len(clf.patterns_),
            n_features=to_json(getattr(clf, "n_features_in_", 0)),
            top_patterns=top_patterns,
            feature_lineage=lineage_dicts,
            model_composition={str(k): to_json(v) for k, v in model_composition.items()},
            augmented_pair_effects=augmented_pair_effects,
            shap_available=_shap_is_available(),
        )
# =============================================================================
# SHAP interoperability
# =============================================================================
def _reduce_shap_output(raw: Any) -> np.ndarray:
    """Collapse a SHAP multi-output result to one (n_samples, n_patterns) matrix.

    Handles both return conventions: a list with one array per class, and a
    single array whose trailing axis indexes the classes.  For two classes
    the second (positive) class is kept; otherwise classes are averaged.
    """
    if isinstance(raw, list):
        raw = raw[1] if len(raw) == 2 else np.array(raw).mean(axis=0)
    values = np.array(raw)
    if values.ndim == 3:
        values = values[..., 1] if values.shape[-1] == 2 else values.mean(axis=-1)
    return values


def shap_values_from_pattern_matrix(
    classifier: Any,
    X: Any,
    *,
    background_samples: int = 100,
    check_additivity: bool = False,
    allow_incomplete: bool = False,
) -> np.ndarray | None:
    """Compute SHAP values over the HUG pattern feature space.

    Applies SHAP's LinearExplainer (or KernelExplainer as fallback) on the
    matrix produced by the classifier's ``transform()`` method.  The
    resulting SHAP values are in pattern-space; use
    :func:`aggregate_shap_to_features` to roll them back to original features.

    When the fitted downstream estimator also uses original or augmented-pair
    features, pattern-space SHAP is incomplete relative to the fitted model.
    In that case this function warns and returns ``None`` unless
    ``allow_incomplete=True`` is passed explicitly.

    Requires the optional ``shap`` package (``pip install shap``).

    Parameters
    ----------
    classifier : HUGIMLClassifierNative
        A fitted classifier.
    X : array-like
        Input data to explain.
    background_samples : int
        Maximum number of rows sampled (without replacement) as the
        KernelExplainer background set.
    check_additivity : bool
        Forwarded to the KernelExplainer ``shap_values`` call.
    allow_incomplete : bool
        If False, return None when the fitted downstream estimator uses
        original or augmented-pair features in addition to HUG patterns.

    Returns
    -------
    np.ndarray of shape (n_samples, n_patterns) or None
        SHAP values in pattern space.  ``None`` when the explanation would be
        incomplete (see above), when shap is not installed, or when both
        explainers fail; each case emits a warning.
    """
    downstream_names = list(getattr(classifier, "get_downstream_features", lambda: [])())
    non_pattern = [name for name in downstream_names if not str(name).startswith("pattern:")]
    if non_pattern and not allow_incomplete:
        warnings.warn(
            "Pattern-space SHAP is incomplete because the fitted downstream estimator "
            "also uses original or augmented-pair features. Pass allow_incomplete=True "
            "only if a pattern-only diagnostic is intended.",
            RuntimeWarning,
            stacklevel=2,
        )
        return None

    if not _shap_is_available():
        # NOTE(review): ImportWarning is hidden by Python's default warning
        # filters, so callers normally just see None here.  The category is
        # kept because existing callers may filter on it.
        warnings.warn(
            "SHAP is not installed. Install it with: pip install shap",
            ImportWarning,
            stacklevel=2,
        )
        return None

    import shap

    X_hup = classifier.transform(X)

    # Try LinearExplainer first (works when downstream is LogisticRegression).
    # The pipeline-step lookup is inside the try so that a model_ without
    # ``named_steps`` falls through to KernelExplainer instead of raising.
    try:
        clf_step = classifier.model_.named_steps.get("clf")
        explainer = shap.LinearExplainer(clf_step, X_hup)
        return _reduce_shap_output(explainer.shap_values(X_hup))
    except Exception:
        logger.debug("SHAP LinearExplainer failed; trying KernelExplainer.", exc_info=True)

    try:
        bg_size = min(background_samples, X_hup.shape[0])
        bg_indices = np.random.choice(X_hup.shape[0], bg_size, replace=False)
        bg = X_hup[bg_indices]
        explainer = shap.KernelExplainer(classifier.model_.predict_proba, bg)
        sv = explainer.shap_values(X_hup, check_additivity=check_additivity)
        return _reduce_shap_output(sv)
    except Exception as e:
        warnings.warn(f"SHAP computation failed: {e}", RuntimeWarning, stacklevel=2)
        return None
def aggregate_shap_to_features(
    shap_values_pattern: np.ndarray,
    classifier: Any,
) -> dict[str, float]:
    """Aggregate pattern-space SHAP values back to original features.

    Each pattern's mean absolute SHAP value is added to every input feature
    named in the pattern label (label items of the form ``<feature>=...``).

    Parameters
    ----------
    shap_values_pattern : np.ndarray, shape (n_samples, n_patterns)
        A 1-D vector of length ``n_patterns`` is accepted as a single sample.
    classifier : fitted HUGIMLClassifierNative

    Returns
    -------
    dict mapping feature name to mean absolute SHAP value.

    Raises
    ------
    TypeError
        If ``shap_values_pattern`` is None.

    Warns
    -----
    RuntimeWarning
        If the downstream estimator also uses non-pattern features, because
        those contributions are not part of the aggregation.
    """
    if shap_values_pattern is None:
        raise TypeError(
            "shap_values_pattern is None; SHAP values must be computed before aggregation."
        )

    downstream_names = list(getattr(classifier, "get_downstream_features", lambda: [])())
    non_pattern = [name for name in downstream_names if not str(name).startswith("pattern:")]
    if non_pattern:
        warnings.warn(
            "Aggregating pattern-space SHAP to original features omits original "
            "downstream columns and augmented-pair transforms. Use this only as a "
            "pattern-subspace diagnostic for this fitted model.",
            RuntimeWarning,
            stacklevel=2,
        )

    # Explicit None check: truth-testing an array-like of names raises ValueError.
    raw_names = getattr(classifier, "feature_names_in_", None)
    feature_names = [] if raw_names is None else [str(name) for name in raw_names]
    pattern_labels = classifier.get_hug_features()

    aggregated: dict[str, float] = dict.fromkeys(feature_names, 0.0)
    mean_abs = np.abs(np.atleast_2d(shap_values_pattern)).mean(axis=0)

    # zip stops at the shorter input, so labels without a SHAP column are ignored.
    for label, weight in zip(pattern_labels, mean_abs):
        for part in label.split(", "):
            if "=" not in part:
                continue
            fname = part.split("=")[0]
            if fname in aggregated:
                aggregated[fname] += float(weight)

    return aggregated


def _shap_is_available() -> bool:
    """Return True when the optional ``shap`` package can be imported."""
    try:
        import shap  # noqa: F401

        return True
    except ImportError:
        return False