Skip to content

Module fri.model.base_cvxproblem

View Source
from abc import ABC, abstractmethod

import cvxpy as cvx

import numpy as np

from cvxpy import SolverError

class Relevance_CVXProblem(ABC):

    def __repr__(self) -> str:

        if self.isLowerBound:

            lower = "Lower"

        else:

            lower = "Upper"

        name = f"{lower}_{self.current_feature}_{self.__class__.__name__}"

        state = ""

        for s in self.init_hyperparameters.items():

            state += f"{s[0]}:{s[1]}, "

        for s in self.init_model_constraints.items():

            state += f"{s[0]}:{s[1]}, "

        state = "(" + state[:-2] + ")"

        if self.isProbe:

            prefix = f"Probe_{self.probeID}"

        else:

            prefix = ""

        return prefix + name + state

    def __init__(

        self,

        current_feature: int,

        data: tuple,

        hyperparameters,

        best_model_constraints,

        preset_model=None,

        best_model_state=None,

        probeID=-1,

        **kwargs,

    ) -> None:

        self._probeID = probeID

        self._feature_relevance = None

        self.isLowerBound = None

        # General data

        self.current_feature = current_feature

        self.preset_model = preset_model

        self.best_model_state = best_model_state

        self.preprocessing_data(data, best_model_state)

        # Initialize constraints

        self._constraints = []

        self._objective = None

        self.w = None

        self._init_constraints(hyperparameters, best_model_constraints)

        if self.preset_model is not None:

            self._add_preset_constraints(self.preset_model, best_model_constraints)

        self.init_hyperparameters = hyperparameters

        self.init_model_constraints = best_model_constraints

    def preprocessing_data(self, data, best_model_state):

        X, y = data

        self.n = X.shape[0]

        self.d = X.shape[1]

        self.X = X

        self.y = np.array(y)

    @property

    def constraints(self):

        return self._constraints

    def add_constraint(self, new):

        self._constraints.append(new)

    @property

    def objective(self):

        return self._objective

    @property

    def solved_relevance(self):

        if self.is_solved:

            return self.objective.value

        else:

            raise Exception("Problem not solved. No feature relevance computed.")

    @property

    def probeID(self):

        return self._probeID

    @property

    def isProbe(self):

        return self.probeID >= 0

    @abstractmethod

    def _init_constraints(self, parameters, init_model_constraints):

        pass

    @abstractmethod

    def init_objective_UB(self, **kwargs):

        pass

    @abstractmethod

    def init_objective_LB(self, **kwargs):

        pass

    @property

    def cvx_problem(self):

        return self._cvx_problem

    @property

    def is_solved(self):

        if self._solver_status in self.accepted_status:

            try:

                val = self.objective.value

            except ValueError:

                return False

            return True

        else:

            return False

    @property

    def accepted_status(self):

        return ["optimal", "optimal_inaccurate"]

    def solve(self) -> object:

        # We init cvx problem here because pickling LP solver objects is problematic

        # by deferring it to here, worker threads do the problem building themselves and we spare the serialization

        self._cvx_problem = cvx.Problem(

            objective=self.objective, constraints=self.constraints

        )

        try:

            # print("Solve", self)

            self._cvx_problem.solve(**self.solver_kwargs)

        except SolverError:

            # We ignore Solver Errors, which are common with our framework:

            # We solve multiple problems per bound and choose a feasible solution later (see '_create_interval')

            pass

        self._solver_status = self._cvx_problem.status

        # self._cvx_problem = None

        return self

    def _retrieve_result(self):

        return self.current_feature, self.objective

    @property

    def solver_kwargs(self):

        return {"verbose": False, "solver": "ECOS", "max_iters": 300}

    def _add_preset_constraints(self, preset_model: dict, best_model_constraints):

        for feature, current_preset in preset_model.items():

            # Skip current feature

            if feature == self.current_feature:

                continue

            # Skip unset values

            if all(np.isnan(current_preset)):

                continue

            # a weight bigger than the optimal model L1 makes no sense

            assert abs(current_preset[0]) <= best_model_constraints["w_l1"]

            assert abs(current_preset[1]) <= best_model_constraints["w_l1"]

            # We add a pair of constraints depending on sign of known coefficient

            # this makes it possible to solve this as a convex problem

            if current_preset[0] >= 0:

                self.add_constraint(self.w[feature] >= current_preset[0])

                self.add_constraint(self.w[feature] <= current_preset[1])

            else:

                self.add_constraint(self.w[feature] <= current_preset[0])

                self.add_constraint(self.w[feature] >= current_preset[1])

    @classmethod

    def generate_lower_bound_problem(

        cls,

        best_hyperparameters,

        init_constraints,

        best_model_state,

        data,

        di,

        preset_model,

        probeID=-1,

    ):

        problem = cls(

            di,

            data,

            best_hyperparameters,

            init_constraints,

            preset_model=preset_model,

            best_model_state=best_model_state,

            probeID=probeID,

        )

        problem.init_objective_LB()

        problem.isLowerBound = True

        yield problem

    @classmethod

    def generate_upper_bound_problem(

        cls,

        best_hyperparameters,

        init_constraints,

        best_model_state,

        data,

        di,

        preset_model,

        probeID=-1,

    ):

        for sign in [-1, 1]:

            problem = cls(

                di,

                data,

                best_hyperparameters,

                init_constraints,

                preset_model=preset_model,

                best_model_state=best_model_state,

                probeID=probeID,

            )

            problem.init_objective_UB(sign=sign)

            problem.isLowerBound = False

            yield problem

    @classmethod

    def aggregate_min_candidates(cls, min_problems_candidates):

        vals = [candidate.solved_relevance for candidate in min_problems_candidates]

        min_value = min(vals)

        return min_value

    @classmethod

    def aggregate_max_candidates(cls, max_problems_candidates):

        vals = [candidate.solved_relevance for candidate in max_problems_candidates]

        max_value = max(vals)

        return max_value

