Source code for rocelib.recourse_methods.ModelMultiplicityMILP

import pandas as pd
from gurobipy import Model
from gurobipy.gurobipy import quicksum, GRB
from sklearn.linear_model import LogisticRegression

from rocelib.datasets.DatasetLoader import DatasetLoader
from rocelib.models.imported_models.KerasModel import KerasModel
from rocelib.models.imported_models.PytorchModel import PytorchModel
from rocelib.models.imported_models.SKLearnModel import SKLearnModel
from rocelib.recourse_methods.RecourseGenerator import RecourseGenerator
from rocelib.tasks.Task import Task


def create_weights_and_bias_dictionary(model):
    """
    Extract the learned weights and biases from a trained model wrapper and
    store them in dictionaries keyed by layer/neuron indices.

    The first layer in the model is considered the input layer (layer 0).
    Keras and sklearn weights are transposed where necessary so that every
    weight matrix follows the PyTorch convention ``(out_features, in_features)``.

    Parameters
    ----------
    model : PytorchModel, KerasModel or SKLearnModel
        A rocelib model wrapper containing the trained model.

    Returns
    -------
    tuple of (dict, dict)
        Two dictionaries:
        - weight_dict, keyed by 'weight_l<layer_idx>_n<src_idx>_to_l<layer_idx+1>_n<dest_idx>'
        - bias_dict, keyed by 'bias_into_l<layer_idx+1>_n<dest_idx>'

        For sklearn models without a layered structure (SVMs, trees, forests)
        both dictionaries are empty, since their parameters cannot be expressed
        as per-layer weight/bias pairs.
    """
    params = {}

    if isinstance(model, KerasModel):
        layer_idx = 0  # Counter for naming layers like '0.weight', '0.bias', etc.
        for layer in model.model.layers:
            weights = layer.trainable_weights
            if weights:
                for weight in weights:
                    # Keras names kernels '<layer>/kernel' and biases '<layer>/bias'.
                    param_type = "weight" if "kernel" in weight.name else "bias"
                    if param_type == "weight":
                        # Transpose weights to match PyTorch's (out, in) layout.
                        params[f"{layer_idx * 2}.{param_type}"] = weight.numpy().T
                    else:
                        params[f"{layer_idx * 2}.{param_type}"] = weight.numpy()
                layer_idx += 1
    elif isinstance(model, SKLearnModel):
        layer_idx = 0  # Counter for layer naming
        sklearn_model = model.model  # Extract actual sklearn model
        if hasattr(sklearn_model, "coef_"):
            # Logistic Regression, Linear Regression, SVM (linear kernel)
            params[f"{layer_idx * 2}.weight"] = sklearn_model.coef_
            if hasattr(sklearn_model, "intercept_"):  # Some models may not have intercept
                params[f"{layer_idx * 2}.bias"] = sklearn_model.intercept_
        elif hasattr(sklearn_model, "coefs_"):
            # MLPClassifier and MLPRegressor
            for idx, (weights, bias) in enumerate(zip(sklearn_model.coefs_, sklearn_model.intercepts_)):
                params[f"{idx * 2}.weight"] = weights.T  # Transpose to match PyTorch format
                params[f"{idx * 2}.bias"] = bias
        elif hasattr(sklearn_model, "support_vectors_"):
            # SVM models
            params["support_vectors"] = sklearn_model.support_vectors_
            if hasattr(sklearn_model, "dual_coef_"):  # Dual coefficients (for SVC)
                params["dual_coef"] = sklearn_model.dual_coef_
            if hasattr(sklearn_model, "intercept_"):  # Intercept term
                params["intercept"] = sklearn_model.intercept_
        elif hasattr(sklearn_model, "tree_"):
            # Decision Trees
            params["tree_structure"] = sklearn_model.tree_
            params["feature_importances"] = sklearn_model.feature_importances_
        elif hasattr(sklearn_model, "estimators_"):
            # Random Forest, Gradient Boosting
            for i, estimator in enumerate(sklearn_model.estimators_):
                params[f"estimator_{i}"] = estimator.tree_
    else:
        # PyTorch fallback: nn.Sequential yields names like '0.weight', '0.bias'.
        for name, param in model.model.named_parameters():
            params[name] = param.detach().numpy()

    weight_dict = {}
    bias_dict = {}

    # BUGFIX: the original iterated range(len(params) // 2), which raises
    # KeyError for sklearn models whose params do not follow the
    # '<2k>.weight'/'<2k>.bias' naming (SVMs, trees, forests). Count only the
    # layer pairs that actually exist; for layered models this is identical
    # to len(params) // 2.
    num_layers = 0
    while f"{num_layers * 2}.weight" in params and f"{num_layers * 2}.bias" in params:
        num_layers += 1

    # Loop through layers, flattening each weight matrix / bias vector into
    # the per-edge and per-neuron keys expected by the MILP builder.
    for layer_idx in range(num_layers):
        weights = params[f'{layer_idx * 2}.weight']
        biases = params[f'{layer_idx * 2}.bias']

        for dest_idx in range(weights.shape[0]):
            bias_key = f'bias_into_l{layer_idx + 1}_n{dest_idx}'
            bias_dict[bias_key] = biases[dest_idx]

            for src_idx in range(weights.shape[1]):
                weight_key = f'weight_l{layer_idx}_n{src_idx}_to_l{layer_idx + 1}_n{dest_idx}'
                weight_dict[weight_key] = weights[dest_idx, src_idx]

    return weight_dict, bias_dict
