Module fri.model.lupi_ordinal_regression
View Source
from itertools import product
import cvxpy as cvx
import numpy as np
from sklearn.metrics import make_scorer
from sklearn.utils import check_X_y
from fri.model.base_lupi import (
LUPI_Relevance_CVXProblem,
split_dataset,
is_lupi_feature,
)
from fri.model.ordinal_regression import (
OrdinalRegression_Relevance_Bound,
ordinal_scores,
)
from .base_initmodel import LUPI_InitModel
from .base_type import ProblemType
class LUPI_OrdinalRegression(ProblemType):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._lupi_features = None
@property
def lupi_features(self):
return self._lupi_features
@classmethod
def parameters(cls):
return ["C", "scaling_lupi_w"]
@property
def get_initmodel_template(cls):
return LUPI_OrdinalRegression_SVM
@property
def get_cvxproblem_template(cls):
return LUPI_OrdinalRegression_Relevance_Bound
def relax_factors(cls):
return ["loss_slack", "w_l1_slack"]
def preprocessing(self, data, lupi_features=None):
X, y = data
d = X.shape[1]
if lupi_features is None:
raise ValueError("Argument 'lupi_features' missing in fit() call.")
        if not isinstance(lupi_features, int):
            raise ValueError("Argument 'lupi_features' is not of type int.")
        if not 0 < lupi_features < d:
            raise ValueError(
                "Argument 'lupi_features' is invalid. We need at least one privileged feature (lupi_features > 0) and at least one normal feature (lupi_features < d)."
            )
self._lupi_features = lupi_features
# Check that X and y have correct shape
X, y = check_X_y(X, y)
if np.min(y) > 0:
print("First ordinal class has index > 0. Shifting index...")
y = y - np.min(y)
return X, y
class LUPI_OrdinalRegression_SVM(LUPI_InitModel):
HYPERPARAMETER = ["C", "scaling_lupi_w"]
def __init__(self, C=1, scaling_lupi_w=1, lupi_features=None):
super().__init__()
self.scaling_lupi_w = scaling_lupi_w
self.C = C
self.lupi_features = lupi_features
def fit(self, X_combined, y, lupi_features=None):
"""
        Parameters
        ----------
        X_combined : array-like of shape (n_samples, n_features + lupi_features)
            Training data; the privileged features are expected as the last columns.
        y : array-like of shape (n_samples,)
            Ordinal class labels.
        lupi_features : int
            Number of features in dataset which are considered privileged information (PI).
            PI features are expected to be the last features in the dataset.
"""
        if lupi_features is None:
            if self.lupi_features is None:
                raise ValueError("No amount of lupi features given.")
            lupi_features = self.lupi_features
        self.lupi_features = lupi_features
X, X_priv = split_dataset(X_combined, self.lupi_features)
(n, d) = X.shape
self.classes_ = np.unique(y)
        # Get parameters from CV model without any feature constraints
C = self.get_params()["C"]
scaling_lupi_w = self.get_params()["scaling_lupi_w"]
get_original_bin_name, n_bins = get_bin_mapping(y)
n_boundaries = n_bins - 1
        # Initialize variables in cvxpy
w = cvx.Variable(shape=(d), name="w")
b_s = cvx.Variable(shape=(n_boundaries), name="bias")
w_priv = cvx.Variable(shape=(self.lupi_features, 2), name="w_priv")
d_priv = cvx.Variable(shape=(2), name="bias_priv")
def priv_function(bin, sign):
indices = np.where(y == get_original_bin_name[bin])
return X_priv[indices] @ w_priv[:, sign] + d_priv[sign]
# L1 norm regularization of both functions with 1 scaling constant
priv_l1_1 = cvx.norm(w_priv[:, 0], 1)
priv_l1_2 = cvx.norm(w_priv[:, 1], 1)
w_priv_l1 = priv_l1_1 + priv_l1_2
w_l1 = cvx.norm(w, 1)
weight_regularization = 0.5 * (w_l1 + scaling_lupi_w * w_priv_l1)
constraints = []
loss = 0
for left_bin in range(0, n_bins - 1):
indices = np.where(y == get_original_bin_name[left_bin])
constraints.append(
X[indices] @ w - b_s[left_bin] <= -1 + priv_function(left_bin, 0)
)
constraints.append(priv_function(left_bin, 0) >= 0)
loss += cvx.sum(priv_function(left_bin, 0))
# Add constraints for slack into right neighboring bins
for right_bin in range(1, n_bins):
indices = np.where(y == get_original_bin_name[right_bin])
constraints.append(
X[indices] @ w - b_s[right_bin - 1] >= +1 - priv_function(right_bin, 1)
)
constraints.append(priv_function(right_bin, 1) >= 0)
loss += cvx.sum(priv_function(right_bin, 1))
for i_boundary in range(0, n_boundaries - 1):
constraints.append(b_s[i_boundary] <= b_s[i_boundary + 1])
objective = cvx.Minimize(C * loss + weight_regularization)
# Solve problem.
problem = cvx.Problem(objective, constraints)
problem.solve(**self.SOLVER_PARAMS)
w = w.value
b_s = b_s.value
self.model_state = {
"w": w,
"b_s": b_s,
"w_priv": w_priv.value,
"d_priv": d_priv.value,
"lupi_features": lupi_features, # Number of lupi features in the dataset TODO: Move this somewhere else
"bin_boundaries": n_boundaries,
}
self.constraints = {
"loss": loss.value,
"w_l1": w_l1.value,
"w_priv_l1": w_priv_l1.value,
}
return self
def predict(self, X):
X, X_priv = split_dataset(X, self.lupi_features)
w = self.model_state["w"]
b_s = self.model_state["b_s"]
scores = np.dot(X, w.T)[np.newaxis]
bin_thresholds = np.append(b_s, np.inf)
        # If a threshold is smaller than the score, the sample belongs to a higher bin;
        # after subtracting we count the non-negative entries
indices = np.sum(scores.T - bin_thresholds >= 0, -1)
return self.classes_[indices]
def score(self, X, y, error_type="mmae", return_error=False, **kwargs):
X, y = check_X_y(X, y)
prediction = self.predict(X)
score = ordinal_scores(y, prediction, error_type, return_error=return_error)
return score
def make_scorer(self):
# Use multiple scores for ordinal regression
mze = make_scorer(ordinal_scores, error_type="mze")
mae = make_scorer(ordinal_scores, error_type="mae")
mmae = make_scorer(ordinal_scores, error_type="mmae")
scorer = {"mze": mze, "mae": mae, "mmae": mmae}
return scorer, "mmae"
def get_bin_mapping(y):
"""
Get ordered unique classes and corresponding mapping from old names
Parameters
----------
y: array of discrete values (int, str)
Returns
-------
"""
classes_ = np.unique(y)
original_bins = sorted(classes_)
n_bins = len(original_bins)
bins = np.arange(n_bins)
get_old_bin = dict(zip(bins, original_bins))
return get_old_bin, n_bins
class LUPI_OrdinalRegression_Relevance_Bound(
LUPI_Relevance_CVXProblem, OrdinalRegression_Relevance_Bound
):
@classmethod
def generate_lower_bound_problem(
cls,
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=-1,
):
is_priv = is_lupi_feature(
di, data, best_model_state
) # Is it a lupi feature where we need additional candidate problems?
if not is_priv:
yield from super().generate_lower_bound_problem(
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=probeID,
)
else:
for sign in [1, -1]:
problem = cls(
di,
data,
best_hyperparameters,
init_constraints,
preset_model=preset_model,
best_model_state=best_model_state,
probeID=probeID,
)
problem.init_objective_LB(sign=sign)
problem.isLowerBound = True
yield problem
@classmethod
def generate_upper_bound_problem(
cls,
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=-1,
):
is_priv = is_lupi_feature(
di, data, best_model_state
) # Is it a lupi feature where we need additional candidate problems?
if not is_priv:
yield from super().generate_upper_bound_problem(
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=probeID,
)
else:
for sign, pos in product([1, -1], [0, 1]):
problem = cls(
di,
data,
best_hyperparameters,
init_constraints,
preset_model=preset_model,
best_model_state=best_model_state,
probeID=probeID,
)
problem.init_objective_UB(sign=sign, pos=pos)
yield problem
@classmethod
def aggregate_min_candidates(cls, min_problems_candidates):
vals = [candidate.solved_relevance for candidate in min_problems_candidates]
# We take the max of mins because we need the necessary contribution over all functions
min_value = max(vals)
return min_value
def _init_objective_LB_LUPI(self, sign=None, bin_index=None, **kwargs):
self.add_constraint(
sign * self.w_priv[self.lupi_index, :] <= self.feature_relevance
)
self._objective = cvx.Minimize(self.feature_relevance)
def _init_objective_UB_LUPI(self, sign=None, pos=None, **kwargs):
self.add_constraint(
self.feature_relevance <= sign * self.w_priv[self.lupi_index, pos]
)
self._objective = cvx.Maximize(self.feature_relevance)
def _init_constraints(self, parameters, init_model_constraints):
# Upper constraints from initial model
init_w_l1 = init_model_constraints["w_l1"]
init_w_priv_l1 = init_model_constraints["w_priv_l1"]
init_loss = init_model_constraints["loss"]
scaling_lupi_w = parameters["scaling_lupi_w"]
get_original_bin_name, n_bins = get_bin_mapping(self.y)
n_boundaries = n_bins - 1
        # Initialize variables in cvxpy
w = cvx.Variable(shape=(self.d), name="w")
b_s = cvx.Variable(shape=(n_boundaries), name="bias")
w_priv = cvx.Variable(shape=(self.d_priv, 2), name="w_priv")
d_priv = cvx.Variable(shape=(2), name="bias_priv")
def priv_function(bin, sign):
indices = np.where(self.y == get_original_bin_name[bin])
return self.X_priv[indices] @ w_priv[:, sign] + d_priv[sign]
# L1 norm regularization of both functions with 1 scaling constant
priv_l1_1 = cvx.norm(w_priv[:, 0], 1)
priv_l1_2 = cvx.norm(w_priv[:, 1], 1)
w_priv_l1 = priv_l1_1 + priv_l1_2
w_l1 = cvx.norm(w, 1)
loss = 0
for left_bin in range(0, n_bins - 1):
indices = np.where(self.y == get_original_bin_name[left_bin])
self.add_constraint(
self.X[indices] @ w - b_s[left_bin] <= -1 + priv_function(left_bin, 0)
)
self.add_constraint(priv_function(left_bin, 0) >= 0)
loss += cvx.sum(priv_function(left_bin, 0))
# Add constraints for slack into right neighboring bins
for right_bin in range(1, n_bins):
indices = np.where(self.y == get_original_bin_name[right_bin])
self.add_constraint(
self.X[indices] @ w - b_s[right_bin - 1]
>= +1 - priv_function(right_bin, 1)
)
self.add_constraint(priv_function(right_bin, 1) >= 0)
loss += cvx.sum(priv_function(right_bin, 1))
for i_boundary in range(0, n_boundaries - 1):
self.add_constraint(b_s[i_boundary] <= b_s[i_boundary + 1])
self.add_constraint(
w_l1 + scaling_lupi_w * w_priv_l1
<= init_w_l1 + scaling_lupi_w * init_w_priv_l1
)
self.add_constraint(loss <= init_loss)
self.w = w
self.w_priv = w_priv
self.feature_relevance = cvx.Variable(nonneg=True, name="Feature Relevance")
Functions
get_bin_mapping
def get_bin_mapping(
y
)
Get ordered unique classes and the corresponding mapping from old names.
Parameters
y : array of discrete values (int, str)
Returns
get_old_bin : dict Mapping from consecutive bin index to the original class label.
n_bins : int Number of unique classes (bins).
View Source
def get_bin_mapping(y):
"""
Get ordered unique classes and corresponding mapping from old names
Parameters
----------
y: array of discrete values (int, str)
Returns
-------
"""
classes_ = np.unique(y)
original_bins = sorted(classes_)
n_bins = len(original_bins)
bins = np.arange(n_bins)
get_old_bin = dict(zip(bins, original_bins))
return get_old_bin, n_bins
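Example
A minimal sketch of the mapping (the labels here are illustrative):
import numpy as np
y = np.array([3, 5, 3, 9, 5])
get_old_bin, n_bins = get_bin_mapping(y)
print(n_bins)  # 3
print(get_old_bin)  # {0: 3, 1: 5, 2: 9}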
Classes
LUPI_OrdinalRegression
class LUPI_OrdinalRegression(
**kwargs
)
Problem type for ordinal regression with privileged information (LUPI); provides the hyperparameters, the initial-model template and the relevance-bound problem template.
View Source
class LUPI_OrdinalRegression(ProblemType):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._lupi_features = None
@property
def lupi_features(self):
return self._lupi_features
@classmethod
def parameters(cls):
return ["C", "scaling_lupi_w"]
@property
def get_initmodel_template(cls):
return LUPI_OrdinalRegression_SVM
@property
def get_cvxproblem_template(cls):
return LUPI_OrdinalRegression_Relevance_Bound
def relax_factors(cls):
return ["loss_slack", "w_l1_slack"]
def preprocessing(self, data, lupi_features=None):
X, y = data
d = X.shape[1]
if lupi_features is None:
raise ValueError("Argument 'lupi_features' missing in fit() call.")
        if not isinstance(lupi_features, int):
            raise ValueError("Argument 'lupi_features' is not of type int.")
        if not 0 < lupi_features < d:
            raise ValueError(
                "Argument 'lupi_features' is invalid. We need at least one privileged feature (lupi_features > 0) and at least one normal feature (lupi_features < d)."
            )
self._lupi_features = lupi_features
# Check that X and y have correct shape
X, y = check_X_y(X, y)
if np.min(y) > 0:
print("First ordinal class has index > 0. Shifting index...")
y = y - np.min(y)
return X, y
Ancestors (in MRO)
- fri.model.base_type.ProblemType
- abc.ABC
Static methods
parameters
def parameters(
)
View Source
@classmethod
def parameters(cls):
return ["C", "scaling_lupi_w"]
Instance variables
get_cvxproblem_template
get_initmodel_template
lupi_features
Methods
get_all_parameters
def get_all_parameters(
self
)
View Source
def get_all_parameters(self):
return {p: self.get_chosen_parameter(p) for p in self.parameters()}
get_all_relax_factors
def get_all_relax_factors(
self
)
View Source
def get_all_relax_factors(self):
return {p: self.get_chosen_relax_factors(p) for p in self.relax_factors()}
get_chosen_parameter
def get_chosen_parameter(
self,
p
)
View Source
def get_chosen_parameter(self, p):
try:
return [
self.chosen_parameters_[p]
] # We return list for param search function
        except (KeyError, AttributeError):
# # TODO: rewrite the parameter logic
# # TODO: move this to subclass
if p == "scaling_lupi_w":
# return [0.1, 1, 10, 100, 1000]
return scipy.stats.reciprocal(a=1e-15, b=1e10)
# if p == "scaling_lupi_loss":
# # value 0>p<1 causes standard svm solution
# # p>1 encourages usage of lupi function
# return scipy.stats.reciprocal(a=1e-15, b=1e15)
if p == "C":
return scipy.stats.reciprocal(a=1e-5, b=1e5)
if p == "epsilon":
return [0, 0.001, 0.01, 0.1, 1, 10, 100]
else:
return scipy.stats.reciprocal(a=1e-10, b=1e10)
get_chosen_relax_factors
def get_chosen_relax_factors(
self,
p
)
View Source
def get_chosen_relax_factors(self, p):
try:
factor = self.relax_factors_[p]
except KeyError:
try:
factor = self.relax_factors_[p + "_slack"]
except KeyError:
factor = 0.1
if factor < 0:
raise ValueError("Slack Factor multiplier is positive!")
return factor
get_relaxed_constraints
def get_relaxed_constraints(
self,
constraints
)
View Source
def get_relaxed_constraints(self, constraints):
return {c: self.relax_constraint(c, v) for c, v in constraints.items()}
postprocessing
def postprocessing(
self,
bounds
)
View Source
def postprocessing(self, bounds):
return bounds
preprocessing
def preprocessing(
self,
data,
lupi_features=None
)
View Source
def preprocessing(self, data, lupi_features=None):
X, y = data
d = X.shape[1]
if lupi_features is None:
raise ValueError("Argument 'lupi_features' missing in fit() call.")
        if not isinstance(lupi_features, int):
            raise ValueError("Argument 'lupi_features' is not of type int.")
        if not 0 < lupi_features < d:
            raise ValueError(
                "Argument 'lupi_features' is invalid. We need at least one privileged feature (lupi_features > 0) and at least one normal feature (lupi_features < d)."
            )
self._lupi_features = lupi_features
# Check that X and y have correct shape
X, y = check_X_y(X, y)
if np.min(y) > 0:
print("First ordinal class has index > 0. Shifting index...")
y = y - np.min(y)
return X, y
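Example
A hedged sketch of the label shift (the data and the no-argument construction of the problem type are assumptions for illustration):
import numpy as np
X = np.random.randn(6, 4)  # 3 normal features plus 1 privileged feature
y = np.array([1, 2, 3, 1, 2, 3])  # first ordinal class has index 1
problem = LUPI_OrdinalRegression()
X_checked, y_shifted = problem.preprocessing((X, y), lupi_features=1)
print(y_shifted.min())  # 0, labels were shifted down by np.min(y)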
relax_constraint
def relax_constraint(
self,
key,
value
)
View Source
def relax_constraint(self, key, value):
return value * (1 + self.get_chosen_relax_factors(key))
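For example, with the default relax factor of 0.1 an initial constraint value of 2.0 is relaxed to 2.0 * (1 + 0.1) = 2.2, giving the bound problems some slack around the initial optimum.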
relax_factors
def relax_factors(
cls
)
View Source
def relax_factors(cls):
return ["loss_slack", "w_l1_slack"]
LUPI_OrdinalRegression_Relevance_Bound
class LUPI_OrdinalRegression_Relevance_Bound(
current_feature: int,
data: tuple,
hyperparameters,
best_model_constraints,
preset_model=None,
best_model_state=None,
probeID=-1
)
Relevance-bound convex problem for LUPI ordinal regression; for privileged features it generates additional candidate problems per sign (and per privileged function for upper bounds).
View Source
class LUPI_OrdinalRegression_Relevance_Bound(
LUPI_Relevance_CVXProblem, OrdinalRegression_Relevance_Bound
):
@classmethod
def generate_lower_bound_problem(
cls,
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=-1,
):
is_priv = is_lupi_feature(
di, data, best_model_state
) # Is it a lupi feature where we need additional candidate problems?
if not is_priv:
yield from super().generate_lower_bound_problem(
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=probeID,
)
else:
for sign in [1, -1]:
problem = cls(
di,
data,
best_hyperparameters,
init_constraints,
preset_model=preset_model,
best_model_state=best_model_state,
probeID=probeID,
)
problem.init_objective_LB(sign=sign)
problem.isLowerBound = True
yield problem
@classmethod
def generate_upper_bound_problem(
cls,
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=-1,
):
is_priv = is_lupi_feature(
di, data, best_model_state
) # Is it a lupi feature where we need additional candidate problems?
if not is_priv:
yield from super().generate_upper_bound_problem(
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=probeID,
)
else:
for sign, pos in product([1, -1], [0, 1]):
problem = cls(
di,
data,
best_hyperparameters,
init_constraints,
preset_model=preset_model,
best_model_state=best_model_state,
probeID=probeID,
)
problem.init_objective_UB(sign=sign, pos=pos)
yield problem
@classmethod
def aggregate_min_candidates(cls, min_problems_candidates):
vals = [candidate.solved_relevance for candidate in min_problems_candidates]
# We take the max of mins because we need the necessary contribution over all functions
min_value = max(vals)
return min_value
def _init_objective_LB_LUPI(self, sign=None, bin_index=None, **kwargs):
self.add_constraint(
sign * self.w_priv[self.lupi_index, :] <= self.feature_relevance
)
self._objective = cvx.Minimize(self.feature_relevance)
def _init_objective_UB_LUPI(self, sign=None, pos=None, **kwargs):
self.add_constraint(
self.feature_relevance <= sign * self.w_priv[self.lupi_index, pos]
)
self._objective = cvx.Maximize(self.feature_relevance)
def _init_constraints(self, parameters, init_model_constraints):
# Upper constraints from initial model
init_w_l1 = init_model_constraints["w_l1"]
init_w_priv_l1 = init_model_constraints["w_priv_l1"]
init_loss = init_model_constraints["loss"]
scaling_lupi_w = parameters["scaling_lupi_w"]
get_original_bin_name, n_bins = get_bin_mapping(self.y)
n_boundaries = n_bins - 1
        # Initialize variables in cvxpy
w = cvx.Variable(shape=(self.d), name="w")
b_s = cvx.Variable(shape=(n_boundaries), name="bias")
w_priv = cvx.Variable(shape=(self.d_priv, 2), name="w_priv")
d_priv = cvx.Variable(shape=(2), name="bias_priv")
def priv_function(bin, sign):
indices = np.where(self.y == get_original_bin_name[bin])
return self.X_priv[indices] @ w_priv[:, sign] + d_priv[sign]
# L1 norm regularization of both functions with 1 scaling constant
priv_l1_1 = cvx.norm(w_priv[:, 0], 1)
priv_l1_2 = cvx.norm(w_priv[:, 1], 1)
w_priv_l1 = priv_l1_1 + priv_l1_2
w_l1 = cvx.norm(w, 1)
loss = 0
for left_bin in range(0, n_bins - 1):
indices = np.where(self.y == get_original_bin_name[left_bin])
self.add_constraint(
self.X[indices] @ w - b_s[left_bin] <= -1 + priv_function(left_bin, 0)
)
self.add_constraint(priv_function(left_bin, 0) >= 0)
loss += cvx.sum(priv_function(left_bin, 0))
# Add constraints for slack into right neighboring bins
for right_bin in range(1, n_bins):
indices = np.where(self.y == get_original_bin_name[right_bin])
self.add_constraint(
self.X[indices] @ w - b_s[right_bin - 1]
>= +1 - priv_function(right_bin, 1)
)
self.add_constraint(priv_function(right_bin, 1) >= 0)
loss += cvx.sum(priv_function(right_bin, 1))
for i_boundary in range(0, n_boundaries - 1):
self.add_constraint(b_s[i_boundary] <= b_s[i_boundary + 1])
self.add_constraint(
w_l1 + scaling_lupi_w * w_priv_l1
<= init_w_l1 + scaling_lupi_w * init_w_priv_l1
)
self.add_constraint(loss <= init_loss)
self.w = w
self.w_priv = w_priv
self.feature_relevance = cvx.Variable(nonneg=True, name="Feature Relevance")
Ancestors (in MRO)
- fri.model.base_lupi.LUPI_Relevance_CVXProblem
- fri.model.ordinal_regression.OrdinalRegression_Relevance_Bound
- fri.model.base_cvxproblem.Relevance_CVXProblem
- abc.ABC
Static methods
aggregate_max_candidates
def aggregate_max_candidates(
max_problems_candidates
)
View Source
@classmethod
def aggregate_max_candidates(cls, max_problems_candidates):
vals = [candidate.solved_relevance for candidate in max_problems_candidates]
max_value = max(vals)
return max_value
aggregate_min_candidates
def aggregate_min_candidates(
min_problems_candidates
)
View Source
@classmethod
def aggregate_min_candidates(cls, min_problems_candidates):
vals = [candidate.solved_relevance for candidate in min_problems_candidates]
# We take the max of mins because we need the necessary contribution over all functions
min_value = max(vals)
return min_value
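For illustration: if the candidate problems for a privileged feature solve to relevances 0.0 and 0.3, the aggregated lower bound is 0.3, since the feature's contribution is only dispensable if it is dispensable in every privileged function.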
generate_lower_bound_problem
def generate_lower_bound_problem(
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=-1
)
View Source
@classmethod
def generate_lower_bound_problem(
cls,
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=-1,
):
is_priv = is_lupi_feature(
di, data, best_model_state
) # Is it a lupi feature where we need additional candidate problems?
if not is_priv:
yield from super().generate_lower_bound_problem(
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=probeID,
)
else:
for sign in [1, -1]:
problem = cls(
di,
data,
best_hyperparameters,
init_constraints,
preset_model=preset_model,
best_model_state=best_model_state,
probeID=probeID,
)
problem.init_objective_LB(sign=sign)
problem.isLowerBound = True
yield problem
generate_upper_bound_problem
def generate_upper_bound_problem(
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=-1
)
View Source
@classmethod
def generate_upper_bound_problem(
cls,
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=-1,
):
is_priv = is_lupi_feature(
di, data, best_model_state
) # Is it a lupi feature where we need additional candidate problems?
if not is_priv:
yield from super().generate_upper_bound_problem(
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=probeID,
)
else:
for sign, pos in product([1, -1], [0, 1]):
problem = cls(
di,
data,
best_hyperparameters,
init_constraints,
preset_model=preset_model,
best_model_state=best_model_state,
probeID=probeID,
)
problem.init_objective_UB(sign=sign, pos=pos)
yield problem
Instance variables
accepted_status
constraints
cvx_problem
isProbe
is_solved
objective
probeID
solved_relevance
solver_kwargs
Methods
add_constraint
def add_constraint(
self,
new
)
View Source
def add_constraint(self, new):
self._constraints.append(new)
init_objective_LB
def init_objective_LB(
self,
**kwargs
)
View Source
def init_objective_LB(self, **kwargs):
# We have two models basically with different indexes
if self.isPriv:
self._init_objective_LB_LUPI(**kwargs)
else:
# We call sibling class of our lupi class, which is the normal problem
super().init_objective_LB(**kwargs)
init_objective_UB
def init_objective_UB(
self,
**kwargs
)
View Source
def init_objective_UB(self, **kwargs):
# We have two models basically with different indexes
if self.isPriv:
self._init_objective_UB_LUPI(**kwargs)
else:
# We call sibling class of our lupi class, which is the normal problem
super().init_objective_UB(**kwargs)
preprocessing_data
def preprocessing_data(
self,
data,
best_model_state
)
View Source
def preprocessing_data(self, data, best_model_state):
lupi_features = best_model_state["lupi_features"]
X_combined, y = data
X, X_priv = split_dataset(X_combined, lupi_features)
self.X_priv = X_priv
super().preprocessing_data((X, y), best_model_state)
assert lupi_features == X_priv.shape[1]
self.d_priv = lupi_features
# LUPI model, we need to offset the index
self.lupi_index = self.current_feature - self.d
if self.lupi_index >= 0:
self.isPriv = True
else:
self.isPriv = False
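For example, with d = 4 normal features and lupi_features = 2, current_feature = 5 gives lupi_index = 1 and isPriv = True, while current_feature = 2 gives lupi_index = -2 and isPriv = False.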
solve
def solve(
self
) -> object
View Source
def solve(self) -> object:
        # We build the cvxpy problem here because pickling LP solver objects is problematic;
        # by deferring it, worker threads build the problem themselves and we avoid the serialization cost
self._cvx_problem = cvx.Problem(
objective=self.objective, constraints=self.constraints
)
try:
# print("Solve", self)
self._cvx_problem.solve(**self.solver_kwargs)
except SolverError:
# We ignore Solver Errors, which are common with our framework:
# We solve multiple problems per bound and choose a feasible solution later (see '_create_interval')
pass
self._solver_status = self._cvx_problem.status
# self._cvx_problem = None
return self
LUPI_OrdinalRegression_SVM
class LUPI_OrdinalRegression_SVM(
C=1,
scaling_lupi_w=1,
lupi_features=None
)
Initial model: an L1-regularized ordinal regression SVM with privileged information, solved as a convex program in cvxpy.
View Source
class LUPI_OrdinalRegression_SVM(LUPI_InitModel):
HYPERPARAMETER = ["C", "scaling_lupi_w"]
def __init__(self, C=1, scaling_lupi_w=1, lupi_features=None):
super().__init__()
self.scaling_lupi_w = scaling_lupi_w
self.C = C
self.lupi_features = lupi_features
def fit(self, X_combined, y, lupi_features=None):
"""
        Parameters
        ----------
        X_combined : array-like of shape (n_samples, n_features + lupi_features)
            Training data; the privileged features are expected as the last columns.
        y : array-like of shape (n_samples,)
            Ordinal class labels.
        lupi_features : int
            Number of features in dataset which are considered privileged information (PI).
            PI features are expected to be the last features in the dataset.
"""
        if lupi_features is None:
            if self.lupi_features is None:
                raise ValueError("No amount of lupi features given.")
            lupi_features = self.lupi_features
        self.lupi_features = lupi_features
X, X_priv = split_dataset(X_combined, self.lupi_features)
(n, d) = X.shape
self.classes_ = np.unique(y)
        # Get parameters from CV model without any feature constraints
C = self.get_params()["C"]
scaling_lupi_w = self.get_params()["scaling_lupi_w"]
get_original_bin_name, n_bins = get_bin_mapping(y)
n_boundaries = n_bins - 1
        # Initialize variables in cvxpy
w = cvx.Variable(shape=(d), name="w")
b_s = cvx.Variable(shape=(n_boundaries), name="bias")
w_priv = cvx.Variable(shape=(self.lupi_features, 2), name="w_priv")
d_priv = cvx.Variable(shape=(2), name="bias_priv")
def priv_function(bin, sign):
indices = np.where(y == get_original_bin_name[bin])
return X_priv[indices] @ w_priv[:, sign] + d_priv[sign]
# L1 norm regularization of both functions with 1 scaling constant
priv_l1_1 = cvx.norm(w_priv[:, 0], 1)
priv_l1_2 = cvx.norm(w_priv[:, 1], 1)
w_priv_l1 = priv_l1_1 + priv_l1_2
w_l1 = cvx.norm(w, 1)
weight_regularization = 0.5 * (w_l1 + scaling_lupi_w * w_priv_l1)
constraints = []
loss = 0
for left_bin in range(0, n_bins - 1):
indices = np.where(y == get_original_bin_name[left_bin])
constraints.append(
X[indices] @ w - b_s[left_bin] <= -1 + priv_function(left_bin, 0)
)
constraints.append(priv_function(left_bin, 0) >= 0)
loss += cvx.sum(priv_function(left_bin, 0))
# Add constraints for slack into right neighboring bins
for right_bin in range(1, n_bins):
indices = np.where(y == get_original_bin_name[right_bin])
constraints.append(
X[indices] @ w - b_s[right_bin - 1] >= +1 - priv_function(right_bin, 1)
)
constraints.append(priv_function(right_bin, 1) >= 0)
loss += cvx.sum(priv_function(right_bin, 1))
for i_boundary in range(0, n_boundaries - 1):
constraints.append(b_s[i_boundary] <= b_s[i_boundary + 1])
objective = cvx.Minimize(C * loss + weight_regularization)
# Solve problem.
problem = cvx.Problem(objective, constraints)
problem.solve(**self.SOLVER_PARAMS)
w = w.value
b_s = b_s.value
self.model_state = {
"w": w,
"b_s": b_s,
"w_priv": w_priv.value,
"d_priv": d_priv.value,
"lupi_features": lupi_features, # Number of lupi features in the dataset TODO: Move this somewhere else
"bin_boundaries": n_boundaries,
}
self.constraints = {
"loss": loss.value,
"w_l1": w_l1.value,
"w_priv_l1": w_priv_l1.value,
}
return self
def predict(self, X):
X, X_priv = split_dataset(X, self.lupi_features)
w = self.model_state["w"]
b_s = self.model_state["b_s"]
scores = np.dot(X, w.T)[np.newaxis]
bin_thresholds = np.append(b_s, np.inf)
        # If a threshold is smaller than the score, the sample belongs to a higher bin;
        # after subtracting we count the non-negative entries
indices = np.sum(scores.T - bin_thresholds >= 0, -1)
return self.classes_[indices]
def score(self, X, y, error_type="mmae", return_error=False, **kwargs):
X, y = check_X_y(X, y)
prediction = self.predict(X)
score = ordinal_scores(y, prediction, error_type, return_error=return_error)
return score
def make_scorer(self):
# Use multiple scores for ordinal regression
mze = make_scorer(ordinal_scores, error_type="mze")
mae = make_scorer(ordinal_scores, error_type="mae")
mmae = make_scorer(ordinal_scores, error_type="mmae")
scorer = {"mze": mze, "mae": mae, "mmae": mmae}
return scorer, "mmae"
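Example
A minimal end-to-end sketch on synthetic data (the dimensions and the construction of y are illustrative assumptions; a solver supported by cvxpy must be installed):
import numpy as np
rng = np.random.RandomState(0)
X = rng.randn(60, 5)  # 4 normal features plus 1 privileged feature as last column
y = np.digitize(X[:, 0], [-0.5, 0.5])  # 3 ordered bins derived from the first feature
model = LUPI_OrdinalRegression_SVM(C=1, scaling_lupi_w=1, lupi_features=1)
model.fit(X, y)
y_pred = model.predict(X)  # predict() splits off the privileged column itself
print(model.score(X, y))  # "mmae" score by default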
Ancestors (in MRO)
- fri.model.base_initmodel.LUPI_InitModel
- fri.model.base_initmodel.InitModel
- abc.ABC
- sklearn.base.BaseEstimator
Class variables
HYPERPARAMETER
SOLVER_PARAMS
Instance variables
L1_factor
L1_factor_priv
Methods
fit
def fit(
self,
X_combined,
y,
lupi_features=None
)
Parameters
X_combined : array-like of shape (n_samples, n_features + lupi_features) Training data; the privileged features are expected as the last columns.
y : array-like of shape (n_samples,) Ordinal class labels.
lupi_features : int Number of features in dataset which are considered privileged information (PI). PI features are expected to be the last features in the dataset.
View Source
def fit(self, X_combined, y, lupi_features=None):
"""
        Parameters
        ----------
        X_combined : array-like of shape (n_samples, n_features + lupi_features)
            Training data; the privileged features are expected as the last columns.
        y : array-like of shape (n_samples,)
            Ordinal class labels.
        lupi_features : int
            Number of features in dataset which are considered privileged information (PI).
            PI features are expected to be the last features in the dataset.
"""
        if lupi_features is None:
            if self.lupi_features is None:
                raise ValueError("No amount of lupi features given.")
            lupi_features = self.lupi_features
        self.lupi_features = lupi_features
X, X_priv = split_dataset(X_combined, self.lupi_features)
(n, d) = X.shape
self.classes_ = np.unique(y)
        # Get parameters from CV model without any feature constraints
C = self.get_params()["C"]
scaling_lupi_w = self.get_params()["scaling_lupi_w"]
get_original_bin_name, n_bins = get_bin_mapping(y)
n_boundaries = n_bins - 1
        # Initialize variables in cvxpy
w = cvx.Variable(shape=(d), name="w")
b_s = cvx.Variable(shape=(n_boundaries), name="bias")
w_priv = cvx.Variable(shape=(self.lupi_features, 2), name="w_priv")
d_priv = cvx.Variable(shape=(2), name="bias_priv")
def priv_function(bin, sign):
indices = np.where(y == get_original_bin_name[bin])
return X_priv[indices] @ w_priv[:, sign] + d_priv[sign]
# L1 norm regularization of both functions with 1 scaling constant
priv_l1_1 = cvx.norm(w_priv[:, 0], 1)
priv_l1_2 = cvx.norm(w_priv[:, 1], 1)
w_priv_l1 = priv_l1_1 + priv_l1_2
w_l1 = cvx.norm(w, 1)
weight_regularization = 0.5 * (w_l1 + scaling_lupi_w * w_priv_l1)
constraints = []
loss = 0
for left_bin in range(0, n_bins - 1):
indices = np.where(y == get_original_bin_name[left_bin])
constraints.append(
X[indices] @ w - b_s[left_bin] <= -1 + priv_function(left_bin, 0)
)
constraints.append(priv_function(left_bin, 0) >= 0)
loss += cvx.sum(priv_function(left_bin, 0))
# Add constraints for slack into right neighboring bins
for right_bin in range(1, n_bins):
indices = np.where(y == get_original_bin_name[right_bin])
constraints.append(
X[indices] @ w - b_s[right_bin - 1] >= +1 - priv_function(right_bin, 1)
)
constraints.append(priv_function(right_bin, 1) >= 0)
loss += cvx.sum(priv_function(right_bin, 1))
for i_boundary in range(0, n_boundaries - 1):
constraints.append(b_s[i_boundary] <= b_s[i_boundary + 1])
objective = cvx.Minimize(C * loss + weight_regularization)
# Solve problem.
problem = cvx.Problem(objective, constraints)
problem.solve(**self.SOLVER_PARAMS)
w = w.value
b_s = b_s.value
self.model_state = {
"w": w,
"b_s": b_s,
"w_priv": w_priv.value,
"d_priv": d_priv.value,
"lupi_features": lupi_features, # Number of lupi features in the dataset TODO: Move this somewhere else
"bin_boundaries": n_boundaries,
}
self.constraints = {
"loss": loss.value,
"w_l1": w_l1.value,
"w_priv_l1": w_priv_l1.value,
}
return self
get_params
def get_params(
self,
deep=True
)
Get parameters for this estimator.
Parameters
deep : bool, default=True If True, will return the parameters for this estimator and contained subobjects that are estimators.
Returns
params : dict Parameter names mapped to their values.
View Source
def get_params(self, deep=True):
"""
Get parameters for this estimator.
Parameters
----------
deep : bool, default=True
If True, will return the parameters for this estimator and
contained subobjects that are estimators.
Returns
-------
params : dict
Parameter names mapped to their values.
"""
out = dict()
for key in self._get_param_names():
value = getattr(self, key)
if deep and hasattr(value, 'get_params'):
deep_items = value.get_params().items()
out.update((key + '__' + k, val) for k, val in deep_items)
out[key] = value
return out
make_scorer
def make_scorer(
self
)
View Source
def make_scorer(self):
# Use multiple scores for ordinal regression
mze = make_scorer(ordinal_scores, error_type="mze")
mae = make_scorer(ordinal_scores, error_type="mae")
mmae = make_scorer(ordinal_scores, error_type="mmae")
scorer = {"mze": mze, "mae": mae, "mmae": mmae}
return scorer, "mmae"
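Example
The returned dictionary matches scikit-learn's multi-metric scoring format; a hedged sketch (the cross_validate usage and the data are assumptions, see the fit example above):
from sklearn.model_selection import cross_validate
model = LUPI_OrdinalRegression_SVM(lupi_features=1)
scorers, refit_metric = model.make_scorer()  # refit_metric == "mmae"
results = cross_validate(model, X, y, scoring=scorers, cv=3)
print(results["test_mmae"])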
predict
def predict(
self,
X
)
View Source
def predict(self, X):
X, X_priv = split_dataset(X, self.lupi_features)
w = self.model_state["w"]
b_s = self.model_state["b_s"]
scores = np.dot(X, w.T)[np.newaxis]
bin_thresholds = np.append(b_s, np.inf)
        # If a threshold is smaller than the score, the sample belongs to a higher bin;
        # after subtracting we count the non-negative entries
indices = np.sum(scores.T - bin_thresholds >= 0, -1)
return self.classes_[indices]
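Example
A small worked example of the thresholding (numbers are illustrative):
import numpy as np
b_s = np.array([-0.5, 0.5])  # learned bin boundaries
scores = np.array([[-1.0, 0.0, 0.7]])  # decision values for three samples
bin_thresholds = np.append(b_s, np.inf)
indices = np.sum(scores.T - bin_thresholds >= 0, -1)
print(indices)  # [0 1 2]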
score
def score(
self,
X,
y,
error_type='mmae',
return_error=False,
**kwargs
)
View Source
def score(self, X, y, error_type="mmae", return_error=False, **kwargs):
X, y = check_X_y(X, y)
prediction = self.predict(X)
score = ordinal_scores(y, prediction, error_type, return_error=return_error)
return score
set_params
def set_params(
self,
**params
)
Set the parameters of this estimator.
The method works on simple estimators as well as on nested objects (such as sklearn.pipeline.Pipeline). The latter have parameters of the form <component>__<parameter> so that it's possible to update each component of a nested object.
Parameters
**params : dict Estimator parameters.
Returns
self : estimator instance Estimator instance.
View Source
def set_params(self, **params):
"""
Set the parameters of this estimator.
The method works on simple estimators as well as on nested objects
(such as :class:`~sklearn.pipeline.Pipeline`). The latter have
parameters of the form ``<component>__<parameter>`` so that it's
possible to update each component of a nested object.
Parameters
----------
**params : dict
Estimator parameters.
Returns
-------
self : estimator instance
Estimator instance.
"""
if not params:
# Simple optimization to gain speed (inspect is slow)
return self
valid_params = self.get_params(deep=True)
nested_params = defaultdict(dict) # grouped by prefix
for key, value in params.items():
key, delim, sub_key = key.partition('__')
if key not in valid_params:
raise ValueError('Invalid parameter %s for estimator %s. '
'Check the list of available parameters '
'with `estimator.get_params().keys()`.' %
(key, self))
if delim:
nested_params[key][sub_key] = value
else:
setattr(self, key, value)
valid_params[key] = value
for key, sub_params in nested_params.items():
valid_params[key].set_params(**sub_params)
return self