Classes

Relevance_CVXProblem

class Relevance_CVXProblem(
    current_feature: int,
    data: tuple,
    hyperparameters,
    best_model_constraints,
    preset_model=None,
    best_model_state=None,
    probeID=-1,
    **kwargs
)

Helper class that provides a standard way to create an ABC using inheritance.

View Source
class Relevance_CVXProblem(ABC):

    def __repr__(self) -> str:

        if self.isLowerBound:

            lower = "Lower"

        else:

            lower = "Upper"

        name = f"{lower}_{self.current_feature}_{self.__class__.__name__}"

        state = ""

        for s in self.init_hyperparameters.items():

            state += f"{s[0]}:{s[1]}, "

        for s in self.init_model_constraints.items():

            state += f"{s[0]}:{s[1]}, "

        state = "(" + state[:-2] + ")"

        if self.isProbe:

            prefix = f"Probe_{self.probeID}"

        else:

            prefix = ""

        return prefix + name + state

    def __init__(

        self,

        current_feature: int,

        data: tuple,

        hyperparameters,

        best_model_constraints,

        preset_model=None,

        best_model_state=None,

        probeID=-1,

        **kwargs,

    ) -> None:

        self._probeID = probeID

        self._feature_relevance = None

        self.isLowerBound = None

        # General data

        self.current_feature = current_feature

        self.preset_model = preset_model

        self.best_model_state = best_model_state

        self.preprocessing_data(data, best_model_state)

        # Initialize constraints

        self._constraints = []

        self._objective = None

        self.w = None

        self._init_constraints(hyperparameters, best_model_constraints)

        if self.preset_model is not None:

            self._add_preset_constraints(self.preset_model, best_model_constraints)

        self.init_hyperparameters = hyperparameters

        self.init_model_constraints = best_model_constraints

    def preprocessing_data(self, data, best_model_state):

        X, y = data

        self.n = X.shape[0]

        self.d = X.shape[1]

        self.X = X

        self.y = np.array(y)

    @property

    def constraints(self):

        return self._constraints

    def add_constraint(self, new):

        self._constraints.append(new)

    @property

    def objective(self):

        return self._objective

    @property

    def solved_relevance(self):

        if self.is_solved:

            return self.objective.value

        else:

            raise Exception("Problem not solved. No feature relevance computed.")

    @property

    def probeID(self):

        return self._probeID

    @property

    def isProbe(self):

        return self.probeID >= 0

    @abstractmethod

    def _init_constraints(self, parameters, init_model_constraints):

        pass

    @abstractmethod

    def init_objective_UB(self, **kwargs):

        pass

    @abstractmethod

    def init_objective_LB(self, **kwargs):

        pass

    @property

    def cvx_problem(self):

        return self._cvx_problem

    @property

    def is_solved(self):

        if self._solver_status in self.accepted_status:

            try:

                val = self.objective.value

            except ValueError:

                return False

            return True

        else:

            return False

    @property

    def accepted_status(self):

        return ["optimal", "optimal_inaccurate"]

    def solve(self) -> object:

        # We init cvx problem here because pickling LP solver objects is problematic

        # by deferring it to here, worker threads do the problem building themselves and we spare the serialization

        self._cvx_problem = cvx.Problem(

            objective=self.objective, constraints=self.constraints

        )

        try:

            # print("Solve", self)

            self._cvx_problem.solve(**self.solver_kwargs)

        except SolverError:

            # We ignore Solver Errors, which are common with our framework:

            # We solve multiple problems per bound and choose a feasible solution later (see '_create_interval')

            pass

        self._solver_status = self._cvx_problem.status

        # self._cvx_problem = None

        return self

    def _retrieve_result(self):

        return self.current_feature, self.objective

    @property

    def solver_kwargs(self):

        return {"verbose": False, "solver": "ECOS", "max_iters": 300}

    def _add_preset_constraints(self, preset_model: dict, best_model_constraints):

        for feature, current_preset in preset_model.items():

            # Skip current feature

            if feature == self.current_feature:

                continue

            # Skip unset values

            if all(np.isnan(current_preset)):

                continue

            # a weight bigger than the optimal model L1 makes no sense

            assert abs(current_preset[0]) <= best_model_constraints["w_l1"]

            assert abs(current_preset[1]) <= best_model_constraints["w_l1"]

            # We add a pair of constraints depending on sign of known coefficient

            # this makes it possible to solve this as a convex problem

            if current_preset[0] >= 0:

                self.add_constraint(self.w[feature] >= current_preset[0])

                self.add_constraint(self.w[feature] <= current_preset[1])

            else:

                self.add_constraint(self.w[feature] <= current_preset[0])

                self.add_constraint(self.w[feature] >= current_preset[1])

    @classmethod

    def generate_lower_bound_problem(

        cls,

        best_hyperparameters,

        init_constraints,

        best_model_state,

        data,

        di,

        preset_model,

        probeID=-1,

    ):

        problem = cls(

            di,

            data,

            best_hyperparameters,

            init_constraints,

            preset_model=preset_model,

            best_model_state=best_model_state,

            probeID=probeID,

        )

        problem.init_objective_LB()

        problem.isLowerBound = True

        yield problem

    @classmethod

    def generate_upper_bound_problem(

        cls,

        best_hyperparameters,

        init_constraints,

        best_model_state,

        data,

        di,

        preset_model,

        probeID=-1,

    ):

        for sign in [-1, 1]:

            problem = cls(

                di,

                data,

                best_hyperparameters,

                init_constraints,

                preset_model=preset_model,

                best_model_state=best_model_state,

                probeID=probeID,

            )

            problem.init_objective_UB(sign=sign)

            problem.isLowerBound = False

            yield problem

    @classmethod

    def aggregate_min_candidates(cls, min_problems_candidates):

        vals = [candidate.solved_relevance for candidate in min_problems_candidates]

        min_value = min(vals)

        return min_value

    @classmethod

    def aggregate_max_candidates(cls, max_problems_candidates):

        vals = [candidate.solved_relevance for candidate in max_problems_candidates]

        max_value = max(vals)

        return max_value

