Module fri.model.base_lupi

View Source
from abc import abstractmethod

from .base_cvxproblem import Relevance_CVXProblem

class LUPI_Relevance_CVXProblem(Relevance_CVXProblem):
    """Relevance problem whose data carries a trailing block of extra columns.

    The last ``best_model_state["lupi_features"]`` columns of the combined
    feature matrix are split off into ``X_priv``; the remaining columns are
    passed on to the parent class. Building the objective is dispatched on
    whether the current feature index points into the split-off block.
    """

    def __init__(
        self,
        current_feature: int,
        data: tuple,
        hyperparameters,
        best_model_constraints,
        preset_model=None,
        best_model_state=None,
        probeID=-1,
    ) -> None:
        """Forward every argument, unchanged and positionally, to the parent."""
        parent_args = (
            current_feature,
            data,
            hyperparameters,
            best_model_constraints,
            preset_model,
            best_model_state,
            probeID,
        )
        super().__init__(*parent_args)

    def preprocessing_data(self, data, best_model_state):
        """Split the combined data and record which block the feature is in.

        Args:
            data: Pair ``(X_combined, y)``.
            best_model_state: Mapping that provides the ``"lupi_features"``
                column count.

        Sets ``X_priv``, ``d_priv``, ``lupi_index`` and ``isPriv`` on the
        instance.
        """
        n_priv = best_model_state["lupi_features"]
        combined, targets = data
        regular, privileged = split_dataset(combined, n_priv)
        self.X_priv = privileged

        # The parent only ever sees the regular columns.
        super().preprocessing_data((regular, targets), best_model_state)

        assert n_priv == privileged.shape[1]
        self.d_priv = n_priv

        # Offset the feature index so that 0 is the first split-off column.
        # NOTE(review): self.d is presumably set by the parent's
        # preprocessing_data -- confirm against Relevance_CVXProblem.
        self.lupi_index = self.current_feature - self.d
        self.isPriv = bool(self.lupi_index >= 0)

    def init_objective_UB(self, **kwargs):
        """Build the upper-bound objective for whichever block is active."""
        if not self.isPriv:
            # Regular feature: the parent's plain formulation applies.
            super().init_objective_UB(**kwargs)
            return
        self._init_objective_UB_LUPI(**kwargs)

    def init_objective_LB(self, **kwargs):
        """Build the lower-bound objective for whichever block is active."""
        if not self.isPriv:
            # Regular feature: the parent's plain formulation applies.
            super().init_objective_LB(**kwargs)
            return
        self._init_objective_LB_LUPI(**kwargs)

    @abstractmethod
    def _init_objective_LB_LUPI(self, **kwargs):
        """Lower-bound objective used when ``isPriv`` is true (subclass hook)."""

    @abstractmethod
    def _init_objective_UB_LUPI(self, **kwargs):
        """Upper-bound objective used when ``isPriv`` is true (subclass hook)."""

def split_dataset(X_combined, lupi_features):
    """Split a combined feature matrix into leading and trailing column blocks.

    Args:
        X_combined: Two-dimensional array supporting ``.shape`` and
            ``[:, a:b]`` slicing.
        lupi_features: Number of trailing columns to split off. Expected to
            be a non-negative integer smaller than the number of columns.

    Returns:
        Tuple ``(X, X_priv)`` where ``X_priv`` holds the last
        ``lupi_features`` columns and ``X`` holds all columns before them.

    Raises:
        AssertionError: If ``X_combined`` does not have more columns than
            ``lupi_features``.
    """
    n_columns = X_combined.shape[1]
    assert n_columns > lupi_features

    # Slice on an explicit boundary instead of a negative offset: with
    # lupi_features == 0 the offset form degenerates (-0 == 0), which makes
    # ``[:, :-0]`` empty and ``[:, -0:]`` the whole matrix.
    boundary = n_columns - lupi_features
    X = X_combined[:, :boundary]
    X_priv = X_combined[:, boundary:]
    return X, X_priv

def is_lupi_feature(di, data, best_model_state):
    """Tell whether feature index ``di`` lies in the trailing column block.

    Args:
        di: Feature index into the combined feature matrix.
        data: Pair whose first element is the combined feature matrix.
        best_model_state: Mapping that provides the ``"lupi_features"``
            column count.

    Returns:
        True when ``di`` is at or beyond the first of the last
        ``lupi_features`` columns, False otherwise.
    """
    n_priv = best_model_state["lupi_features"]
    features, _targets = data
    n_regular = features.shape[1] - n_priv
    # Offset relative to the first trailing column; non-negative means "in".
    return (di - n_regular) >= 0

Functions

is_lupi_feature

def is_lupi_feature(
    di,
    data,
    best_model_state
)
View Source
def is_lupi_feature(di, data, best_model_state):
    """Tell whether feature index ``di`` lies in the trailing column block.

    Args:
        di: Feature index into the combined feature matrix.
        data: Pair whose first element is the combined feature matrix.
        best_model_state: Mapping that provides the ``"lupi_features"``
            column count.

    Returns:
        True when ``di`` is at or beyond the first of the last
        ``lupi_features`` columns, False otherwise.
    """
    n_priv = best_model_state["lupi_features"]
    features, _targets = data
    n_regular = features.shape[1] - n_priv
    # Offset relative to the first trailing column; non-negative means "in".
    return (di - n_regular) >= 0

split_dataset

def split_dataset(
    X_combined,
    lupi_features
)
View Source
def split_dataset(X_combined, lupi_features):
    """Split a combined feature matrix into leading and trailing column blocks.

    Args:
        X_combined: Two-dimensional array supporting ``.shape`` and
            ``[:, a:b]`` slicing.
        lupi_features: Number of trailing columns to split off. Expected to
            be a non-negative integer smaller than the number of columns.

    Returns:
        Tuple ``(X, X_priv)`` where ``X_priv`` holds the last
        ``lupi_features`` columns and ``X`` holds all columns before them.

    Raises:
        AssertionError: If ``X_combined`` does not have more columns than
            ``lupi_features``.
    """
    n_columns = X_combined.shape[1]
    assert n_columns > lupi_features

    # Slice on an explicit boundary instead of a negative offset: with
    # lupi_features == 0 the offset form degenerates (-0 == 0), which makes
    # ``[:, :-0]`` empty and ``[:, -0:]`` the whole matrix.
    boundary = n_columns - lupi_features
    X = X_combined[:, :boundary]
    X_priv = X_combined[:, boundary:]
    return X, X_priv

Classes

LUPI_Relevance_CVXProblem

class LUPI_Relevance_CVXProblem(
    current_feature: int,
    data: tuple,
    hyperparameters,
    best_model_constraints,
    preset_model=None,
    best_model_state=None,
    probeID=-1
)

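Helper class that provides a standard way to create an ABC using inheritance. (This sentence is inherited from abc.ABC.) The class itself splits the trailing "lupi_features" columns off the combined feature matrix and builds the bound objectives either through the parent class or through the abstract _init_objective_LB_LUPI / _init_objective_UB_LUPI hooks, depending on which column block the current feature belongs to.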

View Source
class LUPI_Relevance_CVXProblem(Relevance_CVXProblem):
    """Relevance problem whose data carries a trailing block of extra columns.

    The last ``best_model_state["lupi_features"]`` columns of the combined
    feature matrix are split off into ``X_priv``; the remaining columns are
    passed on to the parent class. Building the objective is dispatched on
    whether the current feature index points into the split-off block.
    """

    def __init__(
        self,
        current_feature: int,
        data: tuple,
        hyperparameters,
        best_model_constraints,
        preset_model=None,
        best_model_state=None,
        probeID=-1,
    ) -> None:
        """Forward every argument, unchanged and positionally, to the parent."""
        parent_args = (
            current_feature,
            data,
            hyperparameters,
            best_model_constraints,
            preset_model,
            best_model_state,
            probeID,
        )
        super().__init__(*parent_args)

    def preprocessing_data(self, data, best_model_state):
        """Split the combined data and record which block the feature is in.

        Args:
            data: Pair ``(X_combined, y)``.
            best_model_state: Mapping that provides the ``"lupi_features"``
                column count.

        Sets ``X_priv``, ``d_priv``, ``lupi_index`` and ``isPriv`` on the
        instance.
        """
        n_priv = best_model_state["lupi_features"]
        combined, targets = data
        regular, privileged = split_dataset(combined, n_priv)
        self.X_priv = privileged

        # The parent only ever sees the regular columns.
        super().preprocessing_data((regular, targets), best_model_state)

        assert n_priv == privileged.shape[1]
        self.d_priv = n_priv

        # Offset the feature index so that 0 is the first split-off column.
        # NOTE(review): self.d is presumably set by the parent's
        # preprocessing_data -- confirm against Relevance_CVXProblem.
        self.lupi_index = self.current_feature - self.d
        self.isPriv = bool(self.lupi_index >= 0)

    def init_objective_UB(self, **kwargs):
        """Build the upper-bound objective for whichever block is active."""
        if not self.isPriv:
            # Regular feature: the parent's plain formulation applies.
            super().init_objective_UB(**kwargs)
            return
        self._init_objective_UB_LUPI(**kwargs)

    def init_objective_LB(self, **kwargs):
        """Build the lower-bound objective for whichever block is active."""
        if not self.isPriv:
            # Regular feature: the parent's plain formulation applies.
            super().init_objective_LB(**kwargs)
            return
        self._init_objective_LB_LUPI(**kwargs)

    @abstractmethod
    def _init_objective_LB_LUPI(self, **kwargs):
        """Lower-bound objective used when ``isPriv`` is true (subclass hook)."""

    @abstractmethod
    def _init_objective_UB_LUPI(self, **kwargs):
        """Upper-bound objective used when ``isPriv`` is true (subclass hook)."""

Ancestors (in MRO)

  • fri.model.base_cvxproblem.Relevance_CVXProblem
  • abc.ABC

Descendants

  • fri.model.lupi_classification.LUPI_Classification_Relevance_Bound
  • fri.model.lupi_ordinal_regression.LUPI_OrdinalRegression_Relevance_Bound
  • fri.model.lupi_regression.LUPI_Regression_Relevance_Bound

Static methods

aggregate_max_candidates
def aggregate_max_candidates(
    max_problems_candidates
)
View Source
    @classmethod
    def aggregate_max_candidates(cls, max_problems_candidates):
        """Return the largest ``solved_relevance`` among the given candidates.

        Args:
            max_problems_candidates: Iterable of objects exposing a
                ``solved_relevance`` attribute.

        Returns:
            The maximum of the collected ``solved_relevance`` values.
        """
        # Collect every value first, then reduce.
        relevances = []
        for problem in max_problems_candidates:
            relevances.append(problem.solved_relevance)
        return max(relevances)
aggregate_min_candidates
def aggregate_min_candidates(
    min_problems_candidates
)
View Source
    @classmethod
    def aggregate_min_candidates(cls, min_problems_candidates):
        """Return the smallest ``solved_relevance`` among the given candidates.

        Args:
            min_problems_candidates: Iterable of objects exposing a
                ``solved_relevance`` attribute.

        Returns:
            The minimum of the collected ``solved_relevance`` values.
        """
        # Collect every value first, then reduce.
        relevances = []
        for problem in min_problems_candidates:
            relevances.append(problem.solved_relevance)
        return min(relevances)
generate_lower_bound_problem
def generate_lower_bound_problem(
    best_hyperparameters,
    init_constraints,
    best_model_state,
    data,
    di,
    preset_model,
    probeID=-1
)
View Source
    @classmethod
    def generate_lower_bound_problem(
        cls,
        best_hyperparameters,
        init_constraints,
        best_model_state,
        data,
        di,
        preset_model,
        probeID=-1,
    ):
        """Yield one problem instance prepared for the lower bound of ``di``.

        The instance is constructed from the given arguments, has its
        lower-bound objective initialised and is flagged with
        ``isLowerBound = True`` before being yielded.
        """
        optional_args = {
            "preset_model": preset_model,
            "best_model_state": best_model_state,
            "probeID": probeID,
        }
        lower_bound_problem = cls(
            di, data, best_hyperparameters, init_constraints, **optional_args
        )
        lower_bound_problem.init_objective_LB()
        lower_bound_problem.isLowerBound = True
        yield lower_bound_problem
generate_upper_bound_problem
def generate_upper_bound_problem(
    best_hyperparameters,
    init_constraints,
    best_model_state,
    data,
    di,
    preset_model,
    probeID=-1
)
View Source
    @classmethod
    def generate_upper_bound_problem(
        cls,
        best_hyperparameters,
        init_constraints,
        best_model_state,
        data,
        di,
        preset_model,
        probeID=-1,
    ):
        """Yield two problem instances for the upper bound of ``di``.

        One fresh instance is built per sign, first ``-1`` and then ``1``.
        Each has its upper-bound objective initialised with that ``sign`` and
        is flagged with ``isLowerBound = False`` before being yielded.
        """
        required_args = (di, data, best_hyperparameters, init_constraints)
        optional_args = {
            "preset_model": preset_model,
            "best_model_state": best_model_state,
            "probeID": probeID,
        }
        for sign in (-1, 1):
            upper_bound_problem = cls(*required_args, **optional_args)
            upper_bound_problem.init_objective_UB(sign=sign)
            upper_bound_problem.isLowerBound = False
            yield upper_bound_problem

Instance variables

accepted_status
constraints
cvx_problem
isProbe
is_solved
objective
probeID
solved_relevance
solver_kwargs

Methods

add_constraint
def add_constraint(
    self,
    new
)
View Source
    def add_constraint(self, new):
        """Append ``new`` to this problem's internal ``_constraints`` list."""
        registered = self._constraints
        registered.append(new)
init_objective_LB
def init_objective_LB(
    self,
    **kwargs
)
View Source
    def init_objective_LB(self, **kwargs):
        """Build the lower-bound objective for whichever block is active.

        When ``isPriv`` is true the ``_init_objective_LB_LUPI`` hook is used;
        otherwise the parent implementation handles it. All keyword
        arguments are forwarded unchanged.
        """
        if not self.isPriv:
            # Regular feature: the parent's plain formulation applies.
            super().init_objective_LB(**kwargs)
            return
        self._init_objective_LB_LUPI(**kwargs)
init_objective_UB
def init_objective_UB(
    self,
    **kwargs
)
View Source
    def init_objective_UB(self, **kwargs):
        """Build the upper-bound objective for whichever block is active.

        When ``isPriv`` is true the ``_init_objective_UB_LUPI`` hook is used;
        otherwise the parent implementation handles it. All keyword
        arguments are forwarded unchanged.
        """
        if not self.isPriv:
            # Regular feature: the parent's plain formulation applies.
            super().init_objective_UB(**kwargs)
            return
        self._init_objective_UB_LUPI(**kwargs)
preprocessing_data
def preprocessing_data(
    self,
    data,
    best_model_state
)
View Source
    def preprocessing_data(self, data, best_model_state):
        """Split the combined data and record which block the feature is in.

        Args:
            data: Pair ``(X_combined, y)``.
            best_model_state: Mapping that provides the ``"lupi_features"``
                column count.

        Sets ``X_priv``, ``d_priv``, ``lupi_index`` and ``isPriv`` on the
        instance.
        """
        n_priv = best_model_state["lupi_features"]
        combined, targets = data
        regular, privileged = split_dataset(combined, n_priv)
        self.X_priv = privileged

        # The parent only ever sees the regular columns.
        super().preprocessing_data((regular, targets), best_model_state)

        assert n_priv == privileged.shape[1]
        self.d_priv = n_priv

        # Offset the feature index so that 0 is the first split-off column.
        # NOTE(review): self.d is presumably set by the parent's
        # preprocessing_data -- confirm against Relevance_CVXProblem.
        self.lupi_index = self.current_feature - self.d
        self.isPriv = bool(self.lupi_index >= 0)
solve
def solve(
    self
) -> object
View Source
    def solve(self) -> object:
        """Build the cvx problem, run the solver and record the final status.

        Returns:
            This instance, with ``_cvx_problem`` and ``_solver_status`` set.
        """
        # The cvx problem is created here rather than earlier because pickling
        # LP solver objects is problematic; deferring construction lets worker
        # threads build the problem themselves and avoids the serialization.
        problem = cvx.Problem(objective=self.objective, constraints=self.constraints)
        self._cvx_problem = problem

        try:
            problem.solve(**self.solver_kwargs)
        except SolverError:
            # Solver errors are deliberately ignored; they are common with this
            # framework. Several problems are solved per bound and a feasible
            # solution is chosen later (see '_create_interval').
            pass

        self._solver_status = problem.status
        return self