Module fri.model.base_lupi
View Source
from abc import abstractmethod
from .base_cvxproblem import Relevance_CVXProblem
class LUPI_Relevance_CVXProblem(Relevance_CVXProblem):
    """Relevance problem base for data that includes privileged (LUPI) features.

    During preprocessing the combined feature matrix is split into regular
    and privileged columns.  Objective initialisation is then dispatched
    depending on whether ``current_feature`` points at a privileged column:
    privileged features use the ``_init_objective_*_LUPI`` hooks that
    subclasses must implement, all other features use the parent class
    implementation.
    """

    def __init__(
        self,
        current_feature: int,
        data: tuple,
        hyperparameters,
        best_model_constraints,
        preset_model=None,
        best_model_state=None,
        probeID=-1,
    ) -> None:
        """Forward every argument, unchanged and in order, to the parent."""
        super().__init__(
            current_feature,
            data,
            hyperparameters,
            best_model_constraints,
            preset_model,
            best_model_state,
            probeID,
        )

    def preprocessing_data(self, data, best_model_state):
        """Split off the privileged columns and record LUPI bookkeeping.

        ``data`` is unpacked as ``(X_combined, y)`` and
        ``best_model_state["lupi_features"]`` gives the number of privileged
        columns.  Sets ``X_priv``, ``d_priv``, ``lupi_index`` and ``isPriv``
        on the instance; the regular part is handed to the parent's
        ``preprocessing_data``.
        """
        n_priv = best_model_state["lupi_features"]
        combined, targets = data
        regular, privileged = split_dataset(combined, n_priv)
        self.X_priv = privileged

        super().preprocessing_data((regular, targets), best_model_state)

        assert n_priv == privileged.shape[1]
        self.d_priv = n_priv

        # Index of the current feature relative to the privileged columns.
        # NOTE(review): ``self.d`` is presumably set by the parent's
        # preprocessing to the number of regular features -- confirm.
        self.lupi_index = self.current_feature - self.d
        self.isPriv = bool(self.lupi_index >= 0)

    def init_objective_UB(self, **kwargs):
        """Set up the upper-bound objective for the current feature."""
        if not self.isPriv:
            # Regular feature: the parent's formulation applies.
            super().init_objective_UB(**kwargs)
            return
        self._init_objective_UB_LUPI(**kwargs)

    def init_objective_LB(self, **kwargs):
        """Set up the lower-bound objective for the current feature."""
        if not self.isPriv:
            # Regular feature: the parent's formulation applies.
            super().init_objective_LB(**kwargs)
            return
        self._init_objective_LB_LUPI(**kwargs)

    @abstractmethod
    def _init_objective_LB_LUPI(self, **kwargs):
        """Lower-bound objective for a privileged feature (subclass hook)."""

    @abstractmethod
    def _init_objective_UB_LUPI(self, **kwargs):
        """Upper-bound objective for a privileged feature (subclass hook)."""
def split_dataset(X_combined, lupi_features):
    """Split a combined feature matrix into regular and privileged columns.

    The trailing ``lupi_features`` columns of ``X_combined`` are returned as
    the privileged part; all preceding columns form the regular part.

    Parameters
    ----------
    X_combined
        Two-dimensional array exposing ``.shape`` and ``[:, a:b]`` slicing.
    lupi_features : int
        Number of trailing privileged columns.

    Returns
    -------
    tuple
        ``(X, X_priv)`` -- regular columns and privileged columns.

    Raises
    ------
    AssertionError
        If ``lupi_features`` is negative or not strictly smaller than the
        number of columns.
    """
    n_columns = X_combined.shape[1]
    assert 0 <= lupi_features < n_columns
    # Slice at an explicit boundary instead of using negative offsets:
    # ``[:, :-0]`` is empty and ``[:, -0:]`` is the full matrix, which would
    # swap the two parts when there are no privileged columns.
    boundary = n_columns - lupi_features
    X = X_combined[:, :boundary]
    X_priv = X_combined[:, boundary:]
    return X, X_priv
def is_lupi_feature(di, data, best_model_state):
    """Return whether feature index ``di`` lies in the privileged columns.

    ``data`` is unpacked as ``(X_combined, _)``; the number of privileged
    columns is read from ``best_model_state["lupi_features"]`` and is assumed
    to occupy the trailing columns of ``X_combined``.
    """
    n_priv = best_model_state["lupi_features"]
    combined, _ignored = data
    n_regular = combined.shape[1] - n_priv
    # Non-negative offset past the regular columns means "privileged".
    return (di - n_regular) >= 0
Functions
is_lupi_feature
def is_lupi_feature(
di,
data,
best_model_state
)
View Source
def is_lupi_feature(di, data, best_model_state):
    """Check if the feature at index ``di`` is one of the privileged columns.

    The privileged-column count comes from
    ``best_model_state["lupi_features"]``; the total column count comes from
    the first element of the ``data`` pair.
    """
    privileged_count = best_model_state["lupi_features"]
    features, _targets = data
    total_columns = features.shape[1]
    offset_into_privileged = di - (total_columns - privileged_count)
    return offset_into_privileged >= 0
split_dataset
def split_dataset(
X_combined,
lupi_features
)
View Source
def split_dataset(X_combined, lupi_features):
    """Separate regular feature columns from trailing privileged columns.

    Parameters
    ----------
    X_combined
        Two-dimensional array exposing ``.shape`` and ``[:, a:b]`` slicing.
    lupi_features : int
        How many of the last columns are privileged.

    Returns
    -------
    tuple
        ``(X, X_priv)`` where ``X`` holds the leading regular columns and
        ``X_priv`` the trailing ``lupi_features`` columns.

    Raises
    ------
    AssertionError
        If ``lupi_features`` is negative or not strictly smaller than the
        number of columns.
    """
    total_columns = X_combined.shape[1]
    assert 0 <= lupi_features < total_columns
    # Use a positive split index: with ``lupi_features == 0`` the negative
    # forms ``[:, :-0]`` / ``[:, -0:]`` would yield an empty regular part and
    # a privileged part containing every column.
    split_at = total_columns - lupi_features
    X = X_combined[:, :split_at]
    X_priv = X_combined[:, split_at:]
    return X, X_priv
Classes
LUPI_Relevance_CVXProblem
class LUPI_Relevance_CVXProblem(
current_feature: int,
data: tuple,
hyperparameters,
best_model_constraints,
preset_model=None,
best_model_state=None,
probeID=-1
)
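Abstract base class (ABC) for relevance problems on data with privileged (LUPI) features: it splits the combined data into regular and privileged columns and dispatches objective initialisation to the LUPI-specific hooks when the current feature is privileged.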
View Source
class LUPI_Relevance_CVXProblem(Relevance_CVXProblem):
    """Base class for relevance problems with privileged (LUPI) features.

    The combined feature matrix passed in ``data`` is split into a regular
    and a privileged part.  Features whose index falls into the privileged
    part get their objectives from ``_init_objective_LB_LUPI`` /
    ``_init_objective_UB_LUPI`` (to be provided by subclasses); every other
    feature falls back to the parent class objectives.
    """

    def __init__(
        self,
        current_feature: int,
        data: tuple,
        hyperparameters,
        best_model_constraints,
        preset_model=None,
        best_model_state=None,
        probeID=-1,
    ) -> None:
        """Pass all constructor arguments through to the parent class."""
        super().__init__(
            current_feature,
            data,
            hyperparameters,
            best_model_constraints,
            preset_model,
            best_model_state,
            probeID,
        )

    def preprocessing_data(self, data, best_model_state):
        """Separate privileged columns and derive the LUPI feature index.

        Stores ``X_priv``, ``d_priv``, ``lupi_index`` and ``isPriv`` on the
        instance and delegates the regular columns (with the targets) to the
        parent's ``preprocessing_data``.
        """
        privileged_count = best_model_state["lupi_features"]
        X_all, y = data
        X_regular, X_privileged = split_dataset(X_all, privileged_count)
        self.X_priv = X_privileged
        super().preprocessing_data((X_regular, y), best_model_state)
        assert privileged_count == X_privileged.shape[1]
        self.d_priv = privileged_count
        # Shift the global feature index so that 0 is the first privileged
        # column.  NOTE(review): relies on ``self.d`` being available after
        # the parent's preprocessing -- confirm against the base class.
        shifted = self.current_feature - self.d
        self.lupi_index = shifted
        self.isPriv = True if shifted >= 0 else False

    def init_objective_UB(self, **kwargs):
        """Initialise the upper-bound objective (LUPI hook or parent)."""
        if not self.isPriv:
            super().init_objective_UB(**kwargs)
            return
        self._init_objective_UB_LUPI(**kwargs)

    def init_objective_LB(self, **kwargs):
        """Initialise the lower-bound objective (LUPI hook or parent)."""
        if not self.isPriv:
            super().init_objective_LB(**kwargs)
            return
        self._init_objective_LB_LUPI(**kwargs)

    @abstractmethod
    def _init_objective_LB_LUPI(self, **kwargs):
        """Subclass hook: lower-bound objective for a privileged feature."""

    @abstractmethod
    def _init_objective_UB_LUPI(self, **kwargs):
        """Subclass hook: upper-bound objective for a privileged feature."""
Ancestors (in MRO)
- fri.model.base_cvxproblem.Relevance_CVXProblem
- abc.ABC
Descendants
- fri.model.lupi_classification.LUPI_Classification_Relevance_Bound
- fri.model.lupi_ordinal_regression.LUPI_OrdinalRegression_Relevance_Bound
- fri.model.lupi_regression.LUPI_Regression_Relevance_Bound
Static methods
aggregate_max_candidates
def aggregate_max_candidates(
max_problems_candidates
)
View Source
@classmethod
def aggregate_max_candidates(cls, max_problems_candidates):
    """Return the largest ``solved_relevance`` among the given candidates."""
    return max(problem.solved_relevance for problem in max_problems_candidates)
aggregate_min_candidates
def aggregate_min_candidates(
min_problems_candidates
)
View Source
@classmethod
def aggregate_min_candidates(cls, min_problems_candidates):
    """Return the smallest ``solved_relevance`` among the given candidates."""
    return min(problem.solved_relevance for problem in min_problems_candidates)
generate_lower_bound_problem
def generate_lower_bound_problem(
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=-1
)
View Source
@classmethod
def generate_lower_bound_problem(
    cls,
    best_hyperparameters,
    init_constraints,
    best_model_state,
    data,
    di,
    preset_model,
    probeID=-1,
):
    """Yield the single lower-bound problem for feature ``di``.

    This is a generator: the problem instance is only constructed once the
    result is iterated.  The yielded problem has had ``init_objective_LB()``
    called and ``isLowerBound`` set to ``True``.
    """
    options = {
        "preset_model": preset_model,
        "best_model_state": best_model_state,
        "probeID": probeID,
    }
    lower = cls(di, data, best_hyperparameters, init_constraints, **options)
    lower.init_objective_LB()
    lower.isLowerBound = True
    yield lower
generate_upper_bound_problem
def generate_upper_bound_problem(
best_hyperparameters,
init_constraints,
best_model_state,
data,
di,
preset_model,
probeID=-1
)
View Source
@classmethod
def generate_upper_bound_problem(
    cls,
    best_hyperparameters,
    init_constraints,
    best_model_state,
    data,
    di,
    preset_model,
    probeID=-1,
):
    """Yield two upper-bound problems for feature ``di``, one per sign.

    This is a generator: for ``sign`` equal to ``-1`` and then ``1`` a fresh
    problem is constructed, ``init_objective_UB(sign=sign)`` is called on it,
    ``isLowerBound`` is set to ``False`` and the problem is yielded.
    """
    options = {
        "preset_model": preset_model,
        "best_model_state": best_model_state,
        "probeID": probeID,
    }
    for sign in (-1, 1):
        upper = cls(di, data, best_hyperparameters, init_constraints, **options)
        upper.init_objective_UB(sign=sign)
        upper.isLowerBound = False
        yield upper
Instance variables
accepted_status
constraints
cvx_problem
isProbe
is_solved
objective
probeID
solved_relevance
solver_kwargs
Methods
add_constraint
def add_constraint(
self,
new
)
View Source
def add_constraint(self, new):
    """Append ``new`` to this problem's internal constraint list."""
    constraint_list = self._constraints
    constraint_list.append(new)
init_objective_LB
def init_objective_LB(
self,
**kwargs
)
View Source
def init_objective_LB(self, **kwargs):
    """Initialise the lower-bound objective for the current feature.

    Privileged features (``self.isPriv`` truthy) use the LUPI-specific
    ``_init_objective_LB_LUPI``; all other features use the parent class
    implementation.  Keyword arguments are forwarded unchanged.
    """
    if not self.isPriv:
        # Regular feature: handled by the ordinary (non-LUPI) problem.
        super().init_objective_LB(**kwargs)
        return
    self._init_objective_LB_LUPI(**kwargs)
init_objective_UB
def init_objective_UB(
self,
**kwargs
)
View Source
def init_objective_UB(self, **kwargs):
    """Initialise the upper-bound objective for the current feature.

    Privileged features (``self.isPriv`` truthy) use the LUPI-specific
    ``_init_objective_UB_LUPI``; all other features use the parent class
    implementation.  Keyword arguments are forwarded unchanged.
    """
    if not self.isPriv:
        # Regular feature: handled by the ordinary (non-LUPI) problem.
        super().init_objective_UB(**kwargs)
        return
    self._init_objective_UB_LUPI(**kwargs)
preprocessing_data
def preprocessing_data(
self,
data,
best_model_state
)
View Source
def preprocessing_data(self, data, best_model_state):
    """Split off privileged columns and record LUPI bookkeeping attributes.

    ``data`` is unpacked as ``(X_combined, y)``; the number of privileged
    columns is read from ``best_model_state["lupi_features"]``.  Sets
    ``X_priv``, ``d_priv``, ``lupi_index`` and ``isPriv`` on the instance and
    hands the regular columns together with ``y`` to the parent's
    ``preprocessing_data``.
    """
    n_priv = best_model_state["lupi_features"]
    combined, targets = data
    regular, privileged = split_dataset(combined, n_priv)
    self.X_priv = privileged

    super().preprocessing_data((regular, targets), best_model_state)

    assert n_priv == privileged.shape[1]
    self.d_priv = n_priv

    # Feature index relative to the start of the privileged columns.
    # NOTE(review): ``self.d`` is presumably populated by the parent's
    # preprocessing with the number of regular features -- confirm.
    self.lupi_index = self.current_feature - self.d
    self.isPriv = bool(self.lupi_index >= 0)
solve
def solve(
self
) -> object
View Source
def solve(self) -> object:
    """Build the CVX problem, attempt to solve it, and return ``self``.

    The problem object is created here rather than earlier because pickling
    LP solver objects is problematic; building it lazily lets worker threads
    construct it themselves and avoids serialising it.  The resulting solver
    status is stored in ``self._solver_status``.
    """
    problem = cvx.Problem(objective=self.objective, constraints=self.constraints)
    self._cvx_problem = problem
    try:
        problem.solve(**self.solver_kwargs)
    except SolverError:
        # Solver errors are expected with this framework: several problems
        # are solved per bound and a feasible solution is picked afterwards
        # (see '_create_interval'), so a failure here is deliberately ignored.
        pass
    self._solver_status = problem.status
    return self