class ModelMultiplicityMILP(RecourseGenerator):
    """
    Formulates and solves a Mixed Integer Linear Program (MILP) to find a
    recourse instance that satisfies the classification constraints of
    multiple neural network models simultaneously.
    """

    def __init__(self, ct: Task, custom_distance_func=None):
        """
        Initializes the model-multiplicity MILP recourse generator.

        Args:
            ct: the Task holding the dataset and the model-multiplicity models
                (``ct.mm_models``).
            custom_distance_func: accepted for interface compatibility; not
                used by this generator.
        """
        super().__init__(ct)
        self.gurobiModel = Model()
        self.models = ct.mm_models.values()
        self.inputNodes = None
        self.outputNodes = {}

    def _generation_method(self, instance, column_name="target", neg_value=0, M=1000, epsilon=0.1, **kwargs):
        """
        Generate a recourse instance by formulating and solving a MILP whose
        constraints encode the forward pass (big-M ReLU encoding) of each of
        the provided models.

        Parameters
        ----------
        instance : pd.DataFrame or array-like
            The original instance (features) for which recourse is generated.
            Only the first row of a DataFrame is used.
        column_name : str, optional
            Unused in this implementation, by default "target".
        neg_value : int, optional
            Decides the sign of the output constraints; 0 forces every model
            output >= epsilon, any other value forces outputs <= -epsilon.
        M : float, optional
            Big-M constant for the ReLU constraints, by default 1000.
        epsilon : float, optional
            Margin added to the output constraints, by default 0.1.
        kwargs : dict
            Additional arguments (not used in this method).

        Returns
        -------
        pd.DataFrame
            A single-row DataFrame containing the values of the recourse
            instance if a solution is found. If the MILP is infeasible,
            returns an empty DataFrame.
        """
        self.gurobiModel = Model()

        # Turn off the Gurobi console output.
        self.gurobiModel.setParam('OutputFlag', 0)

        if isinstance(instance, pd.DataFrame):
            ilist = instance.iloc[0].tolist()
        else:
            ilist = instance.tolist()

        # Dictionary to store input variables, shared across all models.
        self.inputNodes = {}
        activation_states = {}
        all_nodes = {}

        # Create Gurobi variables for the inputs (shared for all models),
        # bounded by the observed range of each feature column.
        for i, col in enumerate(self.task.dataset.X.columns):
            key = f"v_0_{i}"
            col_min = self.task.dataset.X[col].min()
            col_max = self.task.dataset.X[col].max()
            self.inputNodes[key] = self.gurobiModel.addVar(lb=col_min, ub=col_max, name=key)
            all_nodes[key] = self.inputNodes[key]

        for model_idx, model in enumerate(self.models):
            # BUGFIX: the original tested isinstance(model.model, KerasModel)
            # and isinstance(model.model, PytorchModel), but model.model is the
            # raw framework model, not the rocelib wrapper, so Keras/PyTorch
            # models were always skipped. Check the wrapper itself, matching
            # the dispatch in create_weights_and_bias_dictionary.
            if not (isinstance(model.model, LogisticRegression)
                    or isinstance(model, KerasModel)
                    or isinstance(model, PytorchModel)):
                continue

            weights, biases = create_weights_and_bias_dictionary(model)
            layers = [model.input_dim] + model.hidden_dim + [model.output_dim]

            # Iterate through all hidden layers: layers[0] is the input layer
            # and layers[-1] the output layer; the variables created here are
            # for layer index `layer + 1`.
            for layer in range(len(layers) - 2):
                for node in range(layers[layer + 1]):
                    # Create Gurobi variables for each node and its binary
                    # ReLU activation state.
                    var_name = f"model{model_idx}_v_{layer + 1}_{node}"
                    activation_name = f"model{model_idx}_xi_{layer + 1}_{node}"
                    all_nodes[var_name] = self.gurobiModel.addVar(lb=-float('inf'), name=var_name)
                    activation_states[activation_name] = self.gurobiModel.addVar(vtype=GRB.BINARY,
                                                                                 name=activation_name)
                    self.gurobiModel.update()

                    # 1) v_i_j >= 0
                    self.gurobiModel.addConstr(all_nodes[var_name] >= 0,
                                               name=f"model{model_idx}_constr1_" + var_name)

                    # 2) v_i_j <= M * (1 - xi_i_j)
                    self.gurobiModel.addConstr(M * (1 - activation_states[activation_name]) >= all_nodes[var_name],
                                               name=f"model{model_idx}_constr2_" + var_name)

                    # Pre-activation value: W x_prev + b. Layer 0 inputs are the
                    # shared v_0_* variables; deeper layers use this model's nodes.
                    qr = quicksum((
                        weights[f'weight_l{layer}_n{prev_node_index}_to_l{layer + 1}_n{node}'] * all_nodes[
                            f"model{model_idx}_v_{layer}_{prev_node_index}" if layer else f"v_0_{prev_node_index}"]
                        for prev_node_index in range(layers[layer])
                    )) + biases[f'bias_into_l{layer + 1}_n{node}']

                    # 3) v_i_j <= Wx + b + M * xi_i_j   and   4) Wx + b <= v_i_j
                    self.gurobiModel.addConstr(qr + M * activation_states[activation_name] >= all_nodes[var_name],
                                               name=f"model{model_idx}_constr3_" + var_name)
                    self.gurobiModel.addConstr(qr <= all_nodes[var_name])

                    self.gurobiModel.update()

            outputNode = self.gurobiModel.addVar(lb=-float('inf'), vtype=GRB.CONTINUOUS,
                                                 name=f'model{model_idx}_output_node')
            self.outputNodes[f'model{model_idx}_output_node'] = outputNode

            # Output layer is affine (no ReLU): output == W x_last + b.
            self.gurobiModel.addConstr(quicksum((
                weights[f'weight_l{len(layers) - 2}_n{prev_node_index}_to_l{len(layers) - 1}_n0'] * all_nodes[
                    f"model{model_idx}_v_{len(layers) - 2}_{prev_node_index}" if len(layers) - 2
                    else f"v_0_{prev_node_index}"]
                for prev_node_index in range(layers[len(layers) - 2])
            )) + biases[f'bias_into_l{len(layers) - 1}_n0'] == outputNode,
                name=f'model{model_idx}_output_node_C1')

            # Force the desired classification with an epsilon margin.
            if not neg_value:
                self.gurobiModel.addConstr(outputNode - epsilon >= 0,
                                           name=f"model{model_idx}_output_node_lb_>=0")
            else:
                self.gurobiModel.addConstr(outputNode + epsilon <= 0,
                                           name=f"model{model_idx}_output_node_ub_<=0")

            self.gurobiModel.update()

        # Squared L2 distance between the recourse and the original instance.
        objective = self.gurobiModel.addVar(name="objective")
        self.gurobiModel.addConstr(objective == quicksum(
            (self.inputNodes[f'v_0_{i}'] - ilist[i]) ** 2
            for i in range(len(self.task.dataset.X.columns))))

        # BUGFIX: the original never called setObjective, so Gurobi returned
        # an arbitrary feasible point instead of the closest recourse.
        self.gurobiModel.setObjective(objective, GRB.MINIMIZE)

        self.gurobiModel.update()
        self.gurobiModel.optimize()

        # If no optimal solution was obtained, the MILP is infeasible
        # (or otherwise unsolved): no recourse can be returned.
        if self.gurobiModel.status != GRB.OPTIMAL:
            return pd.DataFrame()

        # Only the shared input variables (named 'v_0_<i>') form the recourse;
        # they were created first, so getVars() yields them in column order.
        ce = [v.getAttr(GRB.Attr.X) for v in self.gurobiModel.getVars()
              if v.varName.startswith('v_0_')]

        return pd.DataFrame([ce], columns=self.task.dataset.X.columns)