Ancestors (in MRO)

  • abc.ABC

Descendants

  • fri.model.classification.Classification_Relevance_Bound
  • fri.model.base_lupi.LUPI_Relevance_CVXProblem
  • fri.model.ordinal_regression.OrdinalRegression_Relevance_Bound
  • fri.model.regression.Regression_Relevance_Bound

Static methods

aggregate_max_candidates
def aggregate_max_candidates(
    max_problems_candidates
)
View Source
    @classmethod

    def aggregate_max_candidates(cls, max_problems_candidates):

        vals = [candidate.solved_relevance for candidate in max_problems_candidates]

        max_value = max(vals)

        return max_value
aggregate_min_candidates
def aggregate_min_candidates(
    min_problems_candidates
)
View Source
    @classmethod

    def aggregate_min_candidates(cls, min_problems_candidates):

        vals = [candidate.solved_relevance for candidate in min_problems_candidates]

        min_value = min(vals)

        return min_value
generate_lower_bound_problem
def generate_lower_bound_problem(
    best_hyperparameters,
    init_constraints,
    best_model_state,
    data,
    di,
    preset_model,
    probeID=-1
)
View Source
    @classmethod

    def generate_lower_bound_problem(

        cls,

        best_hyperparameters,

        init_constraints,

        best_model_state,

        data,

        di,

        preset_model,

        probeID=-1,

    ):

        problem = cls(

            di,

            data,

            best_hyperparameters,

            init_constraints,

            preset_model=preset_model,

            best_model_state=best_model_state,

            probeID=probeID,

        )

        problem.init_objective_LB()

        problem.isLowerBound = True

        yield problem
