Module fri.model.lupi_regression
View Source
from itertools import product
import cvxpy as cvx
import numpy as np
from sklearn.metrics import r2_score
from sklearn.utils import check_X_y
from fri.model.base_lupi import (
LUPI_Relevance_CVXProblem,
split_dataset,
is_lupi_feature,
)
from fri.model.regression import Regression_Relevance_Bound
from .base_initmodel import LUPI_InitModel
from .base_type import ProblemType
class LUPI_Regression(ProblemType):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._lupi_features = None
@property
def lupi_features(self):
return self._lupi_features
@classmethod
def parameters(cls):
return ["C", "epsilon", "scaling_lupi_w", "scaling_lupi_loss"]
@property
def get_initmodel_template(cls):
return LUPI_Regression_SVM
@property
def get_cvxproblem_template(cls):
return LUPI_Regression_Relevance_Bound
def relax_factors(cls):
return ["loss_slack", "w_l1_slack"]
def preprocessing(self, data, lupi_features=None):
X, y = data
d = X.shape[1]
if lupi_features is None:
raise ValueError("Argument 'lupi_features' missing in fit() call.")
if not isinstance(lupi_features, int):
raise ValueError("Argument 'lupi_features' is not type int.")
if not 0 < lupi_features < d:
raise ValueError(
"Argument 'lupi_features' looks wrong. We need at least 1 priviliged feature (>0) or at least one normal feature."
)
self._lupi_features = lupi_features
# Check that X and y have correct shape
X, y = check_X_y(X, y)
return X, y
class LUPI_Regression_SVM(LUPI_InitModel):
HYPERPARAMETER = ["C", "epsilon", "scaling_lupi_w", "scaling_lupi_loss"]
def __init__(
self,
C=1,
epsilon=0.1,
scaling_lupi_w=1,
scaling_lupi_loss=1,
lupi_features=None,
):
super().__init__()
self.epsilon = epsilon
self.scaling_lupi_loss = scaling_lupi_loss
self.scaling_lupi_w = scaling_lupi_w
self.C = C
self.lupi_features = lupi_features
def fit(self, X_combined, y, lupi_features=None):
"""
Parameters
----------
lupi_features : int
Number of features in the dataset that are considered privileged information (PI).
PI features are expected to be the last features in the dataset.
"""
if lupi_features is None:
    if self.lupi_features is None:
        raise ValueError("No number of lupi features given.")
    lupi_features = self.lupi_features
self.lupi_features = lupi_features
X, X_priv = split_dataset(X_combined, self.lupi_features)
(n, d) = X.shape
# Get parameters from CV model without any feature constraints
C = self.get_params()["C"]
epsilon = self.get_params()["epsilon"]
scaling_lupi_w = self.get_params()["scaling_lupi_w"]
scaling_lupi_loss = self.get_params()["scaling_lupi_loss"]
# Initialize variables in cvxpy
w = cvx.Variable(shape=(d), name="w")
b = cvx.Variable(name="bias")
w_priv_pos = cvx.Variable(lupi_features, name="w_priv_pos")
b_priv_pos = cvx.Variable(name="bias_priv_pos")
w_priv_neg = cvx.Variable(lupi_features, name="w_priv_neg")
b_priv_neg = cvx.Variable(name="bias_priv_neg")
slack = cvx.Variable(shape=(n), name="slack")
# Define functions for better readability
priv_function_pos = X_priv @ w_priv_pos + b_priv_pos
priv_function_neg = X_priv @ w_priv_neg + b_priv_neg
# Combined loss of lupi function and normal slacks, scaled by two constants
priv_loss_pos = cvx.sum(priv_function_pos)
priv_loss_neg = cvx.sum(priv_function_neg)
priv_loss = priv_loss_pos + priv_loss_neg
slack_loss = cvx.sum(slack)
loss = scaling_lupi_loss * priv_loss + slack_loss
# L1 norm regularization of both functions with one scaling constant
weight_regularization = 0.5 * (
cvx.norm(w, 1)
+ scaling_lupi_w
* (0.5 * cvx.norm(w_priv_pos, 1) + 0.5 * cvx.norm(w_priv_neg, 1))
)
constraints = [
y - X @ w - b <= epsilon + priv_function_pos + slack,
X @ w + b - y <= epsilon + priv_function_neg + slack,
priv_function_pos >= 0,
priv_function_neg >= 0,
# priv_loss_pos >= 0,
# priv_loss_neg >= 0,
# slack_loss >= 0,
slack >= 0,
# loss >= 0,
]
objective = cvx.Minimize(C * loss + weight_regularization)
# Solve problem.
problem = cvx.Problem(objective, constraints)
problem.solve(**self.SOLVER_PARAMS)
self.model_state = {
"signs_pos": priv_function_pos.value > 0,
"signs_neg": priv_function_neg.value > 0,
"w": w.value,
"w_priv_pos": w_priv_pos.value,
"w_priv_neg": w_priv_neg.value,
"b": b.value,
"b_priv_pos": b_priv_pos.value,
"b_priv_neg": b_priv_neg.value,
"lupi_features": lupi_features, # Number of lupi features in the dataset TODO: Move this somewhere else,
}
w_l1 = np.linalg.norm(w.value, ord=1)
w_priv_pos_l1 = np.linalg.norm(w_priv_pos.value, ord=1)
w_priv_neg_l1 = np.linalg.norm(w_priv_neg.value, ord=1)
# We take the sum to combine both submodels (for priv) into a single normalization factor
w_priv_l1 = w_priv_pos_l1 + w_priv_neg_l1
self.constraints = {
"priv_loss": priv_loss.value,
"scaling_lupi_loss": scaling_lupi_loss,
# "loss_slack": slack_loss.value,
"loss": loss.value,
"w_l1": w_l1,
"w_priv_l1": w_priv_l1,
"w_priv_pos_l1": w_priv_pos_l1,
"w_priv_neg_l1": w_priv_neg_l1,
}
return self
@property
def SOLVER_PARAMS(cls):
return {"solver": "ECOS", "verbose": False}
def predict(self, X):
"""
Predict regression targets using the learned linear model.
Only the normal features are used; the privileged columns are split off and discarded at prediction time.
This method is mainly used for CV purposes to find the best parameters according to score.
Parameters
----------
X : numpy.ndarray
"""
X, X_priv = split_dataset(X, self.lupi_features)
w = self.model_state["w"]
b = self.model_state["b"]
y = np.dot(X, w) + b
return y
def score(self, X, y, **kwargs):
prediction = self.predict(X)
score = r2_score(y, prediction)
return score
class LUPI_Regression_Relevance_Bound(
LUPI_Relevance_CVXProblem, Regression_Relevance_Bound
):
@classmethod
def generate_upper_bound_problem(
cls,
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=-1,
):
is_priv = is_lupi_feature(
di, data, best_model_state
) # Is it a lupi feature where we need additional candidate problems?
if not is_priv:
yield from super().generate_upper_bound_problem(
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=probeID,
)
else:
for sign, pos in product([1, -1], [True, False]):
problem = cls(
di,
data,
best_hyperparameters,
init_constraints,
preset_model=preset_model,
best_model_state=best_model_state,
probeID=probeID,
)
problem.init_objective_UB(sign=sign, pos=pos)
yield problem
def _init_objective_LB_LUPI(self, **kwargs):
self.add_constraint(
cvx.abs(self.w_priv_pos[self.lupi_index]) <= self.feature_relevance
)
self.add_constraint(
cvx.abs(self.w_priv_neg[self.lupi_index]) <= self.feature_relevance
)
self._objective = cvx.Minimize(self.feature_relevance)
def _init_objective_UB_LUPI(self, pos=None, sign=None, **kwargs):
if pos:
self.add_constraint(
self.feature_relevance <= sign * self.w_priv_pos[self.lupi_index]
)
else:
self.add_constraint(
self.feature_relevance <= sign * self.w_priv_neg[self.lupi_index]
)
self._objective = cvx.Maximize(self.feature_relevance)
def _init_constraints(self, parameters, init_model_constraints):
# Upper constraints from best initial model
l1_w = init_model_constraints["w_l1"]
self.l1_priv_w_pos = init_model_constraints["w_priv_pos_l1"]
self.l1_priv_w_neg = init_model_constraints["w_priv_neg_l1"]
init_loss = init_model_constraints["loss"]
epsilon = parameters["epsilon"]
scaling_lupi_loss = init_model_constraints["scaling_lupi_loss"]
# New Variables
w = cvx.Variable(shape=(self.d), name="w")
b = cvx.Variable(name="b")
w_priv_pos = cvx.Variable(self.d_priv, name="w_priv_pos")
b_priv_pos = cvx.Variable(name="bias_priv_pos")
w_priv_neg = cvx.Variable(self.d_priv, name="w_priv_neg")
b_priv_neg = cvx.Variable(name="bias_priv_neg")
slack = cvx.Variable(shape=(self.n))
priv_function_pos = self.X_priv @ w_priv_pos + b_priv_pos
priv_function_neg = self.X_priv @ w_priv_neg + b_priv_neg
priv_loss = cvx.sum(priv_function_pos + priv_function_neg)
loss = priv_loss + cvx.sum(slack)
weight_norm = cvx.norm(w, 1)
self.weight_norm_priv_pos = cvx.norm(w_priv_pos, 1)
self.weight_norm_priv_neg = cvx.norm(w_priv_neg, 1)
self.add_constraint(
self.y - self.X @ w - b <= epsilon + priv_function_pos + slack
)
self.add_constraint(
self.X @ w + b - self.y <= epsilon + priv_function_neg + slack
)
self.add_constraint(priv_function_pos >= 0)
self.add_constraint(priv_function_neg >= 0)
self.add_constraint(loss <= init_loss)
self.add_constraint(slack >= 0)
sum_norms = weight_norm + self.weight_norm_priv_pos + self.weight_norm_priv_neg
self.add_constraint(sum_norms <= l1_w)
# self.add_constraint(self.weight_norm_priv_pos <= self.l1_priv_w_pos)
# self.add_constraint(self.weight_norm_priv_neg <= self.l1_priv_w_neg)
# Save values for object use later
self.w = w
self.w_priv_pos = w_priv_pos
self.w_priv_neg = w_priv_neg
self.feature_relevance = cvx.Variable(nonneg=True, name="Feature Relevance")
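A minimal usage sketch of the module (assuming a toy dataset whose last two columns are privileged information; everything except the imported class is illustrative):
import numpy as np
from fri.model.lupi_regression import LUPI_Regression_SVM
rng = np.random.RandomState(0)
X = rng.normal(size=(100, 4))        # normal features
X_priv = rng.normal(size=(100, 2))   # stand-in privileged features
y = X @ rng.normal(size=4) + 0.1 * rng.normal(size=100)
X_combined = np.hstack([X, X_priv])  # PI columns must come last
model = LUPI_Regression_SVM(C=1, epsilon=0.1, lupi_features=2)
model.fit(X_combined, y)
print(model.score(X_combined, y))    # R^2 on the training data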
Classes
LUPI_Regression
class LUPI_Regression(
**kwargs
)
Problem type definition for regression with learning using privileged information (LUPI). It lists the model hyperparameters and validates the input data and the lupi_features argument in preprocessing().
View Source
class LUPI_Regression(ProblemType):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._lupi_features = None
@property
def lupi_features(self):
return self._lupi_features
@classmethod
def parameters(cls):
return ["C", "epsilon", "scaling_lupi_w", "scaling_lupi_loss"]
@property
def get_initmodel_template(cls):
return LUPI_Regression_SVM
@property
def get_cvxproblem_template(cls):
return LUPI_Regression_Relevance_Bound
def relax_factors(cls):
return ["loss_slack", "w_l1_slack"]
def preprocessing(self, data, lupi_features=None):
X, y = data
d = X.shape[1]
if lupi_features is None:
raise ValueError("Argument 'lupi_features' missing in fit() call.")
if not isinstance(lupi_features, int):
raise ValueError("Argument 'lupi_features' is not type int.")
if not 0 < lupi_features < d:
raise ValueError(
"Argument 'lupi_features' looks wrong. We need at least 1 priviliged feature (>0) or at least one normal feature."
)
self._lupi_features = lupi_features
# Check that X and y have correct shape
X, y = check_X_y(X, y)
return X, y
Ancestors (in MRO)
- fri.model.base_type.ProblemType
- abc.ABC
Static methods
parameters
def parameters(
)
View Source
@classmethod
def parameters(cls):
return ["C", "epsilon", "scaling_lupi_w", "scaling_lupi_loss"]
Instance variables
get_cvxproblem_template
get_initmodel_template
lupi_features
Methods
get_all_parameters
def get_all_parameters(
self
)
View Source
def get_all_parameters(self):
return {p: self.get_chosen_parameter(p) for p in self.parameters()}
get_all_relax_factors
def get_all_relax_factors(
self
)
View Source
def get_all_relax_factors(self):
return {p: self.get_chosen_relax_factors(p) for p in self.relax_factors()}
get_chosen_parameter
def get_chosen_parameter(
self,
p
)
View Source
def get_chosen_parameter(self, p):
try:
return [
self.chosen_parameters_[p]
] # We return list for param search function
except (KeyError, AttributeError):
# TODO: rewrite the parameter logic
# TODO: move this to subclass
if p == "scaling_lupi_w":
# return [0.1, 1, 10, 100, 1000]
return scipy.stats.reciprocal(a=1e-15, b=1e10)
# if p == "scaling_lupi_loss":
# # value 0>p<1 causes standard svm solution
# # p>1 encourages usage of lupi function
# return scipy.stats.reciprocal(a=1e-15, b=1e15)
if p == "C":
return scipy.stats.reciprocal(a=1e-5, b=1e5)
if p == "epsilon":
return [0, 0.001, 0.01, 0.1, 1, 10, 100]
else:
return scipy.stats.reciprocal(a=1e-10, b=1e10)
get_chosen_relax_factors
def get_chosen_relax_factors(
self,
p
)
View Source
def get_chosen_relax_factors(self, p):
try:
factor = self.relax_factors_[p]
except KeyError:
try:
factor = self.relax_factors_[p + "_slack"]
except KeyError:
factor = 0.1
if factor < 0:
raise ValueError("Slack Factor multiplier is positive!")
return factor
get_relaxed_constraints
def get_relaxed_constraints(
self,
constraints
)
View Source
def get_relaxed_constraints(self, constraints):
return {c: self.relax_constraint(c, v) for c, v in constraints.items()}
postprocessing
def postprocessing(
self,
bounds
)
View Source
def postprocessing(self, bounds):
return bounds
preprocessing
def preprocessing(
self,
data,
lupi_features=None
)
View Source
def preprocessing(self, data, lupi_features=None):
X, y = data
d = X.shape[1]
if lupi_features is None:
raise ValueError("Argument 'lupi_features' missing in fit() call.")
if not isinstance(lupi_features, int):
raise ValueError("Argument 'lupi_features' is not type int.")
if not 0 < lupi_features < d:
raise ValueError(
"Argument 'lupi_features' looks wrong. We need at least 1 priviliged feature (>0) or at least one normal feature."
)
self._lupi_features = lupi_features
# Check that X and y have correct shape
X, y = check_X_y(X, y)
return X, y
relax_constraint
def relax_constraint(
self,
key,
value
)
View Source
def relax_constraint(self, key, value):
return value * (1 + self.get_chosen_relax_factors(key))
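For example, a constraint value of 1.0 combined with the default relax factor of 0.1 is relaxed to 1.0 * (1 + 0.1) = 1.1.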
relax_factors
def relax_factors(
cls
)
View Source
def relax_factors(cls):
return ["loss_slack", "w_l1_slack"]
LUPI_Regression_Relevance_Bound
class LUPI_Regression_Relevance_Bound(
current_feature: int,
data: tuple,
hyperparameters,
best_model_constraints,
preset_model=None,
best_model_state=None,
probeID=-1
)
Convex problem used to compute per-feature relevance bounds for LUPI regression. Privileged features get additional candidate problems (one per sign and per positive/negative privileged function); normal features fall back to the plain regression bound.
View Source
class LUPI_Regression_Relevance_Bound(
LUPI_Relevance_CVXProblem, Regression_Relevance_Bound
):
@classmethod
def generate_upper_bound_problem(
cls,
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=-1,
):
is_priv = is_lupi_feature(
di, data, best_model_state
) # Is it a lupi feature where we need additional candidate problems?
if not is_priv:
yield from super().generate_upper_bound_problem(
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=probeID,
)
else:
for sign, pos in product([1, -1], [True, False]):
problem = cls(
di,
data,
best_hyperparameters,
init_constraints,
preset_model=preset_model,
best_model_state=best_model_state,
probeID=probeID,
)
problem.init_objective_UB(sign=sign, pos=pos)
yield problem
def _init_objective_LB_LUPI(self, **kwargs):
self.add_constraint(
cvx.abs(self.w_priv_pos[self.lupi_index]) <= self.feature_relevance
)
self.add_constraint(
cvx.abs(self.w_priv_neg[self.lupi_index]) <= self.feature_relevance
)
self._objective = cvx.Minimize(self.feature_relevance)
def _init_objective_UB_LUPI(self, pos=None, sign=None, **kwargs):
if pos:
self.add_constraint(
self.feature_relevance <= sign * self.w_priv_pos[self.lupi_index]
)
else:
self.add_constraint(
self.feature_relevance <= sign * self.w_priv_neg[self.lupi_index]
)
self._objective = cvx.Maximize(self.feature_relevance)
def _init_constraints(self, parameters, init_model_constraints):
# Upper constraints from best initial model
l1_w = init_model_constraints["w_l1"]
self.l1_priv_w_pos = init_model_constraints["w_priv_pos_l1"]
self.l1_priv_w_neg = init_model_constraints["w_priv_neg_l1"]
init_loss = init_model_constraints["loss"]
epsilon = parameters["epsilon"]
scaling_lupi_loss = init_model_constraints["scaling_lupi_loss"]
# New Variables
w = cvx.Variable(shape=(self.d), name="w")
b = cvx.Variable(name="b")
w_priv_pos = cvx.Variable(self.d_priv, name="w_priv_pos")
b_priv_pos = cvx.Variable(name="bias_priv_pos")
w_priv_neg = cvx.Variable(self.d_priv, name="w_priv_neg")
b_priv_neg = cvx.Variable(name="bias_priv_neg")
slack = cvx.Variable(shape=(self.n))
priv_function_pos = self.X_priv @ w_priv_pos + b_priv_pos
priv_function_neg = self.X_priv @ w_priv_neg + b_priv_neg
priv_loss = cvx.sum(priv_function_pos + priv_function_neg)
loss = priv_loss + cvx.sum(slack)
weight_norm = cvx.norm(w, 1)
self.weight_norm_priv_pos = cvx.norm(w_priv_pos, 1)
self.weight_norm_priv_neg = cvx.norm(w_priv_neg, 1)
self.add_constraint(
self.y - self.X @ w - b <= epsilon + priv_function_pos + slack
)
self.add_constraint(
self.X @ w + b - self.y <= epsilon + priv_function_neg + slack
)
self.add_constraint(priv_function_pos >= 0)
self.add_constraint(priv_function_neg >= 0)
self.add_constraint(loss <= init_loss)
self.add_constraint(slack >= 0)
sum_norms = weight_norm + self.weight_norm_priv_pos + self.weight_norm_priv_neg
self.add_constraint(sum_norms <= l1_w)
# self.add_constraint(self.weight_norm_priv_pos <= self.l1_priv_w_pos)
# self.add_constraint(self.weight_norm_priv_neg <= self.l1_priv_w_neg)
# Save values for object use later
self.w = w
self.w_priv_pos = w_priv_pos
self.w_priv_neg = w_priv_neg
self.feature_relevance = cvx.Variable(nonneg=True, name="Feature Relevance")
Ancestors (in MRO)
- fri.model.base_lupi.LUPI_Relevance_CVXProblem
- fri.model.regression.Regression_Relevance_Bound
- fri.model.base_cvxproblem.Relevance_CVXProblem
- abc.ABC
Static methods
aggregate_max_candidates
def aggregate_max_candidates(
max_problems_candidates
)
View Source
@classmethod
def aggregate_max_candidates(cls, max_problems_candidates):
vals = [candidate.solved_relevance for candidate in max_problems_candidates]
max_value = max(vals)
return max_value
aggregate_min_candidates
def aggregate_min_candidates(
min_problems_candidates
)
View Source
@classmethod
def aggregate_min_candidates(cls, min_problems_candidates):
vals = [candidate.solved_relevance for candidate in min_problems_candidates]
min_value = min(vals)
return min_value
generate_lower_bound_problem
def generate_lower_bound_problem(
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=-1
)
View Source
@classmethod
def generate_lower_bound_problem(
cls,
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=-1,
):
problem = cls(
di,
data,
best_hyperparameters,
init_constraints,
preset_model=preset_model,
best_model_state=best_model_state,
probeID=probeID,
)
problem.init_objective_LB()
problem.isLowerBound = True
yield problem
generate_upper_bound_problem
def generate_upper_bound_problem(
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=-1
)
View Source
@classmethod
def generate_upper_bound_problem(
cls,
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=-1,
):
is_priv = is_lupi_feature(
di, data, best_model_state
) # Is it a lupi feature where we need additional candidate problems?
if not is_priv:
yield from super().generate_upper_bound_problem(
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=probeID,
)
else:
for sign, pos in product([1, -1], [True, False]):
problem = cls(
di,
data,
best_hyperparameters,
init_constraints,
preset_model=preset_model,
best_model_state=best_model_state,
probeID=probeID,
)
problem.init_objective_UB(sign=sign, pos=pos)
yield problem
Instance variables
accepted_status
constraints
cvx_problem
isProbe
is_solved
objective
probeID
solved_relevance
solver_kwargs
Methods
add_constraint
def add_constraint(
self,
new
)
View Source
def add_constraint(self, new):
self._constraints.append(new)
init_objective_LB
def init_objective_LB(
self,
**kwargs
)
View Source
def init_objective_LB(self, **kwargs):
# We have two models basically with different indexes
if self.isPriv:
self._init_objective_LB_LUPI(**kwargs)
else:
# We call the sibling class of our lupi class, which is the normal problem
super().init_objective_LB(**kwargs)
init_objective_UB
def init_objective_UB(
self,
**kwargs
)
View Source
def init_objective_UB(self, **kwargs):
# We have two models basically with different indexes
if self.isPriv:
self._init_objective_UB_LUPI(**kwargs)
else:
# We call the sibling class of our lupi class, which is the normal problem
super().init_objective_UB(**kwargs)
preprocessing_data
def preprocessing_data(
self,
data,
best_model_state
)
View Source
def preprocessing_data(self, data, best_model_state):
lupi_features = best_model_state["lupi_features"]
X_combined, y = data
X, X_priv = split_dataset(X_combined, lupi_features)
self.X_priv = X_priv
super().preprocessing_data((X, y), best_model_state)
assert lupi_features == X_priv.shape[1]
self.d_priv = lupi_features
# LUPI model, we need to offset the index
self.lupi_index = self.current_feature - self.d
if self.lupi_index >= 0:
self.isPriv = True
else:
self.isPriv = False
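For example, with d = 3 normal features and d_priv = 2 privileged ones, current_feature = 4 yields lupi_index = 1 (the second privileged feature, so isPriv = True), while current_feature = 2 yields lupi_index = -1 and isPriv = False.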
solve
def solve(
self
) -> object
View Source
def solve(self) -> object:
# We init the cvx problem here because pickling LP solver objects is problematic
# By deferring it to here, worker threads build the problem themselves and we avoid the serialization cost
self._cvx_problem = cvx.Problem(
objective=self.objective, constraints=self.constraints
)
try:
# print("Solve", self)
self._cvx_problem.solve(**self.solver_kwargs)
except SolverError:
# We ignore Solver Errors, which are common with our framework:
# We solve multiple problems per bound and choose a feasible solution later (see '_create_interval')
pass
self._solver_status = self._cvx_problem.status
# self._cvx_problem = None
return self
LUPI_Regression_SVM
class LUPI_Regression_SVM(
C=1,
epsilon=0.1,
scaling_lupi_w=1,
scaling_lupi_loss=1,
lupi_features=None
)
Initial model: an L1-regularized, epsilon-insensitive linear regression (SVR-like) with two non-negative privileged slack functions, fitted by solving a convex program with cvxpy.
View Source
class LUPI_Regression_SVM(LUPI_InitModel):
HYPERPARAMETER = ["C", "epsilon", "scaling_lupi_w", "scaling_lupi_loss"]
def __init__(
self,
C=1,
epsilon=0.1,
scaling_lupi_w=1,
scaling_lupi_loss=1,
lupi_features=None,
):
super().__init__()
self.epsilon = epsilon
self.scaling_lupi_loss = scaling_lupi_loss
self.scaling_lupi_w = scaling_lupi_w
self.C = C
self.lupi_features = lupi_features
def fit(self, X_combined, y, lupi_features=None):
"""
Parameters
----------
lupi_features : int
Number of features in the dataset that are considered privileged information (PI).
PI features are expected to be the last features in the dataset.
"""
if lupi_features is None:
    if self.lupi_features is None:
        raise ValueError("No number of lupi features given.")
    lupi_features = self.lupi_features
self.lupi_features = lupi_features
X, X_priv = split_dataset(X_combined, self.lupi_features)
(n, d) = X.shape
# Get parameters from CV model without any feature constraints
C = self.get_params()["C"]
epsilon = self.get_params()["epsilon"]
scaling_lupi_w = self.get_params()["scaling_lupi_w"]
scaling_lupi_loss = self.get_params()["scaling_lupi_loss"]
# Initialize variables in cvxpy
w = cvx.Variable(shape=(d), name="w")
b = cvx.Variable(name="bias")
w_priv_pos = cvx.Variable(lupi_features, name="w_priv_pos")
b_priv_pos = cvx.Variable(name="bias_priv_pos")
w_priv_neg = cvx.Variable(lupi_features, name="w_priv_neg")
b_priv_neg = cvx.Variable(name="bias_priv_neg")
slack = cvx.Variable(shape=(n), name="slack")
# Define functions for better readability
priv_function_pos = X_priv @ w_priv_pos + b_priv_pos
priv_function_neg = X_priv @ w_priv_neg + b_priv_neg
# Combined loss of lupi function and normal slacks, scaled by two constants
priv_loss_pos = cvx.sum(priv_function_pos)
priv_loss_neg = cvx.sum(priv_function_neg)
priv_loss = priv_loss_pos + priv_loss_neg
slack_loss = cvx.sum(slack)
loss = scaling_lupi_loss * priv_loss + slack_loss
# L1 norm regularization of both functions with one scaling constant
weight_regularization = 0.5 * (
cvx.norm(w, 1)
+ scaling_lupi_w
* (0.5 * cvx.norm(w_priv_pos, 1) + 0.5 * cvx.norm(w_priv_neg, 1))
)
constraints = [
y - X @ w - b <= epsilon + priv_function_pos + slack,
X @ w + b - y <= epsilon + priv_function_neg + slack,
priv_function_pos >= 0,
priv_function_neg >= 0,
# priv_loss_pos >= 0,
# priv_loss_neg >= 0,
# slack_loss >= 0,
slack >= 0,
# loss >= 0,
]
objective = cvx.Minimize(C * loss + weight_regularization)
# Solve problem.
problem = cvx.Problem(objective, constraints)
problem.solve(**self.SOLVER_PARAMS)
self.model_state = {
"signs_pos": priv_function_pos.value > 0,
"signs_neg": priv_function_neg.value > 0,
"w": w.value,
"w_priv_pos": w_priv_pos.value,
"w_priv_neg": w_priv_neg.value,
"b": b.value,
"b_priv_pos": b_priv_pos.value,
"b_priv_neg": b_priv_neg.value,
"lupi_features": lupi_features, # Number of lupi features in the dataset TODO: Move this somewhere else,
}
w_l1 = np.linalg.norm(w.value, ord=1)
w_priv_pos_l1 = np.linalg.norm(w_priv_pos.value, ord=1)
w_priv_neg_l1 = np.linalg.norm(w_priv_neg.value, ord=1)
# We take the sum to combine both submodels (for priv) into a single normalization factor
w_priv_l1 = w_priv_pos_l1 + w_priv_neg_l1
self.constraints = {
"priv_loss": priv_loss.value,
"scaling_lupi_loss": scaling_lupi_loss,
# "loss_slack": slack_loss.value,
"loss": loss.value,
"w_l1": w_l1,
"w_priv_l1": w_priv_l1,
"w_priv_pos_l1": w_priv_pos_l1,
"w_priv_neg_l1": w_priv_neg_l1,
}
return self
@property
def SOLVER_PARAMS(cls):
return {"solver": "ECOS", "verbose": False}
def predict(self, X):
"""
Predict regression targets using the learned linear model.
Only the normal features are used; the privileged columns are split off and discarded at prediction time.
This method is mainly used for CV purposes to find the best parameters according to score.
Parameters
----------
X : numpy.ndarray
"""
X, X_priv = split_dataset(X, self.lupi_features)
w = self.model_state["w"]
b = self.model_state["b"]
y = np.dot(X, w) + b
return y
def score(self, X, y, **kwargs):
prediction = self.predict(X)
score = r2_score(y, prediction)
return score
Ancestors (in MRO)
- fri.model.base_initmodel.LUPI_InitModel
- fri.model.base_initmodel.InitModel
- abc.ABC
- sklearn.base.BaseEstimator
Class variables
HYPERPARAMETER
Instance variables
L1_factor
L1_factor_priv
SOLVER_PARAMS
Methods
fit
def fit(
self,
X_combined,
y,
lupi_features=None
)
Parameters
lupi_features : int Number of features in the dataset that are considered privileged information (PI). PI features are expected to be the last features in the dataset.
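A hedged sketch of the expected column layout (all variable names are illustrative):
import numpy as np
from fri.model.lupi_regression import LUPI_Regression_SVM
X = np.random.normal(size=(50, 3))        # normal features come first
X_priv = np.random.normal(size=(50, 2))   # privileged features are appended last
y = X.sum(axis=1)
X_combined = np.hstack([X, X_priv])
LUPI_Regression_SVM(C=1, epsilon=0.1).fit(X_combined, y, lupi_features=2)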
View Source
def fit(self, X_combined, y, lupi_features=None):
"""
Parameters
----------
lupi_features : int
Number of features in the dataset that are considered privileged information (PI).
PI features are expected to be the last features in the dataset.
"""
if lupi_features is None:
    if self.lupi_features is None:
        raise ValueError("No number of lupi features given.")
    lupi_features = self.lupi_features
self.lupi_features = lupi_features
X, X_priv = split_dataset(X_combined, self.lupi_features)
(n, d) = X.shape
# Get parameters from CV model without any feature constraints
C = self.get_params()["C"]
epsilon = self.get_params()["epsilon"]
scaling_lupi_w = self.get_params()["scaling_lupi_w"]
scaling_lupi_loss = self.get_params()["scaling_lupi_loss"]
# Initialize variables in cvxpy
w = cvx.Variable(shape=(d), name="w")
b = cvx.Variable(name="bias")
w_priv_pos = cvx.Variable(lupi_features, name="w_priv_pos")
b_priv_pos = cvx.Variable(name="bias_priv_pos")
w_priv_neg = cvx.Variable(lupi_features, name="w_priv_neg")
b_priv_neg = cvx.Variable(name="bias_priv_neg")
slack = cvx.Variable(shape=(n), name="slack")
# Define functions for better readability
priv_function_pos = X_priv @ w_priv_pos + b_priv_pos
priv_function_neg = X_priv @ w_priv_neg + b_priv_neg
# Combined loss of lupi function and normal slacks, scaled by two constants
priv_loss_pos = cvx.sum(priv_function_pos)
priv_loss_neg = cvx.sum(priv_function_neg)
priv_loss = priv_loss_pos + priv_loss_neg
slack_loss = cvx.sum(slack)
loss = scaling_lupi_loss * priv_loss + slack_loss
# L1 norm regularization of both functions with one scaling constant
weight_regularization = 0.5 * (
cvx.norm(w, 1)
+ scaling_lupi_w
* (0.5 * cvx.norm(w_priv_pos, 1) + 0.5 * cvx.norm(w_priv_neg, 1))
)
constraints = [
y - X @ w - b <= epsilon + priv_function_pos + slack,
X @ w + b - y <= epsilon + priv_function_neg + slack,
priv_function_pos >= 0,
priv_function_neg >= 0,
# priv_loss_pos >= 0,
# priv_loss_neg >= 0,
# slack_loss >= 0,
slack >= 0,
# loss >= 0,
]
objective = cvx.Minimize(C * loss + weight_regularization)
# Solve problem.
problem = cvx.Problem(objective, constraints)
problem.solve(**self.SOLVER_PARAMS)
self.model_state = {
"signs_pos": priv_function_pos.value > 0,
"signs_neg": priv_function_neg.value > 0,
"w": w.value,
"w_priv_pos": w_priv_pos.value,
"w_priv_neg": w_priv_neg.value,
"b": b.value,
"b_priv_pos": b_priv_pos.value,
"b_priv_neg": b_priv_neg.value,
"lupi_features": lupi_features, # Number of lupi features in the dataset TODO: Move this somewhere else,
}
w_l1 = np.linalg.norm(w.value, ord=1)
w_priv_pos_l1 = np.linalg.norm(w_priv_pos.value, ord=1)
w_priv_neg_l1 = np.linalg.norm(w_priv_neg.value, ord=1)
# We take the sum to combine both submodels (for priv) into a single normalization factor
w_priv_l1 = w_priv_pos_l1 + w_priv_neg_l1
self.constraints = {
"priv_loss": priv_loss.value,
"scaling_lupi_loss": scaling_lupi_loss,
# "loss_slack": slack_loss.value,
"loss": loss.value,
"w_l1": w_l1,
"w_priv_l1": w_priv_l1,
"w_priv_pos_l1": w_priv_pos_l1,
"w_priv_neg_l1": w_priv_neg_l1,
}
return self
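Transcribed from the source above, fit() solves the following convex program (writing s_loss for scaling_lupi_loss, s_w for scaling_lupi_w, and two privileged slack functions; the notation is ours, not the library's):
\begin{aligned}
\min_{w,\,b,\,\tilde w_\pm,\,\tilde b_\pm,\,\xi}\quad
& C\Big(s_{\mathrm{loss}}\sum_i\big(\tilde f_+(\tilde x_i)+\tilde f_-(\tilde x_i)\big)+\sum_i\xi_i\Big)
+\tfrac{1}{2}\Big(\lVert w\rVert_1+s_w\big(\tfrac{1}{2}\lVert\tilde w_+\rVert_1+\tfrac{1}{2}\lVert\tilde w_-\rVert_1\big)\Big)\\
\text{s.t.}\quad
& y_i-\langle w,x_i\rangle-b\le\varepsilon+\tilde f_+(\tilde x_i)+\xi_i,\\
& \langle w,x_i\rangle+b-y_i\le\varepsilon+\tilde f_-(\tilde x_i)+\xi_i,\\
& \tilde f_\pm(\tilde x_i)\ge 0,\qquad \xi_i\ge 0,
\end{aligned}
where \tilde f_\pm(\tilde x)=\langle\tilde w_\pm,\tilde x\rangle+\tilde b_\pm.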
get_params
def get_params(
self,
deep=True
)
Get parameters for this estimator.
Parameters
deep : bool, default=True If True, will return the parameters for this estimator and contained subobjects that are estimators.
Returns
params : dict Parameter names mapped to their values.
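For illustration (hypothetical values):
from fri.model.lupi_regression import LUPI_Regression_SVM
model = LUPI_Regression_SVM(C=2, epsilon=0.5)
model.get_params()["C"]        # -> 2
model.get_params()["epsilon"]  # -> 0.5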
View Source
def get_params(self, deep=True):
"""
Get parameters for this estimator.
Parameters
----------
deep : bool, default=True
If True, will return the parameters for this estimator and
contained subobjects that are estimators.
Returns
-------
params : dict
Parameter names mapped to their values.
"""
out = dict()
for key in self._get_param_names():
value = getattr(self, key)
if deep and hasattr(value, 'get_params'):
deep_items = value.get_params().items()
out.update((key + '__' + k, val) for k, val in deep_items)
out[key] = value
return out
make_scorer
def make_scorer(
self
)
View Source
def make_scorer(self):
return None, None
predict
def predict(
self,
X
)
Predict regression targets using the learned linear model. Only the normal features are used; the privileged columns are split off and discarded at prediction time. This method is mainly used for CV purposes to find the best parameters according to score.
Parameters
X : numpy.ndarray
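A hedged usage sketch (synthetic data; the last two columns are treated as privileged):
import numpy as np
from fri.model.lupi_regression import LUPI_Regression_SVM
X_combined = np.random.normal(size=(30, 5))  # 3 normal + 2 privileged columns
y = X_combined[:, :3].sum(axis=1)
model = LUPI_Regression_SVM(lupi_features=2).fit(X_combined, y)
y_hat = model.predict(X_combined)            # uses only the normal-feature weights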
View Source
def predict(self, X):
"""
Predict regression targets using the learned linear model.
Only the normal features are used; the privileged columns are split off and discarded at prediction time.
This method is mainly used for CV purposes to find the best parameters according to score.
Parameters
----------
X : numpy.ndarray
"""
X, X_priv = split_dataset(X, self.lupi_features)
w = self.model_state["w"]
b = self.model_state["b"]
y = np.dot(X, w) + b
return y
score
def score(
self,
X,
y,
**kwargs
)
View Source
def score(self, X, y, **kwargs):
prediction = self.predict(X)
score = r2_score(y, prediction)
return score
set_params
def set_params(
self,
**params
)
Set the parameters of this estimator.
The method works on simple estimators as well as on nested objects (such as sklearn.pipeline.Pipeline). The latter have parameters of the form <component>__<parameter> so that it's possible to update each component of a nested object.
Parameters
**params : dict Estimator parameters.
Returns
self : estimator instance Estimator instance.
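For example, the nested form addresses components of a pipeline (standard scikit-learn behavior):
from sklearn.pipeline import Pipeline
from sklearn.svm import SVR
pipe = Pipeline([("svr", SVR())])
pipe.set_params(svr__C=10)  # sets pipe.named_steps["svr"].C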
View Source
def set_params(self, **params):
"""
Set the parameters of this estimator.
The method works on simple estimators as well as on nested objects
(such as :class:`~sklearn.pipeline.Pipeline`). The latter have
parameters of the form ``<component>__<parameter>`` so that it's
possible to update each component of a nested object.
Parameters
----------
**params : dict
Estimator parameters.
Returns
-------
self : estimator instance
Estimator instance.
"""
if not params:
# Simple optimization to gain speed (inspect is slow)
return self
valid_params = self.get_params(deep=True)
nested_params = defaultdict(dict) # grouped by prefix
for key, value in params.items():
key, delim, sub_key = key.partition('__')
if key not in valid_params:
raise ValueError('Invalid parameter %s for estimator %s. '
'Check the list of available parameters '
'with `estimator.get_params().keys()`.' %
(key, self))
if delim:
nested_params[key][sub_key] = value
else:
setattr(self, key, value)
valid_params[key] = value
for key, sub_params in nested_params.items():
valid_params[key].set_params(**sub_params)
return self