Skip to content

Module fri.model.lupi_regression

View Source
from itertools import product

import cvxpy as cvx

import numpy as np

from sklearn.metrics import r2_score

from sklearn.utils import check_X_y

from fri.model.base_lupi import (

    LUPI_Relevance_CVXProblem,

    split_dataset,

    is_lupi_feature,

)

from fri.model.regression import Regression_Relevance_Bound

from .base_initmodel import LUPI_InitModel

from .base_type import ProblemType

class LUPI_Regression(ProblemType):
    """Problem type for regression with privileged information (LUPI).

    Names the tunable hyperparameters and relaxation factors, points to the
    initial model and relevance-bound problem classes, and validates the
    input data together with the number of privileged features.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Number of privileged features; filled in by `preprocessing`.
        self._lupi_features = None

    @property
    def lupi_features(self):
        """Number of privileged features stored by the last `preprocessing` call."""
        return self._lupi_features

    @classmethod
    def parameters(cls):
        """Return the names of the hyperparameters of this problem type."""
        return ["C", "epsilon", "scaling_lupi_w", "scaling_lupi_loss"]

    @property
    def get_initmodel_template(cls):
        """Class used to fit the initial model."""
        return LUPI_Regression_SVM

    @property
    def get_cvxproblem_template(cls):
        """Class used to build the relevance-bound problems."""
        return LUPI_Regression_Relevance_Bound

    @classmethod
    def relax_factors(cls):
        """Return the names of the relaxation factors of this problem type.

        Declared as a classmethod (like `parameters`) so it can be called on
        the class as well as on an instance.
        """
        return ["loss_slack", "w_l1_slack"]

    def preprocessing(self, data, lupi_features=None):
        """Validate the data and the number of privileged features.

        Parameters
        ----------
        data : tuple
            Pair ``(X, y)``.
        lupi_features : int
            Number of privileged features; must satisfy
            ``0 < lupi_features < X.shape[1]``.

        Returns
        -------
        tuple
            ``(X, y)`` as returned by ``check_X_y``.

        Raises
        ------
        ValueError
            If `lupi_features` is missing, not an int, or out of range, or if
            ``check_X_y`` rejects the data.
        """
        X, y = data

        if lupi_features is None:
            raise ValueError("Argument 'lupi_features' missing in fit() call.")
        if not isinstance(lupi_features, int):
            raise ValueError("Argument 'lupi_features' is not type int.")

        # Check that X and y have correct shape. This runs before reading
        # X.shape so that lists or 1-D input produce a ValueError instead of
        # an AttributeError/IndexError.
        X, y = check_X_y(X, y)
        d = X.shape[1]

        if not 0 < lupi_features < d:
            raise ValueError(
                "Argument 'lupi_features' looks wrong. We need at least 1 priviliged feature (>0) or at least one normal feature."
            )

        self._lupi_features = lupi_features
        return X, y

class LUPI_Regression_SVM(LUPI_InitModel):
    """Initial linear model for regression with privileged information (LUPI).

    `fit` solves an L1-regularised, epsilon-insensitive linear problem with
    cvxpy. Two non-negative linear functions of the privileged features widen
    the epsilon tube for positive and negative residuals respectively, in
    addition to an ordinary per-sample slack.
    """

    HYPERPARAMETER = ["C", "epsilon", "scaling_lupi_w", "scaling_lupi_loss"]

    def __init__(
        self,
        C=1,
        epsilon=0.1,
        scaling_lupi_w=1,
        scaling_lupi_loss=1,
        lupi_features=None,
    ):
        super().__init__()
        self.epsilon = epsilon
        self.scaling_lupi_loss = scaling_lupi_loss
        self.scaling_lupi_w = scaling_lupi_w
        self.C = C
        self.lupi_features = lupi_features

    def fit(self, X_combined, y, lupi_features=None):
        """Fit the model and record its state and constraint values.

        Parameters
        ----------
        X_combined : array-like
            Data containing normal features followed by privileged features.
        y : array-like
            Regression targets.
        lupi_features : int
            Number of features in dataset which are considered privileged information (PI).
            PI features are expected to be the last features in the dataset.
            Falls back to the value given to the constructor when omitted.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If the number of privileged features is given neither here nor in
            the constructor.
        """
        if lupi_features is None:
            lupi_features = self.lupi_features
        if lupi_features is None:
            raise ValueError("No amount of lupi features given.")
        # Store the count actually used: the split below and `predict` must
        # agree with the size of the privileged weight vectors.
        self.lupi_features = lupi_features

        X, X_priv = split_dataset(X_combined, lupi_features)
        (n, d) = X.shape

        # Get parameters from CV model without any feature constraints
        params = self.get_params()
        C = params["C"]
        epsilon = params["epsilon"]
        scaling_lupi_w = params["scaling_lupi_w"]
        scaling_lupi_loss = params["scaling_lupi_loss"]

        # Initialize variables in cvxpy
        w = cvx.Variable(shape=(d), name="w")
        b = cvx.Variable(name="bias")
        w_priv_pos = cvx.Variable(lupi_features, name="w_priv_pos")
        b_priv_pos = cvx.Variable(name="bias_priv_pos")
        w_priv_neg = cvx.Variable(lupi_features, name="w_priv_neg")
        b_priv_neg = cvx.Variable(name="bias_priv_neg")
        slack = cvx.Variable(shape=(n), name="slack")

        # Define functions for better readability
        priv_function_pos = X_priv @ w_priv_pos + b_priv_pos
        priv_function_neg = X_priv @ w_priv_neg + b_priv_neg

        # Combined loss of lupi function and normal slacks, scaled by two constants
        priv_loss_pos = cvx.sum(priv_function_pos)
        priv_loss_neg = cvx.sum(priv_function_neg)
        priv_loss = priv_loss_pos + priv_loss_neg
        slack_loss = cvx.sum(slack)
        loss = scaling_lupi_loss * priv_loss + slack_loss

        # L1 norm regularization of both functions with 1 scaling constant
        weight_regularization = 0.5 * (
            cvx.norm(w, 1)
            + scaling_lupi_w
            * (0.5 * cvx.norm(w_priv_pos, 1) + 0.5 * cvx.norm(w_priv_neg, 1))
        )

        constraints = [
            y - X @ w - b <= epsilon + priv_function_pos + slack,
            X @ w + b - y <= epsilon + priv_function_neg + slack,
            priv_function_pos >= 0,
            priv_function_neg >= 0,
            slack >= 0,
        ]
        objective = cvx.Minimize(C * loss + weight_regularization)

        # Solve problem.
        problem = cvx.Problem(objective, constraints)
        problem.solve(**self.SOLVER_PARAMS)

        self.model_state = {
            "signs_pos": priv_function_pos.value > 0,
            "signs_neg": priv_function_neg.value > 0,
            "w": w.value,
            "w_priv_pos": w_priv_pos.value,
            "w_priv_neg": w_priv_neg.value,
            "b": b.value,
            "b_priv_pos": b_priv_pos.value,
            "b_priv_neg": b_priv_neg.value,
            "lupi_features": lupi_features,  # Number of lupi features in the dataset TODO: Move this somewhere else,
        }

        w_l1 = np.linalg.norm(w.value, ord=1)
        w_priv_pos_l1 = np.linalg.norm(w_priv_pos.value, ord=1)
        w_priv_neg_l1 = np.linalg.norm(w_priv_neg.value, ord=1)
        # We sum the norms of both privileged submodels into a single value
        w_priv_l1 = w_priv_pos_l1 + w_priv_neg_l1

        self.constraints = {
            "priv_loss": priv_loss.value,
            "scaling_lupi_loss": scaling_lupi_loss,
            "loss": loss.value,
            "w_l1": w_l1,
            "w_priv_l1": w_priv_l1,
            "w_priv_pos_l1": w_priv_pos_l1,
            "w_priv_neg_l1": w_priv_neg_l1,
        }
        return self

    @property
    def SOLVER_PARAMS(cls):
        """Keyword arguments passed to ``cvx.Problem.solve``."""
        return {"solver": "ECOS", "verbose": False}

    def predict(self, X):
        """Predict targets with the fitted linear model.

        `X` is split into normal and privileged features using
        ``self.lupi_features``; only the normal features enter the prediction
        ``X @ w + b``.
        This function is mainly used for CV purposes to find the best parameters according to score.

        Parameters
        ----------
        X : numpy.ndarray
            Data containing normal features followed by privileged features.

        Returns
        -------
        Predicted values, one per row of `X`.
        """
        X, X_priv = split_dataset(X, self.lupi_features)
        w = self.model_state["w"]
        b = self.model_state["b"]

        y = np.dot(X, w) + b
        return y

    def score(self, X, y, **kwargs):
        """Return the R^2 score of `predict(X)` against `y`."""
        prediction = self.predict(X)
        score = r2_score(y, prediction)
        return score

class LUPI_Regression_Relevance_Bound(

    LUPI_Relevance_CVXProblem, Regression_Relevance_Bound

):
    """Relevance-bound problems for LUPI regression.

    Builds the constraint set shared by all bound problems in
    `_init_constraints` and adds the objectives used for privileged features
    (`_init_objective_LB_LUPI`, `_init_objective_UB_LUPI`).
    """

    @classmethod

    def generate_upper_bound_problem(

        cls,

        best_hyperparameters,

        init_constraints,

        best_model_state,

        data,

        di,

        preset_model,

        probeID=-1,

    ):
        """Yield the candidate upper-bound problems for feature `di`.

        For a non-privileged feature the candidates of the parent
        implementation are yielded. For a privileged feature four problems
        are yielded, one per combination of ``sign`` in ``(1, -1)`` and
        ``pos`` in ``(True, False)``; each is created with ``cls(...)`` and
        initialised with ``init_objective_UB(sign=sign, pos=pos)``.
        """

        is_priv = is_lupi_feature(

            di, data, best_model_state

        )  # Is it a lupi feature where we need additional candidate problems?

        if not is_priv:

            yield from super().generate_upper_bound_problem(

                best_hyperparameters,

                init_constraints,

                best_model_state,

                data,

                di,

                preset_model,

                probeID=probeID,

            )

        else:

            for sign, pos in product([1, -1], [True, False]):

                problem = cls(

                    di,

                    data,

                    best_hyperparameters,

                    init_constraints,

                    preset_model=preset_model,

                    best_model_state=best_model_state,

                    probeID=probeID,

                )

                problem.init_objective_UB(sign=sign, pos=pos)

                yield problem

    def _init_objective_LB_LUPI(self, **kwargs):
        """Set the lower-bound objective for the privileged feature `lupi_index`.

        `feature_relevance` is constrained to be at least the absolute weight
        of the feature in both privileged submodels and is then minimised.
        """

        self.add_constraint(

            cvx.abs(self.w_priv_pos[self.lupi_index]) <= self.feature_relevance

        )

        self.add_constraint(

            cvx.abs(self.w_priv_neg[self.lupi_index]) <= self.feature_relevance

        )

        self._objective = cvx.Minimize(self.feature_relevance)

    def _init_objective_UB_LUPI(self, pos=None, sign=None, **kwargs):
        """Set the upper-bound objective for the privileged feature `lupi_index`.

        `feature_relevance` is bounded above by ``sign`` times the feature's
        weight in ``w_priv_pos`` (if `pos` is truthy) or ``w_priv_neg``
        (otherwise) and is then maximised.
        """

        if pos:

            self.add_constraint(

                self.feature_relevance <= sign * self.w_priv_pos[self.lupi_index]

            )

        else:

            self.add_constraint(

                self.feature_relevance <= sign * self.w_priv_neg[self.lupi_index]

            )

        self._objective = cvx.Maximize(self.feature_relevance)

    def _init_constraints(self, parameters, init_model_constraints):
        """Create the cvxpy variables and the constraints shared by all bounds.

        Parameters
        ----------
        parameters : mapping
            Hyperparameters; only ``"epsilon"`` is read here.
        init_model_constraints : mapping
            Values recorded from the initial model; the keys ``"w_l1"``,
            ``"w_priv_pos_l1"``, ``"w_priv_neg_l1"``, ``"loss"`` and
            ``"scaling_lupi_loss"`` are read.
        """

        # Upper constraints from best initial model

        l1_w = init_model_constraints["w_l1"]

        self.l1_priv_w_pos = init_model_constraints["w_priv_pos_l1"]

        self.l1_priv_w_neg = init_model_constraints["w_priv_neg_l1"]

        init_loss = init_model_constraints["loss"]

        epsilon = parameters["epsilon"]

        # NOTE(review): read but not used below. LUPI_Regression_SVM.fit
        # multiplies the privileged loss by this factor, the loss here does
        # not - confirm this is intended.
        scaling_lupi_loss = init_model_constraints["scaling_lupi_loss"]

        # New Variables

        w = cvx.Variable(shape=(self.d), name="w")

        b = cvx.Variable(name="b")

        w_priv_pos = cvx.Variable(self.d_priv, name="w_priv_pos")

        b_priv_pos = cvx.Variable(name="bias_priv_pos")

        w_priv_neg = cvx.Variable(self.d_priv, name="w_priv_neg")

        b_priv_neg = cvx.Variable(name="bias_priv_neg")

        slack = cvx.Variable(shape=(self.n))

        # Linear functions of the privileged features, one per residual sign

        priv_function_pos = self.X_priv @ w_priv_pos + b_priv_pos

        priv_function_neg = self.X_priv @ w_priv_neg + b_priv_neg

        priv_loss = cvx.sum(priv_function_pos + priv_function_neg)

        loss = priv_loss + cvx.sum(slack)

        weight_norm = cvx.norm(w, 1)

        self.weight_norm_priv_pos = cvx.norm(w_priv_pos, 1)

        self.weight_norm_priv_neg = cvx.norm(w_priv_neg, 1)

        # Residuals may exceed epsilon only by the privileged function plus slack

        self.add_constraint(

            self.y - self.X @ w - b <= epsilon + priv_function_pos + slack

        )

        self.add_constraint(

            self.X @ w + b - self.y <= epsilon + priv_function_neg + slack

        )

        self.add_constraint(priv_function_pos >= 0)

        self.add_constraint(priv_function_neg >= 0)

        # Loss may not exceed the loss value recorded from the initial model

        self.add_constraint(loss <= init_loss)

        self.add_constraint(slack >= 0)

        # The L1 norms of all three weight vectors together are bounded by
        # the initial model's "w_l1" value.

        sum_norms = weight_norm + self.weight_norm_priv_pos + self.weight_norm_priv_neg

        self.add_constraint(sum_norms <= l1_w)

        # self.add_constraint(self.weight_norm_priv_pos <= self.l1_priv_w_pos)

        # self.add_constraint(self.weight_norm_priv_neg <= self.l1_priv_w_neg)

        # Save values for object use later

        self.w = w

        self.w_priv_pos = w_priv_pos

        self.w_priv_neg = w_priv_neg

        self.feature_relevance = cvx.Variable(nonneg=True, name="Feature Relevance")

Classes

LUPI_Regression

class LUPI_Regression(
    **kwargs
)

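Problem type for regression with privileged information (LUPI): it names the tunable hyperparameters and relaxation factors, validates the input data, and points to the initial model class (LUPI_Regression_SVM) and the relevance-bound problem class (LUPI_Regression_Relevance_Bound). The generated description "Helper class that provides a standard way to create an ABC using inheritance." is inherited from abc.ABC.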

View Source
class LUPI_Regression(ProblemType):
    """Problem type for regression with privileged information (LUPI).

    Names the tunable hyperparameters and relaxation factors, points to the
    initial model and relevance-bound problem classes, and validates the
    input data together with the number of privileged features.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Number of privileged features; filled in by `preprocessing`.
        self._lupi_features = None

    @property
    def lupi_features(self):
        """Number of privileged features stored by the last `preprocessing` call."""
        return self._lupi_features

    @classmethod
    def parameters(cls):
        """Return the names of the hyperparameters of this problem type."""
        return ["C", "epsilon", "scaling_lupi_w", "scaling_lupi_loss"]

    @property
    def get_initmodel_template(cls):
        """Class used to fit the initial model."""
        return LUPI_Regression_SVM

    @property
    def get_cvxproblem_template(cls):
        """Class used to build the relevance-bound problems."""
        return LUPI_Regression_Relevance_Bound

    @classmethod
    def relax_factors(cls):
        """Return the names of the relaxation factors of this problem type.

        Declared as a classmethod (like `parameters`) so it can be called on
        the class as well as on an instance.
        """
        return ["loss_slack", "w_l1_slack"]

    def preprocessing(self, data, lupi_features=None):
        """Validate the data and the number of privileged features.

        Parameters
        ----------
        data : tuple
            Pair ``(X, y)``.
        lupi_features : int
            Number of privileged features; must satisfy
            ``0 < lupi_features < X.shape[1]``.

        Returns
        -------
        tuple
            ``(X, y)`` as returned by ``check_X_y``.

        Raises
        ------
        ValueError
            If `lupi_features` is missing, not an int, or out of range, or if
            ``check_X_y`` rejects the data.
        """
        X, y = data

        if lupi_features is None:
            raise ValueError("Argument 'lupi_features' missing in fit() call.")
        if not isinstance(lupi_features, int):
            raise ValueError("Argument 'lupi_features' is not type int.")

        # Check that X and y have correct shape. This runs before reading
        # X.shape so that lists or 1-D input produce a ValueError instead of
        # an AttributeError/IndexError.
        X, y = check_X_y(X, y)
        d = X.shape[1]

        if not 0 < lupi_features < d:
            raise ValueError(
                "Argument 'lupi_features' looks wrong. We need at least 1 priviliged feature (>0) or at least one normal feature."
            )

        self._lupi_features = lupi_features
        return X, y

Ancestors (in MRO)

  • fri.model.base_type.ProblemType
  • abc.ABC

Static methods

parameters
def parameters(

)
View Source
    @classmethod

    def parameters(cls):
        """Return the names of the hyperparameters of this problem type."""
        hyperparameter_names = [
            "C",
            "epsilon",
            "scaling_lupi_w",
            "scaling_lupi_loss",
        ]
        return hyperparameter_names

Instance variables

get_cvxproblem_template
get_initmodel_template
lupi_features

Methods

get_all_parameters
def get_all_parameters(
    self
)
View Source
    def get_all_parameters(self):
        """Map every name from `parameters()` to `get_chosen_parameter(name)`."""
        chosen = {}
        for name in self.parameters():
            chosen[name] = self.get_chosen_parameter(name)
        return chosen
get_all_relax_factors
def get_all_relax_factors(
    self
)
View Source
    def get_all_relax_factors(self):
        """Map every name from `relax_factors()` to `get_chosen_relax_factors(name)`."""
        factors = {}
        for name in self.relax_factors():
            factors[name] = self.get_chosen_relax_factors(name)
        return factors
get_chosen_parameter
def get_chosen_parameter(
    self,
    p
)
View Source
    def get_chosen_parameter(self, p):
        """Return the search space for hyperparameter `p`.

        If a value for `p` is stored in ``self.chosen_parameters_`` it is
        returned wrapped in a one-element list. Otherwise a default search
        space is returned: a list of candidate values for ``"epsilon"`` and a
        ``scipy.stats.reciprocal`` distribution for everything else.
        """
        try:
            # We return list for param search function
            return [self.chosen_parameters_[p]]
        except (AttributeError, KeyError, TypeError):
            # No value was fixed for `p`: fall through to the defaults below.
            # Only lookup failures are caught, so KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            pass

        # TODO: rewrite the parameter logic
        # TODO: move this to subclass
        if p == "scaling_lupi_w":
            return scipy.stats.reciprocal(a=1e-15, b=1e10)
        if p == "C":
            return scipy.stats.reciprocal(a=1e-5, b=1e5)
        if p == "epsilon":
            return [0, 0.001, 0.01, 0.1, 1, 10, 100]
        # Every other parameter (including "scaling_lupi_loss") uses the
        # generic range.
        return scipy.stats.reciprocal(a=1e-10, b=1e10)
get_chosen_relax_factors
def get_chosen_relax_factors(
    self,
    p
)
View Source
    def get_chosen_relax_factors(self, p):
        """Return the relaxation factor stored for `p`.

        ``self.relax_factors_`` is looked up under `p` first and under
        ``p + "_slack"`` second; if neither key exists the default ``0.1`` is
        used.

        Raises
        ------
        ValueError
            If the resulting factor is negative.
        """
        try:
            factor = self.relax_factors_[p]
        except KeyError:
            try:
                factor = self.relax_factors_[p + "_slack"]
            except KeyError:
                factor = 0.1

        if factor < 0:
            # The old message claimed the factor "is positive", which is the
            # opposite of the condition that triggers this error.
            raise ValueError("Slack Factor multiplier is negative!")
        return factor
get_relaxed_constraints
def get_relaxed_constraints(
    self,
    constraints
)
View Source
    def get_relaxed_constraints(self, constraints):
        """Return a dict with `relax_constraint(key, value)` applied to every item."""
        relaxed = {}
        for name, bound in constraints.items():
            relaxed[name] = self.relax_constraint(name, bound)
        return relaxed
postprocessing
def postprocessing(
    self,
    bounds
)
View Source
    def postprocessing(self, bounds):
        """Return `bounds` unchanged; this implementation applies no post-processing."""
        result = bounds
        return result
preprocessing
def preprocessing(
    self,
    data,
    lupi_features=None
)
View Source
    def preprocessing(self, data, lupi_features=None):
        """Validate the data and the number of privileged features.

        Parameters
        ----------
        data : tuple
            Pair ``(X, y)``.
        lupi_features : int
            Number of privileged features; must satisfy
            ``0 < lupi_features < X.shape[1]``.

        Returns
        -------
        tuple
            ``(X, y)`` as returned by ``check_X_y``.

        Raises
        ------
        ValueError
            If `lupi_features` is missing, not an int, or out of range, or if
            ``check_X_y`` rejects the data.
        """
        X, y = data

        if lupi_features is None:
            raise ValueError("Argument 'lupi_features' missing in fit() call.")
        if not isinstance(lupi_features, int):
            raise ValueError("Argument 'lupi_features' is not type int.")

        # Check that X and y have correct shape. This runs before reading
        # X.shape so that lists or 1-D input produce a ValueError instead of
        # an AttributeError/IndexError.
        X, y = check_X_y(X, y)
        d = X.shape[1]

        if not 0 < lupi_features < d:
            raise ValueError(
                "Argument 'lupi_features' looks wrong. We need at least 1 priviliged feature (>0) or at least one normal feature."
            )

        self._lupi_features = lupi_features
        return X, y
relax_constraint
def relax_constraint(
    self,
    key,
    value
)
View Source
    def relax_constraint(self, key, value):
        """Scale `value` by ``1 + factor``, where factor is the relax factor chosen for `key`."""
        factor = self.get_chosen_relax_factors(key)
        multiplier = 1 + factor
        return value * multiplier
relax_factors
def relax_factors(
    cls
)
View Source
    @classmethod
    def relax_factors(cls):
        """Return the names of the relaxation factors of this problem type.

        Declared as a classmethod (like `parameters`) so it can be called on
        the class as well as on an instance.
        """
        return ["loss_slack", "w_l1_slack"]

LUPI_Regression_Relevance_Bound

class LUPI_Regression_Relevance_Bound(
    current_feature: int,
    data: tuple,
    hyperparameters,
    best_model_constraints,
    preset_model=None,
    best_model_state=None,
    probeID=-1
)

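Relevance-bound problem for LUPI regression: it builds the cvxpy constraints shared by all bound problems and generates the lower- and upper-bound candidate problems for one feature, with dedicated objectives for privileged features. The generated description "Helper class that provides a standard way to create an ABC using inheritance." is inherited from abc.ABC.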

View Source
class LUPI_Regression_Relevance_Bound(

    LUPI_Relevance_CVXProblem, Regression_Relevance_Bound

):
    """Relevance-bound problems for LUPI regression.

    Builds the constraint set shared by all bound problems in
    `_init_constraints` and adds the objectives used for privileged features
    (`_init_objective_LB_LUPI`, `_init_objective_UB_LUPI`).
    """

    @classmethod

    def generate_upper_bound_problem(

        cls,

        best_hyperparameters,

        init_constraints,

        best_model_state,

        data,

        di,

        preset_model,

        probeID=-1,

    ):
        """Yield the candidate upper-bound problems for feature `di`.

        For a non-privileged feature the candidates of the parent
        implementation are yielded. For a privileged feature four problems
        are yielded, one per combination of ``sign`` in ``(1, -1)`` and
        ``pos`` in ``(True, False)``; each is created with ``cls(...)`` and
        initialised with ``init_objective_UB(sign=sign, pos=pos)``.
        """

        is_priv = is_lupi_feature(

            di, data, best_model_state

        )  # Is it a lupi feature where we need additional candidate problems?

        if not is_priv:

            yield from super().generate_upper_bound_problem(

                best_hyperparameters,

                init_constraints,

                best_model_state,

                data,

                di,

                preset_model,

                probeID=probeID,

            )

        else:

            for sign, pos in product([1, -1], [True, False]):

                problem = cls(

                    di,

                    data,

                    best_hyperparameters,

                    init_constraints,

                    preset_model=preset_model,

                    best_model_state=best_model_state,

                    probeID=probeID,

                )

                problem.init_objective_UB(sign=sign, pos=pos)

                yield problem

    def _init_objective_LB_LUPI(self, **kwargs):
        """Set the lower-bound objective for the privileged feature `lupi_index`.

        `feature_relevance` is constrained to be at least the absolute weight
        of the feature in both privileged submodels and is then minimised.
        """

        self.add_constraint(

            cvx.abs(self.w_priv_pos[self.lupi_index]) <= self.feature_relevance

        )

        self.add_constraint(

            cvx.abs(self.w_priv_neg[self.lupi_index]) <= self.feature_relevance

        )

        self._objective = cvx.Minimize(self.feature_relevance)

    def _init_objective_UB_LUPI(self, pos=None, sign=None, **kwargs):
        """Set the upper-bound objective for the privileged feature `lupi_index`.

        `feature_relevance` is bounded above by ``sign`` times the feature's
        weight in ``w_priv_pos`` (if `pos` is truthy) or ``w_priv_neg``
        (otherwise) and is then maximised.
        """

        if pos:

            self.add_constraint(

                self.feature_relevance <= sign * self.w_priv_pos[self.lupi_index]

            )

        else:

            self.add_constraint(

                self.feature_relevance <= sign * self.w_priv_neg[self.lupi_index]

            )

        self._objective = cvx.Maximize(self.feature_relevance)

    def _init_constraints(self, parameters, init_model_constraints):
        """Create the cvxpy variables and the constraints shared by all bounds.

        Parameters
        ----------
        parameters : mapping
            Hyperparameters; only ``"epsilon"`` is read here.
        init_model_constraints : mapping
            Values recorded from the initial model; the keys ``"w_l1"``,
            ``"w_priv_pos_l1"``, ``"w_priv_neg_l1"``, ``"loss"`` and
            ``"scaling_lupi_loss"`` are read.
        """

        # Upper constraints from best initial model

        l1_w = init_model_constraints["w_l1"]

        self.l1_priv_w_pos = init_model_constraints["w_priv_pos_l1"]

        self.l1_priv_w_neg = init_model_constraints["w_priv_neg_l1"]

        init_loss = init_model_constraints["loss"]

        epsilon = parameters["epsilon"]

        # NOTE(review): read but not used below. LUPI_Regression_SVM.fit
        # multiplies the privileged loss by this factor, the loss here does
        # not - confirm this is intended.
        scaling_lupi_loss = init_model_constraints["scaling_lupi_loss"]

        # New Variables

        w = cvx.Variable(shape=(self.d), name="w")

        b = cvx.Variable(name="b")

        w_priv_pos = cvx.Variable(self.d_priv, name="w_priv_pos")

        b_priv_pos = cvx.Variable(name="bias_priv_pos")

        w_priv_neg = cvx.Variable(self.d_priv, name="w_priv_neg")

        b_priv_neg = cvx.Variable(name="bias_priv_neg")

        slack = cvx.Variable(shape=(self.n))

        # Linear functions of the privileged features, one per residual sign

        priv_function_pos = self.X_priv @ w_priv_pos + b_priv_pos

        priv_function_neg = self.X_priv @ w_priv_neg + b_priv_neg

        priv_loss = cvx.sum(priv_function_pos + priv_function_neg)

        loss = priv_loss + cvx.sum(slack)

        weight_norm = cvx.norm(w, 1)

        self.weight_norm_priv_pos = cvx.norm(w_priv_pos, 1)

        self.weight_norm_priv_neg = cvx.norm(w_priv_neg, 1)

        # Residuals may exceed epsilon only by the privileged function plus slack

        self.add_constraint(

            self.y - self.X @ w - b <= epsilon + priv_function_pos + slack

        )

        self.add_constraint(

            self.X @ w + b - self.y <= epsilon + priv_function_neg + slack

        )

        self.add_constraint(priv_function_pos >= 0)

        self.add_constraint(priv_function_neg >= 0)

        # Loss may not exceed the loss value recorded from the initial model

        self.add_constraint(loss <= init_loss)

        self.add_constraint(slack >= 0)

        # The L1 norms of all three weight vectors together are bounded by
        # the initial model's "w_l1" value.

        sum_norms = weight_norm + self.weight_norm_priv_pos + self.weight_norm_priv_neg

        self.add_constraint(sum_norms <= l1_w)

        # self.add_constraint(self.weight_norm_priv_pos <= self.l1_priv_w_pos)

        # self.add_constraint(self.weight_norm_priv_neg <= self.l1_priv_w_neg)

        # Save values for object use later

        self.w = w

        self.w_priv_pos = w_priv_pos

        self.w_priv_neg = w_priv_neg

        self.feature_relevance = cvx.Variable(nonneg=True, name="Feature Relevance")

Ancestors (in MRO)

  • fri.model.base_lupi.LUPI_Relevance_CVXProblem
  • fri.model.regression.Regression_Relevance_Bound
  • fri.model.base_cvxproblem.Relevance_CVXProblem
  • abc.ABC

Static methods

aggregate_max_candidates
def aggregate_max_candidates(
    max_problems_candidates
)
View Source
    @classmethod

    def aggregate_max_candidates(cls, max_problems_candidates):
        """Return the largest `solved_relevance` among the candidate problems."""
        best = max(
            max_problems_candidates,
            key=lambda candidate: candidate.solved_relevance,
        )
        return best.solved_relevance
aggregate_min_candidates
def aggregate_min_candidates(
    min_problems_candidates
)
View Source
    @classmethod

    def aggregate_min_candidates(cls, min_problems_candidates):
        """Return the smallest `solved_relevance` among the candidate problems."""
        best = min(
            min_problems_candidates,
            key=lambda candidate: candidate.solved_relevance,
        )
        return best.solved_relevance
generate_lower_bound_problem
def generate_lower_bound_problem(
    best_hyperparameters,
    init_constraints,
    best_model_state,
    data,
    di,
    preset_model,
    probeID=-1
)
View Source
    @classmethod

    def generate_lower_bound_problem(
        cls,
        best_hyperparameters,
        init_constraints,
        best_model_state,
        data,
        di,
        preset_model,
        probeID=-1,
    ):
        """Yield the single lower-bound problem for feature `di`.

        The problem is created with ``cls(...)``, initialised with
        ``init_objective_LB()`` and flagged with ``isLowerBound = True``.
        """
        extra_arguments = {
            "preset_model": preset_model,
            "best_model_state": best_model_state,
            "probeID": probeID,
        }
        lower_bound = cls(
            di, data, best_hyperparameters, init_constraints, **extra_arguments
        )
        lower_bound.init_objective_LB()
        lower_bound.isLowerBound = True
        yield lower_bound
generate_upper_bound_problem
def generate_upper_bound_problem(
    best_hyperparameters,
    init_constraints,
    best_model_state,
    data,
    di,
    preset_model,
    probeID=-1
)
View Source
    @classmethod

    def generate_upper_bound_problem(
        cls,
        best_hyperparameters,
        init_constraints,
        best_model_state,
        data,
        di,
        preset_model,
        probeID=-1,
    ):
        """Yield the candidate upper-bound problems for feature `di`.

        Non-privileged features are delegated to the parent implementation.
        A privileged feature produces four candidates, one per combination of
        ``sign`` in ``(1, -1)`` and ``pos`` in ``(True, False)``.
        """
        # Is it a lupi feature where we need additional candidate problems?
        if not is_lupi_feature(di, data, best_model_state):
            yield from super().generate_upper_bound_problem(
                best_hyperparameters,
                init_constraints,
                best_model_state,
                data,
                di,
                preset_model,
                probeID=probeID,
            )
            return

        for sign in (1, -1):
            for pos in (True, False):
                candidate = cls(
                    di,
                    data,
                    best_hyperparameters,
                    init_constraints,
                    preset_model=preset_model,
                    best_model_state=best_model_state,
                    probeID=probeID,
                )
                candidate.init_objective_UB(sign=sign, pos=pos)
                yield candidate

Instance variables

accepted_status
constraints
cvx_problem
isProbe
is_solved
objective
probeID
solved_relevance
solver_kwargs

Methods

add_constraint
def add_constraint(
    self,
    new
)
View Source
    def add_constraint(self, new):
        """Append the constraint `new` to this problem's constraint list."""
        collected = self._constraints
        collected.append(new)
init_objective_LB
def init_objective_LB(
    self,
    **kwargs
)
View Source
    def init_objective_LB(self, **kwargs):
        """Initialise the lower-bound objective for the current feature.

        Privileged features use `_init_objective_LB_LUPI`; all other features
        are handled by the next class in the MRO.
        """
        # We have two models basically with different indexes
        if not self.isPriv:
            # We call sibling class of our lupi class, which is the normal problem
            super().init_objective_LB(**kwargs)
            return
        self._init_objective_LB_LUPI(**kwargs)
init_objective_UB
def init_objective_UB(
    self,
    **kwargs
)
View Source
    def init_objective_UB(self, **kwargs):
        """Initialise the upper-bound objective for the current feature.

        Privileged features use `_init_objective_UB_LUPI`; all other features
        are handled by the next class in the MRO.
        """
        # We have two models basically with different indexes
        if not self.isPriv:
            # We call sibling class of our lupi class, which is the normal problem
            super().init_objective_UB(**kwargs)
            return
        self._init_objective_UB_LUPI(**kwargs)
preprocessing_data
def preprocessing_data(
    self,
    data,
    best_model_state
)
View Source
    def preprocessing_data(self, data, best_model_state):
        """Split off the privileged features and locate the current feature.

        The privileged part of the data is stored in ``self.X_priv`` and the
        remaining data is passed to the parent implementation. Afterwards
        ``self.lupi_index`` holds the current feature's index relative to the
        first privileged feature and ``self.isPriv`` tells whether the
        current feature is privileged.
        """
        n_privileged = best_model_state["lupi_features"]
        X_combined, y = data

        X, X_priv = split_dataset(X_combined, n_privileged)
        self.X_priv = X_priv
        super().preprocessing_data((X, y), best_model_state)

        assert n_privileged == X_priv.shape[1]
        self.d_priv = n_privileged

        # LUPI model, we need to offset the index
        self.lupi_index = self.current_feature - self.d
        self.isPriv = True if self.lupi_index >= 0 else False
solve
def solve(
    self
) -> object
View Source
    def solve(self) -> object:
        """Build the cvxpy problem, solve it and record the solver status.

        Returns
        -------
        self
        """
        # We init cvx problem here because pickling LP solver objects is problematic
        # by deferring it to here, worker threads do the problem building themselves and we spare the serialization
        problem = cvx.Problem(objective=self.objective, constraints=self.constraints)
        self._cvx_problem = problem

        try:
            problem.solve(**self.solver_kwargs)
        except SolverError:
            # We ignore Solver Errors, which are common with our framework:
            # We solve multiple problems per bound and choose a feasible solution later (see '_create_interval')
            pass

        self._solver_status = problem.status
        return self

LUPI_Regression_SVM

class LUPI_Regression_SVM(
    C=1,
    epsilon=0.1,
    scaling_lupi_w=1,
    scaling_lupi_loss=1,
    lupi_features=None
)

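Initial linear model for LUPI regression: `fit` solves an L1-regularised, epsilon-insensitive problem with cvxpy in which linear functions of the privileged features act as additional slack, and `predict`/`score` evaluate the fitted model on the normal features. The generated description "Helper class that provides a standard way to create an ABC using inheritance." is inherited from abc.ABC.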

View Source
class LUPI_Regression_SVM(LUPI_InitModel):
    """Initial linear model for regression with privileged information (LUPI).

    `fit` solves an L1-regularised, epsilon-insensitive linear problem with
    cvxpy. Two non-negative linear functions of the privileged features widen
    the epsilon tube for positive and negative residuals respectively, in
    addition to an ordinary per-sample slack.
    """

    HYPERPARAMETER = ["C", "epsilon", "scaling_lupi_w", "scaling_lupi_loss"]

    def __init__(
        self,
        C=1,
        epsilon=0.1,
        scaling_lupi_w=1,
        scaling_lupi_loss=1,
        lupi_features=None,
    ):
        super().__init__()
        self.epsilon = epsilon
        self.scaling_lupi_loss = scaling_lupi_loss
        self.scaling_lupi_w = scaling_lupi_w
        self.C = C
        self.lupi_features = lupi_features

    def fit(self, X_combined, y, lupi_features=None):
        """Fit the model and record its state and constraint values.

        Parameters
        ----------
        X_combined : array-like
            Data containing normal features followed by privileged features.
        y : array-like
            Regression targets.
        lupi_features : int
            Number of features in dataset which are considered privileged information (PI).
            PI features are expected to be the last features in the dataset.
            Falls back to the value given to the constructor when omitted.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If the number of privileged features is given neither here nor in
            the constructor.
        """
        if lupi_features is None:
            lupi_features = self.lupi_features
        if lupi_features is None:
            raise ValueError("No amount of lupi features given.")
        # Store the count actually used: the split below and `predict` must
        # agree with the size of the privileged weight vectors.
        self.lupi_features = lupi_features

        X, X_priv = split_dataset(X_combined, lupi_features)
        (n, d) = X.shape

        # Get parameters from CV model without any feature constraints
        params = self.get_params()
        C = params["C"]
        epsilon = params["epsilon"]
        scaling_lupi_w = params["scaling_lupi_w"]
        scaling_lupi_loss = params["scaling_lupi_loss"]

        # Initialize variables in cvxpy
        w = cvx.Variable(shape=(d), name="w")
        b = cvx.Variable(name="bias")
        w_priv_pos = cvx.Variable(lupi_features, name="w_priv_pos")
        b_priv_pos = cvx.Variable(name="bias_priv_pos")
        w_priv_neg = cvx.Variable(lupi_features, name="w_priv_neg")
        b_priv_neg = cvx.Variable(name="bias_priv_neg")
        slack = cvx.Variable(shape=(n), name="slack")

        # Define functions for better readability
        priv_function_pos = X_priv @ w_priv_pos + b_priv_pos
        priv_function_neg = X_priv @ w_priv_neg + b_priv_neg

        # Combined loss of lupi function and normal slacks, scaled by two constants
        priv_loss_pos = cvx.sum(priv_function_pos)
        priv_loss_neg = cvx.sum(priv_function_neg)
        priv_loss = priv_loss_pos + priv_loss_neg
        slack_loss = cvx.sum(slack)
        loss = scaling_lupi_loss * priv_loss + slack_loss

        # L1 norm regularization of both functions with 1 scaling constant
        weight_regularization = 0.5 * (
            cvx.norm(w, 1)
            + scaling_lupi_w
            * (0.5 * cvx.norm(w_priv_pos, 1) + 0.5 * cvx.norm(w_priv_neg, 1))
        )

        constraints = [
            y - X @ w - b <= epsilon + priv_function_pos + slack,
            X @ w + b - y <= epsilon + priv_function_neg + slack,
            priv_function_pos >= 0,
            priv_function_neg >= 0,
            slack >= 0,
        ]
        objective = cvx.Minimize(C * loss + weight_regularization)

        # Solve problem.
        problem = cvx.Problem(objective, constraints)
        problem.solve(**self.SOLVER_PARAMS)

        self.model_state = {
            "signs_pos": priv_function_pos.value > 0,
            "signs_neg": priv_function_neg.value > 0,
            "w": w.value,
            "w_priv_pos": w_priv_pos.value,
            "w_priv_neg": w_priv_neg.value,
            "b": b.value,
            "b_priv_pos": b_priv_pos.value,
            "b_priv_neg": b_priv_neg.value,
            "lupi_features": lupi_features,  # Number of lupi features in the dataset TODO: Move this somewhere else,
        }

        w_l1 = np.linalg.norm(w.value, ord=1)
        w_priv_pos_l1 = np.linalg.norm(w_priv_pos.value, ord=1)
        w_priv_neg_l1 = np.linalg.norm(w_priv_neg.value, ord=1)
        # We sum the norms of both privileged submodels into a single value
        w_priv_l1 = w_priv_pos_l1 + w_priv_neg_l1

        self.constraints = {
            "priv_loss": priv_loss.value,
            "scaling_lupi_loss": scaling_lupi_loss,
            "loss": loss.value,
            "w_l1": w_l1,
            "w_priv_l1": w_priv_l1,
            "w_priv_pos_l1": w_priv_pos_l1,
            "w_priv_neg_l1": w_priv_neg_l1,
        }
        return self

    @property
    def SOLVER_PARAMS(cls):
        """Keyword arguments passed to ``cvx.Problem.solve``."""
        return {"solver": "ECOS", "verbose": False}

    def predict(self, X):
        """Predict targets with the fitted linear model.

        `X` is split into normal and privileged features using
        ``self.lupi_features``; only the normal features enter the prediction
        ``X @ w + b``.
        This function is mainly used for CV purposes to find the best parameters according to score.

        Parameters
        ----------
        X : numpy.ndarray
            Data containing normal features followed by privileged features.

        Returns
        -------
        Predicted values, one per row of `X`.
        """
        X, X_priv = split_dataset(X, self.lupi_features)
        w = self.model_state["w"]
        b = self.model_state["b"]

        y = np.dot(X, w) + b
        return y

    def score(self, X, y, **kwargs):
        """Return the R^2 score of `predict(X)` against `y`."""
        prediction = self.predict(X)
        score = r2_score(y, prediction)
        return score

Ancestors (in MRO)

  • fri.model.base_initmodel.LUPI_InitModel
  • fri.model.base_initmodel.InitModel
  • abc.ABC
  • sklearn.base.BaseEstimator

Class variables

HYPERPARAMETER

Instance variables

L1_factor
L1_factor_priv
SOLVER_PARAMS

Methods

fit
def fit(
    self,
    X_combined,
    y,
    lupi_features=None
)

Parameters

lupi_features : int Number of features in dataset which are considered privileged information (PI). PI features are expected to be the last features in the dataset.

View Source
    def fit(self, X_combined, y, lupi_features=None):
        """
        Fit the initial LUPI regression model by solving a convex problem with cvxpy.

        The solution is stored in ``self.model_state`` (weights, biases and
        the signs of the privileged functions) and ``self.constraints``
        (loss values and L1 norms of the weight vectors).

        Parameters
        ----------
        X_combined : numpy.ndarray
            Data containing both normal and privileged features.
        y : numpy.ndarray
            Regression targets.
        lupi_features : int
            Number of features in dataset which are considered privileged information (PI).
            PI features are expected to be the last features in the dataset.
            If omitted, the value stored in ``self.lupi_features`` is used.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If the number of privileged features is neither passed nor
            available on the estimator.
        """
        if lupi_features is None:
            # Fall back to the value stored on the estimator, if any.
            lupi_features = getattr(self, "lupi_features", None)
            if lupi_features is None:
                raise ValueError("No amount of lupi features given.")

        # Keep the estimator in sync with the value actually used, so that the
        # split below, the shapes of the privileged variables and a later
        # predict() call all agree on the number of privileged features.
        self.lupi_features = lupi_features

        X, X_priv = split_dataset(X_combined, lupi_features)
        (n, d) = X.shape

        # Get parameters from CV model without any feature constraints
        params = self.get_params()
        C = params["C"]
        epsilon = params["epsilon"]
        scaling_lupi_w = params["scaling_lupi_w"]
        scaling_lupi_loss = params["scaling_lupi_loss"]

        # Initialize variables in cvxpy
        w = cvx.Variable(shape=d, name="w")
        b = cvx.Variable(name="bias")
        w_priv_pos = cvx.Variable(lupi_features, name="w_priv_pos")
        b_priv_pos = cvx.Variable(name="bias_priv_pos")
        w_priv_neg = cvx.Variable(lupi_features, name="w_priv_neg")
        b_priv_neg = cvx.Variable(name="bias_priv_neg")
        slack = cvx.Variable(shape=n, name="slack")

        # Define functions for better readability
        priv_function_pos = X_priv @ w_priv_pos + b_priv_pos
        priv_function_neg = X_priv @ w_priv_neg + b_priv_neg

        # Combined loss of lupi function and normal slacks, scaled by two constants
        priv_loss_pos = cvx.sum(priv_function_pos)
        priv_loss_neg = cvx.sum(priv_function_neg)
        priv_loss = priv_loss_pos + priv_loss_neg
        slack_loss = cvx.sum(slack)
        loss = scaling_lupi_loss * priv_loss + slack_loss

        # L1 norm regularization of both functions with 1 scaling constant
        weight_regularization = 0.5 * (
            cvx.norm(w, 1)
            + scaling_lupi_w
            * (0.5 * cvx.norm(w_priv_pos, 1) + 0.5 * cvx.norm(w_priv_neg, 1))
        )

        # Residuals in both directions are bounded by epsilon plus the
        # (non-negative) privileged function plus the (non-negative) slack.
        constraints = [
            y - X @ w - b <= epsilon + priv_function_pos + slack,
            X @ w + b - y <= epsilon + priv_function_neg + slack,
            priv_function_pos >= 0,
            priv_function_neg >= 0,
            slack >= 0,
        ]
        objective = cvx.Minimize(C * loss + weight_regularization)

        # Solve problem.
        problem = cvx.Problem(objective, constraints)
        problem.solve(**self.SOLVER_PARAMS)

        self.model_state = {
            "signs_pos": priv_function_pos.value > 0,
            "signs_neg": priv_function_neg.value > 0,
            "w": w.value,
            "w_priv_pos": w_priv_pos.value,
            "w_priv_neg": w_priv_neg.value,
            "b": b.value,
            "b_priv_pos": b_priv_pos.value,
            "b_priv_neg": b_priv_neg.value,
            "lupi_features": lupi_features,  # Number of lupi features in the dataset TODO: Move this somewhere else,
        }

        w_l1 = np.linalg.norm(w.value, ord=1)
        w_priv_pos_l1 = np.linalg.norm(w_priv_pos.value, ord=1)
        w_priv_neg_l1 = np.linalg.norm(w_priv_neg.value, ord=1)
        # The norms of both privileged sub-models are summed into a single
        # normalization factor.
        # NOTE(review): the objective above weights each norm by 0.5 (a mean);
        # confirm that the sum is the intended normalization here.
        w_priv_l1 = w_priv_pos_l1 + w_priv_neg_l1

        self.constraints = {
            "priv_loss": priv_loss.value,
            "scaling_lupi_loss": scaling_lupi_loss,
            "loss": loss.value,
            "w_l1": w_l1,
            "w_priv_l1": w_priv_l1,
            "w_priv_pos_l1": w_priv_pos_l1,
            "w_priv_neg_l1": w_priv_neg_l1,
        }
        return self
get_params
def get_params(
    self,
    deep=True
)

Get parameters for this estimator.

Parameters

deep : bool, default=True If True, will return the parameters for this estimator and contained subobjects that are estimators.

Returns

params : mapping of string to any Parameter names mapped to their values.

View Source
    def get_params(self, deep=True):
        """
        Collect the parameters of this estimator into a dictionary.

        Parameters
        ----------
        deep : bool, default=True
            When true, parameters of contained objects that themselves expose
            ``get_params`` are included under ``<name>__<sub_name>`` keys.

        Returns
        -------
        params : mapping of string to any
            Parameter names mapped to their values.
        """
        collected = {}
        for name in self._get_param_names():
            try:
                current = getattr(self, name)
            except AttributeError:
                # A parameter missing on the instance is reported and treated as None.
                warnings.warn('From version 0.24, get_params will raise an '
                              'AttributeError if a parameter cannot be '
                              'retrieved as an instance attribute. Previously '
                              'it would return None.',
                              FutureWarning)
                current = None
            if deep and hasattr(current, 'get_params'):
                # Flatten the nested parameters before storing the object itself.
                for sub_name, sub_value in current.get_params().items():
                    collected[name + '__' + sub_name] = sub_value
            collected[name] = current
        return collected
make_scorer
def make_scorer(
    self
)
View Source
    def make_scorer(self):
        """Return a pair of ``None`` values; no scorer is built here."""
        scorer_pair = (None, None)
        return scorer_pair
predict
def predict(
    self,
    X
)

Method to predict target values using the fitted linear regression rule. The input is split into normal and privileged features, and only the normal features are used for the prediction. This function is mainly used for CV purposes to find the best parameters according to score.

Parameters

X : numpy.ndarray

View Source
    def predict(self, X):
        """
        Predict target values with the linear model stored in ``model_state``.

        The input is split with ``split_dataset`` and only the first part
        (bound to the normal features in ``fit``) enters the prediction; the
        second, privileged part is discarded.
        This function is mainly used for CV purposes to find the best
        parameters according to score.

        Parameters
        ----------
        X : numpy.ndarray
            Combined data; presumably laid out like the ``X_combined`` passed
            to ``fit`` (privileged features last) -- TODO confirm.

        Returns
        -------
        The value of ``np.dot(X_normal, w) + b``.
        """
        # Only the non-privileged part is needed for prediction.
        normal_part, _ = split_dataset(X, self.lupi_features)
        state = self.model_state
        weights = state["w"]
        bias = state["b"]
        return np.dot(normal_part, weights) + bias
score
def score(
    self,
    X,
    y,
    **kwargs
)
View Source
    def score(self, X, y, **kwargs):
        """
        Return the R^2 score of ``self.predict(X)`` measured against ``y``.

        Additional keyword arguments are accepted but not used.
        """
        return r2_score(y, self.predict(X))
set_params
def set_params(
    self,
    **params
)

Set the parameters of this estimator.

The method works on simple estimators as well as on nested objects (such as pipelines). The latter have parameters of the form <component>__<parameter> so that it's possible to update each component of a nested object.

Parameters

**params : dict Estimator parameters.

Returns

self : object Estimator instance.

View Source
    def set_params(self, **params):
        """
        Update parameters of this estimator in place.

        Plain keys are assigned as attributes on this object. Keys of the
        form ``<component>__<parameter>`` are grouped by component and
        forwarded to that component's own ``set_params``.

        Parameters
        ----------
        **params : dict
            Estimator parameters.

        Returns
        -------
        self : object
            Estimator instance.

        Raises
        ------
        ValueError
            If a key (or its component prefix) is not among the names
            returned by ``get_params(deep=True)``.
        """
        if not params:
            # Nothing to do; skip the parameter lookup entirely.
            return self

        known = self.get_params(deep=True)
        pending = defaultdict(dict)  # sub-parameters collected per component

        for full_key, value in params.items():
            name, separator, remainder = full_key.partition('__')
            if name not in known:
                raise ValueError('Invalid parameter %s for estimator %s. '
                                 'Check the list of available parameters '
                                 'with `estimator.get_params().keys()`.' %
                                 (name, self))
            if not separator:
                setattr(self, name, value)
                # Remember the new object so nested keys below reach it.
                known[name] = value
                continue
            pending[name][remainder] = value

        for name, sub_params in pending.items():
            known[name].set_params(**sub_params)
        return self