generate_upper_bound_problem
def generate_upper_bound_problem(
    best_hyperparameters,
    init_constraints,
    best_model_state,
    data,
    di,
    preset_model,
    probeID=-1
)
View Source
    @classmethod

    def generate_upper_bound_problem(

        cls,

        best_hyperparameters,

        init_constraints,

        best_model_state,

        data,

        di,

        preset_model,

        probeID=-1,

    ):

        for sign in [-1, 1]:

            problem = cls(

                di,

                data,

                best_hyperparameters,

                init_constraints,

                preset_model=preset_model,

                best_model_state=best_model_state,

                probeID=probeID,

            )

            problem.init_objective_UB(sign=sign)

            problem.isLowerBound = False

            yield problem

Instance variables

accepted_status
constraints
cvx_problem
isProbe
is_solved
objective
probeID
solved_relevance
solver_kwargs

Methods

add_constraint
def add_constraint(
    self,
    new
)
View Source
    def add_constraint(self, new):

        self._constraints.append(new)
init_objective_LB
def init_objective_LB(
    self,
    **kwargs
)
View Source
    @abstractmethod

    def init_objective_LB(self, **kwargs):

        pass
init_objective_UB
def init_objective_UB(
    self,
    **kwargs
)
View Source
    @abstractmethod

    def init_objective_UB(self, **kwargs):

        pass
preprocessing_data
def preprocessing_data(
    self,
    data,
    best_model_state
)
View Source
    def preprocessing_data(self, data, best_model_state):

        X, y = data

        self.n = X.shape[0]

        self.d = X.shape[1]

        self.X = X

        self.y = np.array(y)
solve
def solve(
    self
) -> object
View Source
    def solve(self) -> object:

        # We init cvx problem here because pickling LP solver objects is problematic

        # by deferring it to here, worker threads do the problem building themselves and we spare the serialization

        self._cvx_problem = cvx.Problem(

            objective=self.objective, constraints=self.constraints

        )

        try:

            # print("Solve", self)

            self._cvx_problem.solve(**self.solver_kwargs)

        except SolverError:

            # We ignore Solver Errors, which are common with our framework:

            # We solve multiple problems per bound and choose a feasible solution later (see '_create_interval')

            pass

        self._solver_status = self._cvx_problem.status

        # self._cvx_problem = None

        return self