Skip to content

Module fri.model.lupi_ordinal_regression

View Source
from itertools import product

import cvxpy as cvx

import numpy as np

from sklearn.metrics import make_scorer

from sklearn.utils import check_X_y

from fri.model.base_lupi import (

    LUPI_Relevance_CVXProblem,

    split_dataset,

    is_lupi_feature,

)

from fri.model.ordinal_regression import (

    OrdinalRegression_Relevance_Bound,

    ordinal_scores,

)

from .base_initmodel import LUPI_InitModel

from .base_type import ProblemType

class LUPI_OrdinalRegression(ProblemType):
    """Problem type for ordinal regression with privileged information (LUPI).

    Declares the hyperparameter names, the relaxation factor names and the
    templates for the initial model and the relevance-bound problem, and
    validates the training data together with the number of privileged
    features.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Number of privileged features; filled in by ``preprocessing``.
        self._lupi_features = None

    @property
    def lupi_features(self):
        """Number of privileged features stored by ``preprocessing`` (``None`` before)."""
        return self._lupi_features

    @classmethod
    def parameters(cls):
        """Return the names of the hyperparameters of this problem type."""
        return ["C", "scaling_lupi_w"]

    @property
    def get_initmodel_template(cls):
        """Class used for the initial model."""
        return LUPI_OrdinalRegression_SVM

    @property
    def get_cvxproblem_template(cls):
        """Class used for the relevance-bound problems."""
        return LUPI_OrdinalRegression_Relevance_Bound

    def relax_factors(cls):
        """Return the names of the relaxation factors of this problem type."""
        return ["loss_slack", "w_l1_slack"]

    def preprocessing(self, data, lupi_features=None):
        """Validate the data and the privileged feature count.

        Parameters
        ----------
        data : tuple
            ``(X, y)`` pair; both are validated with ``check_X_y``.
        lupi_features : int
            Number of privileged features. Must satisfy
            ``0 < lupi_features < X.shape[1]``.

        Returns
        -------
        tuple
            The validated ``(X, y)``; ``y`` is shifted so that its minimum
            is 0 when the smallest label is positive.

        Raises
        ------
        ValueError
            If ``lupi_features`` is missing, not an integer or out of range.
        """
        X, y = data

        if lupi_features is None:
            raise ValueError("Argument 'lupi_features' missing in fit() call.")
        # NumPy integers (e.g. taken from an array shape computation) are
        # accepted as well as plain ints.
        if not isinstance(lupi_features, (int, np.integer)):
            raise ValueError("Argument 'lupi_features' is not type int.")

        # Validate before touching ``X.shape`` so that array-likes such as
        # nested lists are converted instead of failing with AttributeError.
        X, y = check_X_y(X, y)
        d = X.shape[1]

        if not 0 < lupi_features < d:
            raise ValueError(
                "Argument 'lupi_features' looks wrong. We need at least 1 privileged feature (>0) and at least one normal feature."
            )
        self._lupi_features = int(lupi_features)

        lowest_class = np.min(y)
        if lowest_class > 0:
            print("First ordinal class has index > 0. Shifting index...")
            y = y - lowest_class

        return X, y

class LUPI_OrdinalRegression_SVM(LUPI_InitModel):
    """Initial L1-regularised ordinal regression model with privileged information.

    The regular features get one weight vector ``w`` and ordered bin
    boundaries ``b_s``. The privileged features parametrise two linear
    functions (the two columns of ``w_priv`` plus ``d_priv``) that act as
    the per-sample slack towards the upper and the lower neighbouring bin.
    """

    HYPERPARAMETER = ["C", "scaling_lupi_w"]

    def __init__(self, C=1, scaling_lupi_w=1, lupi_features=None):
        super().__init__()
        self.scaling_lupi_w = scaling_lupi_w
        self.C = C
        self.lupi_features = lupi_features

    def fit(self, X_combined, y, lupi_features=None):
        """Fit the model by solving the cvxpy problem.

        Parameters
        ----------
        X_combined : array
            Regular and privileged features; split with ``split_dataset``.
        y : array
            Ordinal class labels.
        lupi_features : int
            Number of features in dataset which are considered privileged information (PI).
            PI features are expected to be the last features in the dataset.
            Falls back to the value given to the constructor when omitted.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If no number of privileged features is available.
        """
        if lupi_features is None:
            lupi_features = self.lupi_features
        if lupi_features is None:
            raise ValueError("No amount of lupi features given.")
        # Store the effective value: ``predict`` splits its input with
        # ``self.lupi_features`` and has to agree with the split used here.
        self.lupi_features = lupi_features

        X, X_priv = split_dataset(X_combined, lupi_features)
        d = X.shape[1]
        self.classes_ = np.unique(y)

        # Get parameters from CV model without any feature constraints
        params = self.get_params()
        C = params["C"]
        scaling_lupi_w = params["scaling_lupi_w"]

        get_original_bin_name, n_bins = get_bin_mapping(y)
        n_boundaries = n_bins - 1

        # Initialize variables in cvxpy
        w = cvx.Variable(shape=(d), name="w")
        b_s = cvx.Variable(shape=(n_boundaries), name="bias")
        w_priv = cvx.Variable(shape=(lupi_features, 2), name="w_priv")
        d_priv = cvx.Variable(shape=(2), name="bias_priv")

        def priv_function(bin_index, sign):
            # Slack expression for all samples of one bin, computed from the
            # privileged features with column ``sign`` of ``w_priv``.
            indices = np.where(y == get_original_bin_name[bin_index])
            return X_priv[indices] @ w_priv[:, sign] + d_priv[sign]

        # L1 norm regularization of both functions with 1 scaling constant
        priv_l1_1 = cvx.norm(w_priv[:, 0], 1)
        priv_l1_2 = cvx.norm(w_priv[:, 1], 1)
        w_priv_l1 = priv_l1_1 + priv_l1_2
        w_l1 = cvx.norm(w, 1)
        weight_regularization = 0.5 * (w_l1 + scaling_lupi_w * w_priv_l1)

        constraints = []
        loss = 0

        # Samples of every bin except the last must stay below the boundary
        # to their right, up to the (non-negative) privileged slack.
        for left_bin in range(0, n_bins - 1):
            indices = np.where(y == get_original_bin_name[left_bin])
            constraints.append(
                X[indices] @ w - b_s[left_bin] <= -1 + priv_function(left_bin, 0)
            )
            constraints.append(priv_function(left_bin, 0) >= 0)
            loss += cvx.sum(priv_function(left_bin, 0))

        # Add constraints for slack into right neighboring bins
        for right_bin in range(1, n_bins):
            indices = np.where(y == get_original_bin_name[right_bin])
            constraints.append(
                X[indices] @ w - b_s[right_bin - 1] >= +1 - priv_function(right_bin, 1)
            )
            constraints.append(priv_function(right_bin, 1) >= 0)
            loss += cvx.sum(priv_function(right_bin, 1))

        # Boundaries have to be non-decreasing.
        for i_boundary in range(0, n_boundaries - 1):
            constraints.append(b_s[i_boundary] <= b_s[i_boundary + 1])

        objective = cvx.Minimize(C * loss + weight_regularization)

        # Solve problem.
        problem = cvx.Problem(objective, constraints)
        problem.solve(**self.SOLVER_PARAMS)

        w = w.value
        b_s = b_s.value
        self.model_state = {
            "w": w,
            "b_s": b_s,
            "w_priv": w_priv.value,
            "d_priv": d_priv.value,
            "lupi_features": lupi_features,  # Number of lupi features in the dataset TODO: Move this somewhere else
            "bin_boundaries": n_boundaries,
        }
        # Optimal values which are used as bounds by the relevance problems.
        self.constraints = {
            "loss": loss.value,
            "w_l1": w_l1.value,
            "w_priv_l1": w_priv_l1.value,
        }
        return self

    def predict(self, X):
        """Predict class labels using only the regular features of ``X``.

        ``X`` is split with ``split_dataset``; the privileged part is not used.
        """
        X, _ = split_dataset(X, self.lupi_features)
        w = self.model_state["w"]
        b_s = self.model_state["b_s"]

        scores = np.dot(X, w.T)[np.newaxis]
        bin_thresholds = np.append(b_s, np.inf)

        # If thresholds are smaller than score the value belongs to the bigger bin
        # after subtracting we check for positive elements
        indices = np.sum(scores.T - bin_thresholds >= 0, -1)
        return self.classes_[indices]

    def score(self, X, y, error_type="mmae", return_error=False, **kwargs):
        """Return ``ordinal_scores`` of the predictions for ``X`` against ``y``."""
        X, y = check_X_y(X, y)
        prediction = self.predict(X)
        score = ordinal_scores(y, prediction, error_type, return_error=return_error)
        return score

    def make_scorer(self):
        """Return a dict of scorers (``mze``, ``mae``, ``mmae``) and the name ``"mmae"``."""
        # Use multiple scores for ordinal regression
        mze = make_scorer(ordinal_scores, error_type="mze")
        mae = make_scorer(ordinal_scores, error_type="mae")
        mmae = make_scorer(ordinal_scores, error_type="mmae")
        scorer = {"mze": mze, "mae": mae, "mmae": mmae}
        return scorer, "mmae"

def get_bin_mapping(y):
    """
    Map consecutive bin indices to the sorted distinct values of ``y``.

    Parameters
    ----------
    y: array of discrete values (int, str)

    Returns
    -------
    (dict, int)
        Dictionary from bin index (0, 1, ...) to the original class value in
        ascending order, and the number of distinct classes.
    """
    ordered_labels = sorted(np.unique(y))
    bin_count = len(ordered_labels)

    index_to_label = {}
    for bin_index, label in zip(np.arange(bin_count), ordered_labels):
        index_to_label[bin_index] = label

    return index_to_label, bin_count

class LUPI_OrdinalRegression_Relevance_Bound(
    LUPI_Relevance_CVXProblem, OrdinalRegression_Relevance_Bound
):
    """Relevance-bound problem for LUPI ordinal regression.

    Rebuilds the constraints of the initial LUPI ordinal regression model,
    bounded by that model's loss and L1 norms, and generates the candidate
    problems for the lower and upper relevance bound of one feature.
    Privileged features get several candidates; other features are
    delegated to the parent implementation.
    """

    @classmethod
    def generate_lower_bound_problem(
        cls,
        best_hyperparameters,
        init_constraints,
        best_model_state,
        data,
        di,
        preset_model,
        probeID=-1,
    ):
        """Yield the candidate problems for the lower bound of feature ``di``.

        For a privileged feature two problems are yielded, one per sign
        (+1, -1), each flagged with ``isLowerBound = True``.
        """
        is_priv = is_lupi_feature(
            di, data, best_model_state
        )  # Privileged features need additional candidate problems.
        if not is_priv:
            yield from super().generate_lower_bound_problem(
                best_hyperparameters,
                init_constraints,
                best_model_state,
                data,
                di,
                preset_model,
                probeID=probeID,
            )
        else:
            for sign in [1, -1]:
                problem = cls(
                    di,
                    data,
                    best_hyperparameters,
                    init_constraints,
                    preset_model=preset_model,
                    best_model_state=best_model_state,
                    probeID=probeID,
                )
                problem.init_objective_LB(sign=sign)
                problem.isLowerBound = True
                yield problem

    @classmethod
    def generate_upper_bound_problem(
        cls,
        best_hyperparameters,
        init_constraints,
        best_model_state,
        data,
        di,
        preset_model,
        probeID=-1,
    ):
        """Yield the candidate problems for the upper bound of feature ``di``.

        For a privileged feature four problems are yielded: every
        combination of sign (+1, -1) and ``w_priv`` column ``pos`` (0, 1).
        """
        is_priv = is_lupi_feature(
            di, data, best_model_state
        )  # Privileged features need additional candidate problems.
        if not is_priv:
            yield from super().generate_upper_bound_problem(
                best_hyperparameters,
                init_constraints,
                best_model_state,
                data,
                di,
                preset_model,
                probeID=probeID,
            )
        else:
            for sign, pos in product([1, -1], [0, 1]):
                problem = cls(
                    di,
                    data,
                    best_hyperparameters,
                    init_constraints,
                    preset_model=preset_model,
                    best_model_state=best_model_state,
                    probeID=probeID,
                )
                problem.init_objective_UB(sign=sign, pos=pos)
                yield problem

    @classmethod
    def aggregate_min_candidates(cls, min_problems_candidates):
        """Return the largest ``solved_relevance`` among the lower-bound candidates."""
        vals = [candidate.solved_relevance for candidate in min_problems_candidates]
        # We take the max of mins because we need the necessary contribution over all functions
        min_value = max(vals)
        return min_value

    def _init_objective_LB_LUPI(self, sign=None, bin_index=None, **kwargs):
        """Set up the lower-bound objective for a privileged feature.

        ``bin_index`` and extra keyword arguments are accepted but not used here.
        """
        # The constraint covers both columns of w_priv for this feature.
        self.add_constraint(
            sign * self.w_priv[self.lupi_index, :] <= self.feature_relevance
        )
        self._objective = cvx.Minimize(self.feature_relevance)

    def _init_objective_UB_LUPI(self, sign=None, pos=None, **kwargs):
        """Set up the upper-bound objective for column ``pos`` of a privileged feature."""
        self.add_constraint(
            self.feature_relevance <= sign * self.w_priv[self.lupi_index, pos]
        )
        self._objective = cvx.Maximize(self.feature_relevance)

    def _init_constraints(self, parameters, init_model_constraints):
        """Create the cvxpy variables and the constraints shared by all bounds.

        Parameters
        ----------
        parameters : mapping
            Hyperparameters; only ``"scaling_lupi_w"`` is read here.
        init_model_constraints : mapping
            Values ``"w_l1"``, ``"w_priv_l1"`` and ``"loss"`` of the initial
            model, used as upper bounds.
        """
        # Upper constraints from initial model
        init_w_l1 = init_model_constraints["w_l1"]
        init_w_priv_l1 = init_model_constraints["w_priv_l1"]
        init_loss = init_model_constraints["loss"]
        scaling_lupi_w = parameters["scaling_lupi_w"]
        get_original_bin_name, n_bins = get_bin_mapping(self.y)
        n_boundaries = n_bins - 1
        # Initialize variables in cvxpy
        w = cvx.Variable(shape=(self.d), name="w")
        b_s = cvx.Variable(shape=(n_boundaries), name="bias")
        w_priv = cvx.Variable(shape=(self.d_priv, 2), name="w_priv")
        d_priv = cvx.Variable(shape=(2), name="bias_priv")

        def priv_function(bin, sign):
            # Slack expression for the samples of one bin, computed from the
            # privileged features with column ``sign`` of w_priv.
            indices = np.where(self.y == get_original_bin_name[bin])
            return self.X_priv[indices] @ w_priv[:, sign] + d_priv[sign]

        # L1 norm regularization of both functions with 1 scaling constant
        priv_l1_1 = cvx.norm(w_priv[:, 0], 1)
        priv_l1_2 = cvx.norm(w_priv[:, 1], 1)
        w_priv_l1 = priv_l1_1 + priv_l1_2
        w_l1 = cvx.norm(w, 1)
        loss = 0
        # Every bin except the last is constrained against the boundary to its right.
        for left_bin in range(0, n_bins - 1):
            indices = np.where(self.y == get_original_bin_name[left_bin])
            self.add_constraint(
                self.X[indices] @ w - b_s[left_bin] <= -1 + priv_function(left_bin, 0)
            )
            self.add_constraint(priv_function(left_bin, 0) >= 0)
            loss += cvx.sum(priv_function(left_bin, 0))
        # Add constraints for slack into right neighboring bins
        for right_bin in range(1, n_bins):
            indices = np.where(self.y == get_original_bin_name[right_bin])
            self.add_constraint(
                self.X[indices] @ w - b_s[right_bin - 1]
                >= +1 - priv_function(right_bin, 1)
            )
            self.add_constraint(priv_function(right_bin, 1) >= 0)
            loss += cvx.sum(priv_function(right_bin, 1))
        # Boundaries have to be non-decreasing.
        for i_boundary in range(0, n_boundaries - 1):
            self.add_constraint(b_s[i_boundary] <= b_s[i_boundary + 1])
        # The combined weight norm and the loss may not exceed the values
        # passed in from the initial model.
        self.add_constraint(
            w_l1 + scaling_lupi_w * w_priv_l1
            <= init_w_l1 + scaling_lupi_w * init_w_priv_l1
        )
        self.add_constraint(loss <= init_loss)
        self.w = w
        self.w_priv = w_priv
        # Non-negative variable that the bound objectives minimise or maximise.
        self.feature_relevance = cvx.Variable(nonneg=True, name="Feature Relevance")

Functions

get_bin_mapping

def get_bin_mapping(
    y
)

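Get the ordered unique classes and the corresponding mapping from bin index to the original class names.

Parameters

y: array of discrete values (int, str)

Returns

A tuple `(mapping, n_bins)`: a dict from bin index (0, 1, …) to the original class value in ascending order, and the number of distinct classes.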


View Source
def get_bin_mapping(y):
    """
    Map consecutive bin indices to the sorted distinct values of ``y``.

    Parameters
    ----------
    y: array of discrete values (int, str)

    Returns
    -------
    (dict, int)
        Dictionary from bin index (0, 1, ...) to the original class value in
        ascending order, and the number of distinct classes.
    """
    ordered_labels = sorted(np.unique(y))
    bin_count = len(ordered_labels)

    index_to_label = {}
    for bin_index, label in zip(np.arange(bin_count), ordered_labels):
        index_to_label[bin_index] = label

    return index_to_label, bin_count

Classes

LUPI_OrdinalRegression

class LUPI_OrdinalRegression(
    **kwargs
)

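Problem type for ordinal regression with privileged information (LUPI): declares the hyperparameters (`C`, `scaling_lupi_w`), the relaxation factors, the initial-model and relevance-bound templates, and validates the input data.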

View Source
class LUPI_OrdinalRegression(ProblemType):
    """Problem type for ordinal regression with privileged information (LUPI).

    Declares the hyperparameter names, the relaxation factor names and the
    templates for the initial model and the relevance-bound problem, and
    validates the training data together with the number of privileged
    features.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Number of privileged features; filled in by ``preprocessing``.
        self._lupi_features = None

    @property
    def lupi_features(self):
        """Number of privileged features stored by ``preprocessing`` (``None`` before)."""
        return self._lupi_features

    @classmethod
    def parameters(cls):
        """Return the names of the hyperparameters of this problem type."""
        return ["C", "scaling_lupi_w"]

    @property
    def get_initmodel_template(cls):
        """Class used for the initial model."""
        return LUPI_OrdinalRegression_SVM

    @property
    def get_cvxproblem_template(cls):
        """Class used for the relevance-bound problems."""
        return LUPI_OrdinalRegression_Relevance_Bound

    def relax_factors(cls):
        """Return the names of the relaxation factors of this problem type."""
        return ["loss_slack", "w_l1_slack"]

    def preprocessing(self, data, lupi_features=None):
        """Validate the data and the privileged feature count.

        Parameters
        ----------
        data : tuple
            ``(X, y)`` pair; both are validated with ``check_X_y``.
        lupi_features : int
            Number of privileged features. Must satisfy
            ``0 < lupi_features < X.shape[1]``.

        Returns
        -------
        tuple
            The validated ``(X, y)``; ``y`` is shifted so that its minimum
            is 0 when the smallest label is positive.

        Raises
        ------
        ValueError
            If ``lupi_features`` is missing, not an integer or out of range.
        """
        X, y = data

        if lupi_features is None:
            raise ValueError("Argument 'lupi_features' missing in fit() call.")
        # NumPy integers (e.g. taken from an array shape computation) are
        # accepted as well as plain ints.
        if not isinstance(lupi_features, (int, np.integer)):
            raise ValueError("Argument 'lupi_features' is not type int.")

        # Validate before touching ``X.shape`` so that array-likes such as
        # nested lists are converted instead of failing with AttributeError.
        X, y = check_X_y(X, y)
        d = X.shape[1]

        if not 0 < lupi_features < d:
            raise ValueError(
                "Argument 'lupi_features' looks wrong. We need at least 1 privileged feature (>0) and at least one normal feature."
            )
        self._lupi_features = int(lupi_features)

        lowest_class = np.min(y)
        if lowest_class > 0:
            print("First ordinal class has index > 0. Shifting index...")
            y = y - lowest_class

        return X, y

Ancestors (in MRO)

  • fri.model.base_type.ProblemType
  • abc.ABC

Static methods

parameters
def parameters(

)
View Source
    @classmethod
    def parameters(cls):
        """Return the names of the hyperparameters of this problem type."""
        hyperparameter_names = ["C", "scaling_lupi_w"]
        return hyperparameter_names

Instance variables

get_cvxproblem_template
get_initmodel_template
lupi_features

Methods

get_all_parameters
def get_all_parameters(
    self
)
View Source
    def get_all_parameters(self):
        """Return a dict from every name in ``parameters()`` to its chosen value(s)."""
        chosen = {}
        for name in self.parameters():
            chosen[name] = self.get_chosen_parameter(name)
        return chosen
get_all_relax_factors
def get_all_relax_factors(
    self
)
View Source
    def get_all_relax_factors(self):
        """Return a dict from every name in ``relax_factors()`` to its chosen factor."""
        factors = {}
        for name in self.relax_factors():
            factors[name] = self.get_chosen_relax_factors(name)
        return factors
get_chosen_parameter
def get_chosen_parameter(
    self,
    p
)
View Source
    def get_chosen_parameter(self, p):
        """Return the search space for hyperparameter ``p``.

        If a value was chosen for ``p`` in ``self.chosen_parameters_`` it is
        returned wrapped in a one-element list. Otherwise a default is
        returned: a ``scipy.stats.reciprocal`` distribution, or a fixed list
        of values for ``"epsilon"``.
        """
        try:
            return [
                self.chosen_parameters_[p]
            ]  # We return list for param search function
        except (AttributeError, KeyError, TypeError):
            # Only a missing or unusable ``chosen_parameters_`` entry selects
            # the defaults; KeyboardInterrupt/SystemExit are not swallowed.
            pass

        # TODO: rewrite the parameter logic
        # TODO: move this to subclass
        if p == "scaling_lupi_w":
            return scipy.stats.reciprocal(a=1e-15, b=1e10)
        if p == "C":
            return scipy.stats.reciprocal(a=1e-5, b=1e5)
        if p == "epsilon":
            return [0, 0.001, 0.01, 0.1, 1, 10, 100]
        return scipy.stats.reciprocal(a=1e-10, b=1e10)
get_chosen_relax_factors
def get_chosen_relax_factors(
    self,
    p
)
View Source
    def get_chosen_relax_factors(self, p):
        """Return the relaxation factor configured for ``p``.

        Looks up ``p`` in ``self.relax_factors_``, then ``p + "_slack"``,
        and falls back to 0.1 when neither key exists.

        Raises
        ------
        ValueError
            If the factor found is negative.
        """
        try:
            factor = self.relax_factors_[p]
        except KeyError:
            try:
                factor = self.relax_factors_[p + "_slack"]
            except KeyError:
                factor = 0.1
        if factor < 0:
            # The previous message claimed the factor "is positive", which
            # contradicted the condition that triggers this error.
            raise ValueError("Slack factor multiplier must not be negative.")
        return factor
get_relaxed_constraints
def get_relaxed_constraints(
    self,
    constraints
)
View Source
    def get_relaxed_constraints(self, constraints):
        """Return a dict with ``relax_constraint`` applied to every entry of ``constraints``."""
        relaxed = {}
        for name, bound in constraints.items():
            relaxed[name] = self.relax_constraint(name, bound)
        return relaxed
postprocessing
def postprocessing(
    self,
    bounds
)
View Source
    def postprocessing(self, bounds):
        """Return ``bounds`` unchanged; no postprocessing is applied here."""
        unchanged = bounds
        return unchanged
preprocessing
def preprocessing(
    self,
    data,
    lupi_features=None
)
View Source
    def preprocessing(self, data, lupi_features=None):
        """Validate the data and the privileged feature count.

        Parameters
        ----------
        data : tuple
            ``(X, y)`` pair; both are validated with ``check_X_y``.
        lupi_features : int
            Number of privileged features. Must satisfy
            ``0 < lupi_features < X.shape[1]``.

        Returns
        -------
        tuple
            The validated ``(X, y)``; ``y`` is shifted so that its minimum
            is 0 when the smallest label is positive.

        Raises
        ------
        ValueError
            If ``lupi_features`` is missing, not an integer or out of range.
        """
        X, y = data

        if lupi_features is None:
            raise ValueError("Argument 'lupi_features' missing in fit() call.")
        # NumPy integers (e.g. taken from an array shape computation) are
        # accepted as well as plain ints.
        if not isinstance(lupi_features, (int, np.integer)):
            raise ValueError("Argument 'lupi_features' is not type int.")

        # Validate before touching ``X.shape`` so that array-likes such as
        # nested lists are converted instead of failing with AttributeError.
        X, y = check_X_y(X, y)
        d = X.shape[1]

        if not 0 < lupi_features < d:
            raise ValueError(
                "Argument 'lupi_features' looks wrong. We need at least 1 privileged feature (>0) and at least one normal feature."
            )
        self._lupi_features = int(lupi_features)

        lowest_class = np.min(y)
        if lowest_class > 0:
            print("First ordinal class has index > 0. Shifting index...")
            y = y - lowest_class

        return X, y
relax_constraint
def relax_constraint(
    self,
    key,
    value
)
View Source
    def relax_constraint(self, key, value):
        """Return ``value`` scaled by one plus the relaxation factor chosen for ``key``."""
        factor = self.get_chosen_relax_factors(key)
        return value * (1 + factor)
relax_factors
def relax_factors(
    cls
)
View Source
    def relax_factors(cls):
        """Return the names of the relaxation factors of this problem type."""
        factor_names = ["loss_slack", "w_l1_slack"]
        return factor_names

LUPI_OrdinalRegression_Relevance_Bound

class LUPI_OrdinalRegression_Relevance_Bound(
    current_feature: int,
    data: tuple,
    hyperparameters,
    best_model_constraints,
    preset_model=None,
    best_model_state=None,
    probeID=-1
)

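Relevance-bound problem for LUPI ordinal regression: rebuilds the constraints of the initial model with cvxpy and generates the candidate problems for the lower and upper relevance bound of a single (regular or privileged) feature.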

View Source
class LUPI_OrdinalRegression_Relevance_Bound(
    LUPI_Relevance_CVXProblem, OrdinalRegression_Relevance_Bound
):
    """Relevance-bound problem for LUPI ordinal regression.

    Rebuilds the constraints of the initial LUPI ordinal regression model,
    bounded by that model's loss and L1 norms, and generates the candidate
    problems for the lower and upper relevance bound of one feature.
    Privileged features get several candidates; other features are
    delegated to the parent implementation.
    """

    @classmethod
    def generate_lower_bound_problem(
        cls,
        best_hyperparameters,
        init_constraints,
        best_model_state,
        data,
        di,
        preset_model,
        probeID=-1,
    ):
        """Yield the candidate problems for the lower bound of feature ``di``.

        For a privileged feature two problems are yielded, one per sign
        (+1, -1), each flagged with ``isLowerBound = True``.
        """
        is_priv = is_lupi_feature(
            di, data, best_model_state
        )  # Privileged features need additional candidate problems.
        if not is_priv:
            yield from super().generate_lower_bound_problem(
                best_hyperparameters,
                init_constraints,
                best_model_state,
                data,
                di,
                preset_model,
                probeID=probeID,
            )
        else:
            for sign in [1, -1]:
                problem = cls(
                    di,
                    data,
                    best_hyperparameters,
                    init_constraints,
                    preset_model=preset_model,
                    best_model_state=best_model_state,
                    probeID=probeID,
                )
                problem.init_objective_LB(sign=sign)
                problem.isLowerBound = True
                yield problem

    @classmethod
    def generate_upper_bound_problem(
        cls,
        best_hyperparameters,
        init_constraints,
        best_model_state,
        data,
        di,
        preset_model,
        probeID=-1,
    ):
        """Yield the candidate problems for the upper bound of feature ``di``.

        For a privileged feature four problems are yielded: every
        combination of sign (+1, -1) and ``w_priv`` column ``pos`` (0, 1).
        """
        is_priv = is_lupi_feature(
            di, data, best_model_state
        )  # Privileged features need additional candidate problems.
        if not is_priv:
            yield from super().generate_upper_bound_problem(
                best_hyperparameters,
                init_constraints,
                best_model_state,
                data,
                di,
                preset_model,
                probeID=probeID,
            )
        else:
            for sign, pos in product([1, -1], [0, 1]):
                problem = cls(
                    di,
                    data,
                    best_hyperparameters,
                    init_constraints,
                    preset_model=preset_model,
                    best_model_state=best_model_state,
                    probeID=probeID,
                )
                problem.init_objective_UB(sign=sign, pos=pos)
                yield problem

    @classmethod
    def aggregate_min_candidates(cls, min_problems_candidates):
        """Return the largest ``solved_relevance`` among the lower-bound candidates."""
        vals = [candidate.solved_relevance for candidate in min_problems_candidates]
        # We take the max of mins because we need the necessary contribution over all functions
        min_value = max(vals)
        return min_value

    def _init_objective_LB_LUPI(self, sign=None, bin_index=None, **kwargs):
        """Set up the lower-bound objective for a privileged feature.

        ``bin_index`` and extra keyword arguments are accepted but not used here.
        """
        # The constraint covers both columns of w_priv for this feature.
        self.add_constraint(
            sign * self.w_priv[self.lupi_index, :] <= self.feature_relevance
        )
        self._objective = cvx.Minimize(self.feature_relevance)

    def _init_objective_UB_LUPI(self, sign=None, pos=None, **kwargs):
        """Set up the upper-bound objective for column ``pos`` of a privileged feature."""
        self.add_constraint(
            self.feature_relevance <= sign * self.w_priv[self.lupi_index, pos]
        )
        self._objective = cvx.Maximize(self.feature_relevance)

    def _init_constraints(self, parameters, init_model_constraints):
        """Create the cvxpy variables and the constraints shared by all bounds.

        Parameters
        ----------
        parameters : mapping
            Hyperparameters; only ``"scaling_lupi_w"`` is read here.
        init_model_constraints : mapping
            Values ``"w_l1"``, ``"w_priv_l1"`` and ``"loss"`` of the initial
            model, used as upper bounds.
        """
        # Upper constraints from initial model
        init_w_l1 = init_model_constraints["w_l1"]
        init_w_priv_l1 = init_model_constraints["w_priv_l1"]
        init_loss = init_model_constraints["loss"]
        scaling_lupi_w = parameters["scaling_lupi_w"]
        get_original_bin_name, n_bins = get_bin_mapping(self.y)
        n_boundaries = n_bins - 1
        # Initialize variables in cvxpy
        w = cvx.Variable(shape=(self.d), name="w")
        b_s = cvx.Variable(shape=(n_boundaries), name="bias")
        w_priv = cvx.Variable(shape=(self.d_priv, 2), name="w_priv")
        d_priv = cvx.Variable(shape=(2), name="bias_priv")

        def priv_function(bin, sign):
            # Slack expression for the samples of one bin, computed from the
            # privileged features with column ``sign`` of w_priv.
            indices = np.where(self.y == get_original_bin_name[bin])
            return self.X_priv[indices] @ w_priv[:, sign] + d_priv[sign]

        # L1 norm regularization of both functions with 1 scaling constant
        priv_l1_1 = cvx.norm(w_priv[:, 0], 1)
        priv_l1_2 = cvx.norm(w_priv[:, 1], 1)
        w_priv_l1 = priv_l1_1 + priv_l1_2
        w_l1 = cvx.norm(w, 1)
        loss = 0
        # Every bin except the last is constrained against the boundary to its right.
        for left_bin in range(0, n_bins - 1):
            indices = np.where(self.y == get_original_bin_name[left_bin])
            self.add_constraint(
                self.X[indices] @ w - b_s[left_bin] <= -1 + priv_function(left_bin, 0)
            )
            self.add_constraint(priv_function(left_bin, 0) >= 0)
            loss += cvx.sum(priv_function(left_bin, 0))
        # Add constraints for slack into right neighboring bins
        for right_bin in range(1, n_bins):
            indices = np.where(self.y == get_original_bin_name[right_bin])
            self.add_constraint(
                self.X[indices] @ w - b_s[right_bin - 1]
                >= +1 - priv_function(right_bin, 1)
            )
            self.add_constraint(priv_function(right_bin, 1) >= 0)
            loss += cvx.sum(priv_function(right_bin, 1))
        # Boundaries have to be non-decreasing.
        for i_boundary in range(0, n_boundaries - 1):
            self.add_constraint(b_s[i_boundary] <= b_s[i_boundary + 1])
        # The combined weight norm and the loss may not exceed the values
        # passed in from the initial model.
        self.add_constraint(
            w_l1 + scaling_lupi_w * w_priv_l1
            <= init_w_l1 + scaling_lupi_w * init_w_priv_l1
        )
        self.add_constraint(loss <= init_loss)
        self.w = w
        self.w_priv = w_priv
        # Non-negative variable that the bound objectives minimise or maximise.
        self.feature_relevance = cvx.Variable(nonneg=True, name="Feature Relevance")

Ancestors (in MRO)

  • fri.model.base_lupi.LUPI_Relevance_CVXProblem
  • fri.model.ordinal_regression.OrdinalRegression_Relevance_Bound
  • fri.model.base_cvxproblem.Relevance_CVXProblem
  • abc.ABC

Static methods

aggregate_max_candidates
def aggregate_max_candidates(
    max_problems_candidates
)
View Source
    @classmethod
    def aggregate_max_candidates(cls, max_problems_candidates):
        """Return the largest ``solved_relevance`` among the upper-bound candidates."""
        relevances = []
        for candidate in max_problems_candidates:
            relevances.append(candidate.solved_relevance)
        return max(relevances)
aggregate_min_candidates
def aggregate_min_candidates(
    min_problems_candidates
)
View Source
    @classmethod
    def aggregate_min_candidates(cls, min_problems_candidates):
        """Return the largest ``solved_relevance`` among the lower-bound candidates.

        The maximum of the per-candidate minima is used because the necessary
        contribution has to hold over all functions.
        """
        relevances = []
        for candidate in min_problems_candidates:
            relevances.append(candidate.solved_relevance)
        return max(relevances)
generate_lower_bound_problem
def generate_lower_bound_problem(
    best_hyperparameters,
    init_constraints,
    best_model_state,
    data,
    di,
    preset_model,
    probeID=-1
)
View Source
    @classmethod
    def generate_lower_bound_problem(
        cls,
        best_hyperparameters,
        init_constraints,
        best_model_state,
        data,
        di,
        preset_model,
        probeID=-1,
    ):
        """Yield the candidate problems for the lower bound of feature ``di``.

        Features that ``is_lupi_feature`` does not report as privileged are
        delegated to the parent implementation. For a privileged feature two
        problems are yielded, one per sign (+1, -1), each flagged with
        ``isLowerBound = True``.
        """
        if not is_lupi_feature(di, data, best_model_state):
            yield from super().generate_lower_bound_problem(
                best_hyperparameters,
                init_constraints,
                best_model_state,
                data,
                di,
                preset_model,
                probeID=probeID,
            )
            return

        # Privileged feature: one candidate problem per sign.
        for direction in (1, -1):
            candidate = cls(
                di,
                data,
                best_hyperparameters,
                init_constraints,
                preset_model=preset_model,
                best_model_state=best_model_state,
                probeID=probeID,
            )
            candidate.init_objective_LB(sign=direction)
            candidate.isLowerBound = True
            yield candidate
generate_upper_bound_problem
def generate_upper_bound_problem(
    best_hyperparameters,
    init_constraints,
    best_model_state,
    data,
    di,
    preset_model,
    probeID=-1
)
View Source
    @classmethod
    def generate_upper_bound_problem(
        cls,
        best_hyperparameters,
        init_constraints,
        best_model_state,
        data,
        di,
        preset_model,
        probeID=-1,
    ):
        """Yield the candidate problems for the upper bound of feature ``di``.

        Features that ``is_lupi_feature`` does not report as privileged are
        delegated to the parent implementation. For a privileged feature four
        problems are yielded: every combination of sign (+1, -1) and
        ``w_priv`` column (0, 1), with the sign varying slowest.
        """
        if not is_lupi_feature(di, data, best_model_state):
            yield from super().generate_upper_bound_problem(
                best_hyperparameters,
                init_constraints,
                best_model_state,
                data,
                di,
                preset_model,
                probeID=probeID,
            )
            return

        # Privileged feature: one candidate per (sign, column) combination.
        for direction in (1, -1):
            for column in (0, 1):
                candidate = cls(
                    di,
                    data,
                    best_hyperparameters,
                    init_constraints,
                    preset_model=preset_model,
                    best_model_state=best_model_state,
                    probeID=probeID,
                )
                candidate.init_objective_UB(sign=direction, pos=column)
                yield candidate

Instance variables

accepted_status
constraints
cvx_problem
isProbe
is_solved
objective
probeID
solved_relevance
solver_kwargs

Methods

add_constraint
def add_constraint(
    self,
    new
)
View Source
    def add_constraint(self, new):
        """Append the constraint ``new`` to this problem's constraint list."""
        collected = self._constraints
        collected.append(new)
init_objective_LB
def init_objective_LB(
    self,
    **kwargs
)
View Source
    def init_objective_LB(self, **kwargs):
        """Initialise the lower-bound objective for the current feature.

        Privileged features (``self.isPriv``) use ``_init_objective_LB_LUPI``;
        all others go to the next class in the MRO, which is the normal
        (non-LUPI) problem.
        """
        if not self.isPriv:
            # Sibling class of the LUPI class handles regular features.
            super().init_objective_LB(**kwargs)
            return
        self._init_objective_LB_LUPI(**kwargs)
init_objective_UB
def init_objective_UB(
    self,
    **kwargs
)
View Source
    def init_objective_UB(self, **kwargs):
        """Initialise the upper-bound objective for the current feature.

        Privileged features (``self.isPriv``) use ``_init_objective_UB_LUPI``;
        all others go to the next class in the MRO, which is the normal
        (non-LUPI) problem.
        """
        if not self.isPriv:
            # Sibling class of the LUPI class handles regular features.
            super().init_objective_UB(**kwargs)
            return
        self._init_objective_UB_LUPI(**kwargs)
preprocessing_data
def preprocessing_data(
    self,
    data,
    best_model_state
)
View Source
    def preprocessing_data(self, data, best_model_state):
        """Split the data into regular and privileged parts and locate the feature.

        Stores ``X_priv`` and ``d_priv``, lets the parent class process the
        regular part, and sets ``lupi_index`` / ``isPriv`` for the current
        feature.
        """
        n_privileged = best_model_state["lupi_features"]
        combined, labels = data

        regular, privileged = split_dataset(combined, n_privileged)
        self.X_priv = privileged
        super().preprocessing_data((regular, labels), best_model_state)

        assert n_privileged == privileged.shape[1]
        self.d_priv = n_privileged

        # LUPI model, we need to offset the index: privileged features come
        # after the self.d regular ones.
        self.lupi_index = self.current_feature - self.d
        self.isPriv = True if self.lupi_index >= 0 else False
solve
def solve(
    self
) -> object
View Source
    def solve(self) -> object:
        """Build and solve the cvxpy problem, record its status and return ``self``.

        The cvxpy problem is created here rather than earlier because
        pickling LP solver objects is problematic; this way worker threads
        build the problem themselves and serialization is avoided.
        """
        problem = cvx.Problem(objective=self.objective, constraints=self.constraints)
        self._cvx_problem = problem

        try:
            problem.solve(**self.solver_kwargs)
        except SolverError:
            # Solver errors are common with this framework and are ignored:
            # multiple problems are solved per bound and a feasible solution
            # is chosen later (see '_create_interval').
            pass

        self._solver_status = problem.status
        return self

LUPI_OrdinalRegression_SVM

class LUPI_OrdinalRegression_SVM(
    C=1,
    scaling_lupi_w=1,
    lupi_features=None
)

Initial L1-regularised ordinal regression model with privileged information (LUPI), fitted by solving a cvxpy problem; hyperparameters are `C` and `scaling_lupi_w`.

View Source
class LUPI_OrdinalRegression_SVM(LUPI_InitModel):
    """
    Initial model for ordinal regression with privileged information (LUPI).

    ``fit`` solves a cvxpy problem that minimizes
    ``C * loss + 0.5 * (||w||_1 + scaling_lupi_w * ||w_priv||_1)``, where the
    slack of every sample is a linear function of its privileged features.

    Parameters
    ----------
    C : float
        Weight of the loss term in the objective.
    scaling_lupi_w : float
        Weight of the L1 norm of the privileged weights in the regularizer.
    lupi_features : int
        Number of privileged features (the last columns of the data).
    """

    HYPERPARAMETER = ["C", "scaling_lupi_w"]

    def __init__(self, C=1, scaling_lupi_w=1, lupi_features=None):
        super().__init__()
        self.scaling_lupi_w = scaling_lupi_w
        self.C = C
        self.lupi_features = lupi_features

    def fit(self, X_combined, y, lupi_features=None):
        """
        Fit the model on data containing normal and privileged features.

        Parameters
        ----------
        X_combined : array-like
            Data matrix that is split into normal and privileged parts with
            ``split_dataset``.
        y : array-like
            Ordinal class label of every sample.
        lupi_features : int
            Number of features in dataset which are considered privileged information (PI).
            PI features are expected to be the last features in the dataset.
            When omitted, the value stored on the instance is used.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If neither the argument nor the instance provides the number of
            privileged features.
        """
        # An explicit argument takes precedence over the stored value.
        if lupi_features is None:
            lupi_features = getattr(self, "lupi_features", None)
        if lupi_features is None:
            raise ValueError("No amount of lupi features given.")
        # Store the resolved value: predict() splits its input with it.
        self.lupi_features = lupi_features

        X, X_priv = split_dataset(X_combined, lupi_features)
        d = X.shape[1]
        self.classes_ = np.unique(y)

        # Get parameters from CV model without any feature constraints
        params = self.get_params()
        C = params["C"]
        scaling_lupi_w = params["scaling_lupi_w"]

        get_original_bin_name, n_bins = get_bin_mapping(y)
        n_boundaries = n_bins - 1

        # Initialize variables in cvxpy
        w = cvx.Variable(shape=(d), name="w")
        b_s = cvx.Variable(shape=(n_boundaries), name="bias")
        # One privileged slack function per side (column 0: left, column 1: right)
        w_priv = cvx.Variable(shape=(lupi_features, 2), name="w_priv")
        d_priv = cvx.Variable(shape=(2), name="bias_priv")

        def priv_function(bin_index, sign):
            # Slack of all samples of one bin, modelled on the privileged data
            indices = np.where(y == get_original_bin_name[bin_index])
            return X_priv[indices] @ w_priv[:, sign] + d_priv[sign]

        # L1 norm regularization of both functions with 1 scaling constant
        priv_l1_1 = cvx.norm(w_priv[:, 0], 1)
        priv_l1_2 = cvx.norm(w_priv[:, 1], 1)
        w_priv_l1 = priv_l1_1 + priv_l1_2
        w_l1 = cvx.norm(w, 1)
        weight_regularization = 0.5 * (w_l1 + scaling_lupi_w * w_priv_l1)

        constraints = []
        loss = 0

        # Samples of a bin must lie left of the boundary to their right
        for left_bin in range(0, n_bins - 1):
            indices = np.where(y == get_original_bin_name[left_bin])
            slack = priv_function(left_bin, 0)
            constraints.append(X[indices] @ w - b_s[left_bin] <= -1 + slack)
            constraints.append(slack >= 0)
            loss += cvx.sum(slack)

        # Add constraints for slack into right neighboring bins
        for right_bin in range(1, n_bins):
            indices = np.where(y == get_original_bin_name[right_bin])
            slack = priv_function(right_bin, 1)
            constraints.append(X[indices] @ w - b_s[right_bin - 1] >= +1 - slack)
            constraints.append(slack >= 0)
            loss += cvx.sum(slack)

        # Keep the bin boundaries ordered
        for i_boundary in range(0, n_boundaries - 1):
            constraints.append(b_s[i_boundary] <= b_s[i_boundary + 1])

        objective = cvx.Minimize(C * loss + weight_regularization)

        # Solve problem.
        problem = cvx.Problem(objective, constraints)
        problem.solve(**self.SOLVER_PARAMS)

        w = w.value
        b_s = b_s.value

        self.model_state = {
            "w": w,
            "b_s": b_s,
            "w_priv": w_priv.value,
            "d_priv": d_priv.value,
            "lupi_features": lupi_features,  # Number of lupi features in the dataset TODO: Move this somewhere else
            "bin_boundaries": n_boundaries,
        }

        self.constraints = {
            "loss": loss.value,
            "w_l1": w_l1.value,
            "w_priv_l1": w_priv_l1.value,
        }

        return self

    def predict(self, X):
        """
        Predict the class of every sample from its normal features only.

        Parameters
        ----------
        X : array-like
            Combined data; the privileged part is split off and not used.

        Returns
        -------
        Entries of ``self.classes_`` selected by the number of learned
        boundaries each sample's score reaches.
        """
        X, X_priv = split_dataset(X, self.lupi_features)

        w = self.model_state["w"]
        b_s = self.model_state["b_s"]

        scores = np.dot(X, w.T)[np.newaxis]
        # The appended infinite threshold is never reached
        bin_thresholds = np.append(b_s, np.inf)

        # If thresholds are smaller than score the value belongs to the bigger bin
        # after subtracting we check for positive elements
        indices = np.sum(scores.T - bin_thresholds >= 0, -1)

        return self.classes_[indices]

    def score(self, X, y, error_type="mmae", return_error=False, **kwargs):
        """
        Score the predictions for ``X`` against ``y`` with ``ordinal_scores``.

        Parameters
        ----------
        X, y
            Data and labels, validated with ``check_X_y``.
        error_type : str
            Passed on to ``ordinal_scores``.
        return_error : bool
            Passed on to ``ordinal_scores``.
        **kwargs
            Accepted but not used.
        """
        X, y = check_X_y(X, y)
        prediction = self.predict(X)
        score = ordinal_scores(y, prediction, error_type, return_error=return_error)
        return score

    def make_scorer(self):
        """
        Build one scorer per ordinal error type.

        Returns
        -------
        tuple
            A dict with the keys ``"mze"``, ``"mae"`` and ``"mmae"`` and the
            string ``"mmae"``.
        """
        # Use multiple scores for ordinal regression
        mze = make_scorer(ordinal_scores, error_type="mze")
        mae = make_scorer(ordinal_scores, error_type="mae")
        mmae = make_scorer(ordinal_scores, error_type="mmae")
        scorer = {"mze": mze, "mae": mae, "mmae": mmae}

        return scorer, "mmae"

Ancestors (in MRO)

  • fri.model.base_initmodel.LUPI_InitModel
  • fri.model.base_initmodel.InitModel
  • abc.ABC
  • sklearn.base.BaseEstimator

Class variables

HYPERPARAMETER
SOLVER_PARAMS

Instance variables

L1_factor
L1_factor_priv

Methods

fit
def fit(
    self,
    X_combined,
    y,
    lupi_features=None
)

Parameters

lupi_features : int Number of features in dataset which are considered privileged information (PI). PI features are expected to be the last features in the dataset.

View Source
    def fit(self, X_combined, y, lupi_features=None):
        """
        Fit the model on data containing normal and privileged features.

        Parameters
        ----------
        X_combined : array-like
            Data matrix that is split into normal and privileged parts with
            ``split_dataset``.
        y : array-like
            Ordinal class label of every sample.
        lupi_features : int
            Number of features in dataset which are considered privileged information (PI).
            PI features are expected to be the last features in the dataset.
            When omitted, the value stored on the instance is used.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If neither the argument nor the instance provides the number of
            privileged features.
        """
        # An explicit argument takes precedence over the stored value.
        if lupi_features is None:
            lupi_features = getattr(self, "lupi_features", None)
        if lupi_features is None:
            raise ValueError("No amount of lupi features given.")
        # Store the resolved value: predict() splits its input with it.
        self.lupi_features = lupi_features

        X, X_priv = split_dataset(X_combined, lupi_features)
        d = X.shape[1]
        self.classes_ = np.unique(y)

        # Get parameters from CV model without any feature constraints
        params = self.get_params()
        C = params["C"]
        scaling_lupi_w = params["scaling_lupi_w"]

        get_original_bin_name, n_bins = get_bin_mapping(y)
        n_boundaries = n_bins - 1

        # Initialize variables in cvxpy
        w = cvx.Variable(shape=(d), name="w")
        b_s = cvx.Variable(shape=(n_boundaries), name="bias")
        # One privileged slack function per side (column 0: left, column 1: right)
        w_priv = cvx.Variable(shape=(lupi_features, 2), name="w_priv")
        d_priv = cvx.Variable(shape=(2), name="bias_priv")

        def priv_function(bin_index, sign):
            # Slack of all samples of one bin, modelled on the privileged data
            indices = np.where(y == get_original_bin_name[bin_index])
            return X_priv[indices] @ w_priv[:, sign] + d_priv[sign]

        # L1 norm regularization of both functions with 1 scaling constant
        priv_l1_1 = cvx.norm(w_priv[:, 0], 1)
        priv_l1_2 = cvx.norm(w_priv[:, 1], 1)
        w_priv_l1 = priv_l1_1 + priv_l1_2
        w_l1 = cvx.norm(w, 1)
        weight_regularization = 0.5 * (w_l1 + scaling_lupi_w * w_priv_l1)

        constraints = []
        loss = 0

        # Samples of a bin must lie left of the boundary to their right
        for left_bin in range(0, n_bins - 1):
            indices = np.where(y == get_original_bin_name[left_bin])
            slack = priv_function(left_bin, 0)
            constraints.append(X[indices] @ w - b_s[left_bin] <= -1 + slack)
            constraints.append(slack >= 0)
            loss += cvx.sum(slack)

        # Add constraints for slack into right neighboring bins
        for right_bin in range(1, n_bins):
            indices = np.where(y == get_original_bin_name[right_bin])
            slack = priv_function(right_bin, 1)
            constraints.append(X[indices] @ w - b_s[right_bin - 1] >= +1 - slack)
            constraints.append(slack >= 0)
            loss += cvx.sum(slack)

        # Keep the bin boundaries ordered
        for i_boundary in range(0, n_boundaries - 1):
            constraints.append(b_s[i_boundary] <= b_s[i_boundary + 1])

        objective = cvx.Minimize(C * loss + weight_regularization)

        # Solve problem.
        problem = cvx.Problem(objective, constraints)
        problem.solve(**self.SOLVER_PARAMS)

        w = w.value
        b_s = b_s.value

        self.model_state = {
            "w": w,
            "b_s": b_s,
            "w_priv": w_priv.value,
            "d_priv": d_priv.value,
            "lupi_features": lupi_features,  # Number of lupi features in the dataset TODO: Move this somewhere else
            "bin_boundaries": n_boundaries,
        }

        self.constraints = {
            "loss": loss.value,
            "w_l1": w_l1.value,
            "w_priv_l1": w_priv_l1.value,
        }

        return self
get_params
def get_params(
    self,
    deep=True
)

Get parameters for this estimator.

Parameters

deep : bool, default=True If True, will return the parameters for this estimator and contained subobjects that are estimators.

Returns

params : mapping of string to any Parameter names mapped to their values.

View Source
    def get_params(self, deep=True):
        """
        Get parameters for this estimator.

        Parameters
        ----------
        deep : bool, default=True
            If True, will return the parameters for this estimator and
            contained subobjects that are estimators.

        Returns
        -------
        params : mapping of string to any
            Parameter names mapped to their values.
        """
        params = {}
        for name in self._get_param_names():
            try:
                current = getattr(self, name)
            except AttributeError:
                # A missing attribute is reported but still mapped to None.
                warnings.warn(
                    'From version 0.24, get_params will raise an '
                    'AttributeError if a parameter cannot be '
                    'retrieved as an instance attribute. Previously '
                    'it would return None.',
                    FutureWarning)
                current = None

            if deep and hasattr(current, 'get_params'):
                # Flatten the parameters of a nested object under "<name>__<sub>".
                for sub_name, sub_value in current.get_params().items():
                    params[name + '__' + sub_name] = sub_value

            params[name] = current

        return params
make_scorer
def make_scorer(
    self
)
View Source
    def make_scorer(self):
        """
        Build one scorer per ordinal error type.

        Returns
        -------
        tuple
            A dict with the keys ``"mze"``, ``"mae"`` and ``"mmae"`` and the
            string ``"mmae"``.
        """
        # Use multiple scores for ordinal regression
        scorer = {
            error_type: make_scorer(ordinal_scores, error_type=error_type)
            for error_type in ("mze", "mae", "mmae")
        }
        return scorer, "mmae"
predict
def predict(
    self,
    X
)
View Source
    def predict(self, X):
        """
        Predict the class of every sample from its normal features only.

        Parameters
        ----------
        X : array-like
            Combined data; the privileged part is split off and not used.

        Returns
        -------
        Entries of ``self.classes_`` selected by the number of learned
        boundaries each sample's score reaches.
        """
        features, _ = split_dataset(X, self.lupi_features)

        weights = self.model_state["w"]
        boundaries = self.model_state["b_s"]

        # Scores as a column, so they broadcast against the row of thresholds
        score_column = np.dot(features, weights.T)[np.newaxis].T
        # The appended infinite threshold is never reached
        thresholds = np.append(boundaries, np.inf)

        # If thresholds are smaller than score the value belongs to the bigger bin
        # after subtracting we check for positive elements
        reached = score_column - thresholds >= 0
        bin_index = np.sum(reached, -1)

        return self.classes_[bin_index]
score
def score(
    self,
    X,
    y,
    error_type='mmae',
    return_error=False,
    **kwargs
)
View Source
    def score(self, X, y, error_type="mmae", return_error=False, **kwargs):
        """
        Score the predictions for ``X`` against ``y`` with ``ordinal_scores``.

        Parameters
        ----------
        X, y
            Data and labels, validated with ``check_X_y``.
        error_type : str
            Passed on to ``ordinal_scores``.
        return_error : bool
            Passed on to ``ordinal_scores``.
        **kwargs
            Accepted but not used.
        """
        X, y = check_X_y(X, y)
        return ordinal_scores(
            y, self.predict(X), error_type, return_error=return_error
        )
set_params
def set_params(
    self,
    **params
)

Set the parameters of this estimator.

The method works on simple estimators as well as on nested objects (such as pipelines). The latter have parameters of the form <component>__<parameter> so that it's possible to update each component of a nested object.

Parameters

**params : dict Estimator parameters.

Returns

self : object Estimator instance.

View Source
    def set_params(self, **params):
        """
        Set the parameters of this estimator.

        The method works on simple estimators as well as on nested objects
        (such as pipelines). The latter have parameters of the form
        ``<component>__<parameter>`` so that it's possible to update each
        component of a nested object.

        Parameters
        ----------
        **params : dict
            Estimator parameters.

        Returns
        -------
        self : object
            Estimator instance.

        Raises
        ------
        ValueError
            If a parameter name (the part before ``__``) is not among the
            names returned by ``get_params``.
        """
        if not params:
            # Simple optimization to gain speed (inspect is slow)
            return self

        known = self.get_params(deep=True)
        per_component = defaultdict(dict)  # grouped by prefix

        for full_name, value in params.items():
            name, separator, remainder = full_name.partition('__')
            if name not in known:
                raise ValueError('Invalid parameter %s for estimator %s. '
                                 'Check the list of available parameters '
                                 'with `estimator.get_params().keys()`.' %
                                 (name, self))

            if separator:
                # Collected now, forwarded to the component after the loop
                per_component[name][remainder] = value
                continue

            setattr(self, name, value)
            known[name] = value

        for name, component_params in per_component.items():
            known[name].set_params(**component_params)

        return self