
Module fri.compute

This module contains the internal computation routines. They are (normally) not meant to be called directly by users.

View Source
"""This module includes all important computation functions which are used internally.

They (normally) should not be used by users.

"""

import logging

from typing import Tuple

from collections import defaultdict

import attr

import joblib

import numpy as np

import scipy.spatial.distance  # explicit import so scipy.spatial.distance.pdist is guaranteed to be available

from scipy import stats

from scipy.cluster.hierarchy import fcluster, linkage

from fri.model.base_cvxproblem import Relevance_CVXProblem

from fri.model.base_initmodel import InitModel

from fri.model.base_type import ProblemType

from fri.utils import permutate_feature_in_data

from .utils import distance

MIN_N_PROBE_FEATURES = 20  # Lower bound of probe features

def _start_solver_worker(bound: Relevance_CVXProblem):

    """Worker function which solves a single bound problem; used for parallel computation."""

    return bound.solve()

class RelevanceBoundsIntervals:

    def __init__(

        self,

        data,

        problem_type: ProblemType,

        best_init_model: InitModel,

        random_state,

        n_resampling,

        n_jobs,

        verbose,

        normalize=True,

    ):

        self.data = data

        self.problem_type = problem_type

        self.verbose = verbose

        self.n_jobs = n_jobs

        self.n_resampling = n_resampling

        self.random_state = random_state

        self.best_init_model = best_init_model

        self.best_hyperparameters = best_init_model.get_params()

        self.normalize = normalize

        # Relax constraints to improve stability

        self.init_constraints = problem_type.get_relaxed_constraints(

            best_init_model.constraints

        )

    def get_normalized_lupi_intervals(self, lupi_features, presetModel=None):

        # We define a list of all the features we want to compute relevance bounds for

        X, _ = self.data  # TODO: handle other data formats

        all_d = X.shape[1]

        normal_d = all_d - lupi_features

        # Compute relevance bounds and probes for normal features and LUPI

        with joblib.Parallel(n_jobs=self.n_jobs, verbose=self.verbose) as parallel:

            d_n = _get_necessary_dimensions(normal_d, presetModel)

            rb = self.compute_relevance_bounds(d_n, parallel=parallel)

            probe_upper = self.compute_probe_values(d_n, True, parallel=parallel)

            probe_lower = self.compute_probe_values(d_n, False, parallel=parallel)

            d_l = _get_necessary_dimensions(all_d, presetModel, start=normal_d)

            rb_l = self.compute_relevance_bounds(d_l, parallel=parallel)

            probe_priv_upper = self.compute_probe_values(d_l, True, parallel=parallel)

            probe_priv_lower = self.compute_probe_values(d_l, False, parallel=parallel)

        #

        # Postprocess

        #

        # Get Scaling Parameters

        l1 = self.init_constraints["w_l1"]

        l1_priv = self.init_constraints["w_priv_l1"]

        l1 = l1 + l1_priv

        # Normalize Normal and Lupi features

        rb_norm = self._postprocessing(l1, rb)

        rb_l_norm = self._postprocessing(l1, rb_l)

        interval_ = np.concatenate([rb_norm, rb_l_norm])

        # Normalize Probes

        probe_lower = self._postprocessing(l1, probe_lower)

        probe_upper = self._postprocessing(l1, probe_upper)

        probe_priv_lower = self._postprocessing(l1, probe_priv_lower)

        probe_priv_upper = self._postprocessing(l1, probe_priv_upper)

        #

        #

        # Classify features

        self.f_classifier = FeatureClassifier(

            probe_lower, probe_upper, verbose=self.verbose

        )

        feature_classes = self.f_classifier.classify(rb_norm)

        self.f_classifier_lupi = FeatureClassifier(

            probe_priv_lower, probe_priv_upper, verbose=self.verbose

        )

        feature_classes_lupi = self.f_classifier_lupi.classify(rb_l_norm)

        fc_both = np.concatenate([feature_classes, feature_classes_lupi])

        return interval_, fc_both

    def get_normalized_intervals(self, presetModel=None):

        # We define a list of all the features we want to compute relevance bounds for

        X, _ = self.data

        d = X.shape[1]

        # Depending on the preset model, we don't need to compute all bounds,

        # e.g. in the case of fixed features we skip those

        dims = _get_necessary_dimensions(d, presetModel)

        with joblib.Parallel(n_jobs=self.n_jobs, verbose=self.verbose) as parallel:

            relevance_bounds = self.compute_relevance_bounds(

                dims, parallel=parallel, presetModel=presetModel

            )

            probe_values_upper = self.compute_probe_values(

                dims, isUpper=True, parallel=parallel, presetModel=presetModel

            )

            probe_values_lower = self.compute_probe_values(

                dims, isUpper=False, parallel=parallel, presetModel=presetModel

            )

        # Postprocess bounds

        norm_bounds = self._postprocessing(

            self.best_init_model.L1_factor, relevance_bounds

        )

        norm_probe_values_upper = self._postprocessing(

            self.best_init_model.L1_factor, probe_values_upper

        )

        norm_probe_values_lower = self._postprocessing(

            self.best_init_model.L1_factor, probe_values_lower

        )

        self.f_classifier = FeatureClassifier(

            norm_probe_values_lower, norm_probe_values_upper, verbose=self.verbose

        )

        feature_classes = self.f_classifier.classify(norm_bounds)

        return norm_bounds, feature_classes

    def compute_relevance_bounds(

        self, dims, parallel=None, presetModel=None, solverargs=None

    ):

        init_model_state = self.best_init_model.model_state

        work_queue = self._generate_relevance_bounds_tasks(

            dims, self.data, presetModel, init_model_state

        )

        # Solve relevance bounds in parallel (when available)

        if parallel is None:

            parallel = joblib.Parallel(n_jobs=self.n_jobs, verbose=self.verbose)

        bound_results = parallel(map(joblib.delayed(_start_solver_worker), work_queue))

        # Retrieve results and aggregate values in dict

        solved_bounds = defaultdict(list)

        for finished_bound in bound_results:

            # Only add bounds with feasible solutions

            if finished_bound.is_solved:

                solved_bounds[finished_bound.current_feature].append(finished_bound)

        # Initialize array for pairs of bounds (= intervals)

        length = len(dims)

        intervals = np.zeros((length, 2))

        for rel_index, abs_index in enumerate(dims):

            # Return interval for feature i (can be a fixed value when set beforehand)

            interval_i = self._create_interval(abs_index, solved_bounds, presetModel)

            intervals[rel_index] = interval_i

        return intervals

    def compute_probe_values(self, dims, isUpper=True, parallel=None, presetModel=None):

        # Get model parameters

        init_model_state = self.best_init_model.model_state

        # Prepare parallel framework

        if parallel is None:

            parallel = joblib.Parallel(n_jobs=self.n_jobs, verbose=self.verbose)

        # Generate

        probe_queue = self._generate_probe_value_tasks(

            self.data,

            dims,

            isUpper,

            self.n_resampling,

            self.random_state,

            presetModel,

            init_model_state,

        )

        # Compute solution

        probe_results = parallel(map(joblib.delayed(_start_solver_worker), probe_queue))


        candidates = defaultdict(list)

        for candidate in probe_results:

            # Only add bounds with feasible solutions

            if candidate.is_solved:

                candidates[candidate.probeID].append(candidate)

        probe_values = []

        for probes_for_ID in candidates.values():

            if isUpper:

                probe_values.append(

                    self.problem_type.get_cvxproblem_template.aggregate_max_candidates(

                        probes_for_ID

                    )

                )

            else:

                probe_values.append(

                    self.problem_type.get_cvxproblem_template.aggregate_min_candidates(

                        probes_for_ID

                    )

                )

        return np.array(probe_values)

    def _generate_relevance_bounds_tasks(

        self, dims, data, preset_model=None, best_model_state=None

    ):

        # Do not compute bounds for fixed features

        if preset_model is not None:

            dims = [di for di in dims if di not in preset_model]

        # Instantiate objects for computation later

        for di in dims:

            # Add Lower Bound problem(s) to work list

            yield from self.problem_type.get_cvxproblem_template.generate_lower_bound_problem(

                self.best_hyperparameters,

                self.init_constraints,

                best_model_state,

                data,

                di,

                preset_model,

            )

            # Add problem(s) for Upper bound

            yield from self.problem_type.get_cvxproblem_template.generate_upper_bound_problem(

                self.best_hyperparameters,

                self.init_constraints,

                best_model_state,

                data,

                di,

                preset_model,

            )

    def _generate_probe_value_tasks(

        self,

        data,

        dims,

        isUpper,

        n_resampling,

        random_state,

        preset_model=None,

        best_model_state=None,

    ):

        if isUpper:

            factory = (

                self.problem_type.get_cvxproblem_template.generate_upper_bound_problem

            )

        else:

            factory = (

                self.problem_type.get_cvxproblem_template.generate_lower_bound_problem

            )

        # Randomly sample n_resampling probe (shadow) features by permuting real features

        random_choice = random_state.choice(a=dims, size=n_resampling)

        # Instantiate objects

        for i, di in enumerate(random_choice):

            data_perm = permutate_feature_in_data(data, di, random_state)

            # Each permuted dataset yields one probe problem of the requested bound type

            yield from factory(

                self.best_hyperparameters,

                self.init_constraints,

                best_model_state,

                data_perm,

                di,

                preset_model,

                probeID=i,

            )

    def _create_interval(

        self, feature: int, solved_bounds: dict, presetModel: dict = None

    ):

        # Return preset values for fixed features

        if presetModel is not None:

            if feature in presetModel:

                return presetModel[feature].squeeze()

        all_bounds = solved_bounds[feature]

        min_problems_candidates = [p for p in all_bounds if p.isLowerBound]

        max_problems_candidates = [p for p in all_bounds if not p.isLowerBound]

        if len(all_bounds) < 2:

            logging.error(

                f"(Some) relevance bounds for feature {feature} were not solved."

            )

            raise Exception("Infeasible bound(s).")

        lower_bound = (

            self.problem_type.get_cvxproblem_template.aggregate_min_candidates(

                min_problems_candidates

            )

        )

        upper_bound = (

            self.problem_type.get_cvxproblem_template.aggregate_max_candidates(

                max_problems_candidates

            )

        )

        return lower_bound, upper_bound

    def compute_single_preset_relevance_bounds(

        self, i: int, signed_preset_i: Tuple[float, float]

    ):

        """

        Method to run method once for one restricted feature

        Parameters

        ----------

        i:

            restricted feature

        signed_preset_i:

            restricted range of feature i (set before optimization = preset)

        """

        preset = {i: signed_preset_i}

        rangevector = self.compute_multi_preset_relevance_bounds(preset)

        return rangevector

    def compute_multi_preset_relevance_bounds(self, preset, lupi_features=0):

        """

        Method to run method with preset values

        Parameters

        ----------

        lupi_features

        """

        # The user is working with normalized values while we compute them unscaled

        if self.normalize:

            normalized = {}

            for k, v in preset.items():

                normalized[k] = np.asarray(v) * self.best_init_model.L1_factor

            preset = normalized

        # Add sign to presets

        preset = self._add_sign_to_preset(preset)

        # Calculate all bounds with feature i set to min_i

        if lupi_features > 0:

            rangevector, _ = self.get_normalized_lupi_intervals(

                lupi_features, presetModel=preset

            )

        else:

            rangevector, _ = self.get_normalized_intervals(presetModel=preset)

        return rangevector

    def _add_sign_to_preset(self, unsigned_presets):

        """

        We need signed presets for our convex problem definition later.

        We reuse the coefficients of the optimal model for this

        Parameters

        ----------

        unsigned_presets : dict

        Returns

        -------

        dict

        """

        signed_presets = {}

        # Obtain optimal model parameters

        w = self.best_init_model.model_state["w"]

        preset_sum = 0

        for i, preset in unsigned_presets.items():

            preset = np.array(preset)

            if preset.size == 1:

                preset = np.repeat(preset, 2)

            signed_preset_i = np.sign(w[i]) * preset

            # Accumulate the maximal feature contribution

            preset_sum += signed_preset_i[1]  # Take upper preset

            signed_presets[i] = signed_preset_i

        # Check whether the presets are feasible under the L1 constraint

        l1 = self.init_constraints["w_l1"]

        if preset_sum > l1:

            logging.warning(f"Maximum L1 norm of presets: {preset_sum}")

            logging.warning(f"L1 allowed: {l1}")

            logging.warning("Presets are not feasible. Try lowering values.")

            return None

        return signed_presets

    def _postprocessing(self, L1, rangevector, round_to_zero=True):

        if self.normalize:

            assert L1 > 0

            rangevector = rangevector.copy() / L1

        if round_to_zero:

            rangevector[rangevector <= 1e-11] = 0

        return rangevector

    def grouping(self, interval, cutoff_threshold=0.55, method="single"):

        """Find feature clusters based on observed variance when changing feature contributions

        Parameters

        ----------

        cutoff_threshold : float, optional

            Cutoff value for the flat clustering step; decides at which height in the dendrogram the cut is made to determine groups.

        method : str, optional

            Linkage method used in the hierarchical clustering.

        Returns

        -------

        tuple

            (feature_clusters_, linkage_): flat cluster labels and the linkage matrix

        """

        # Do we have intervals?

        if self.best_init_model is None:

            raise Exception("Model needs to be fitted already.")

        d = len(interval)

        # Init arrays

        interval_constrained_to_min = np.zeros(

            (d, d, 2)

        )  # Save ranges ((d, 2)-dim) for every constrained run (d times)

        absolute_delta_bounds_summed_min = np.zeros((d, d, 2))

        interval_constrained_to_max = np.zeros(

            (d, d, 2)

        )  # Save ranges ((d, 2)-dim) for every constrained run (d times)

        absolute_delta_bounds_summed_max = np.zeros((d, d, 2))

        # Set weight for each dimension to minimum and maximum possible value and run optimization of all others

        # We retrieve the relevance bounds and calculate the absolute difference between them and non-constrained bounds

        for i in range(d):

            # min

            lowb = interval[i, 0]

            ranges = self.compute_single_preset_relevance_bounds(i, [lowb, lowb])

            diff = interval - ranges

            diff[i] = 0

            interval_constrained_to_min[i] = ranges

            absolute_delta_bounds_summed_min[i] = diff

            # max

            highb = interval[i, 1]

            ranges = self.compute_single_preset_relevance_bounds(i, [highb, highb])

            diff = interval - ranges

            diff[i] = 0

            interval_constrained_to_max[i] = ranges

            absolute_delta_bounds_summed_max[i] = diff

        feature_points = np.zeros((d, 2 * d * 2))

        for i in range(d):

            feature_points[i, : (2 * d)] = absolute_delta_bounds_summed_min[i].flatten()

            feature_points[i, (2 * d) :] = absolute_delta_bounds_summed_max[i].flatten()

        self.relevance_variance = feature_points

        # Calculate similarity using custom measure

        dist_mat = scipy.spatial.distance.pdist(feature_points, metric=distance)

        # Create linkage tree

        link = linkage(dist_mat, method=method, optimal_ordering=True)

        # Set cutoff at which threshold the linkage gets flattened (clustering)

        RATIO = cutoff_threshold

        threshold = RATIO * np.max(link[:, 2])  # max of branch lengths (distances)

        feature_clustering = fcluster(link, threshold, criterion="distance")

        self.feature_clusters_, self.linkage_ = feature_clustering, link

        return self.feature_clusters_, self.linkage_

def _get_necessary_dimensions(d: int, presetModel: dict = None, start=0):

    dims = np.arange(start, d)

    # if presetModel is not None:

    #    # Exclude fixed (preset) dimensions from being redundantly computed

    #    dims = [di for di in dims if di not in presetModel.keys()]

    return dims

class FeatureClassifier:

    def __init__(self, probes_low, probes_up, fpr=1e-4, verbose=0):

        self.lower_stat = create_probe_statistic(probes_low, fpr, verbose=verbose)

        self.upper_stat = create_probe_statistic(probes_up, fpr, verbose=verbose)

        if verbose > 0:

            logging.info("**** Feature Selection ****")

            logging.info("Lower Probe Statistic")

            logging.info(self.lower_stat)

            logging.info("Upper Probe Statistic")

            logging.info(self.upper_stat)

    def classify(self, relevance_bounds):

        """

        Parameters

        ----------

        relevance_bounds : numpy.ndarray

            two-dimensional array with relevance bounds;

            the first column corresponds to minrel and the second to maxrel

        """

        weakly = relevance_bounds[:, 1] > self.upper_stat.upper_threshold

        strongly = relevance_bounds[:, 0] > self.lower_stat.upper_threshold

        both = np.logical_and(weakly, strongly)

        prediction = np.zeros(relevance_bounds.shape[0], dtype=int)

        prediction[weakly] = 1

        prediction[both] = 2

        return prediction

@attr.s

class ProbeStatistic:

    """

    Collects the statistical threshold values derived

    from one kind of relevance bound (minrel or maxrel).

    """

    lower_threshold = attr.ib(type=float)

    upper_threshold = attr.ib(type=float)

    n_probes = attr.ib(type=int)

def create_probe_statistic(probe_values, fpr, verbose=0):

    # Create prediction interval statistics based on randomly permuted probe features (based on real features)

    n = len(probe_values)

    if n == 0:

        if verbose > 0:

            logging.info(

                "All probes were infeasible. All features considered relevant."

            )

        # If all probes were infeasible we expect an empty list.

        # In that case only strongly relevant features were in the data,

        # so we set the thresholds without considering the statistics.

        low_t = 0

        up_t = 0

    elif n == 1:

        val = probe_values[0]

        low_t = val

        up_t = val

    else:

        probe_values = np.asarray(probe_values)

        mean = probe_values.mean()

        s = probe_values.std()

        # t-distribution prediction interval for a single new probe value;

        # ppf(fpr) is negative for small fpr, so low_t < mean < up_t

        low_t = mean + stats.t(df=n - 1).ppf(fpr) * s * np.sqrt(1 + (1 / n))

        up_t = mean - stats.t(df=n - 1).ppf(fpr) * s * np.sqrt(1 + (1 / n))

    return ProbeStatistic(low_t, up_t, n)

Variables

MIN_N_PROBE_FEATURES

Functions

create_probe_statistic

def create_probe_statistic(
    probe_values,
    fpr,
    verbose=0
)
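
A minimal sketch of what create_probe_statistic computes for n > 1; the probe values and the false positive rate below are illustrative only:

import numpy as np
from scipy import stats

probe_values = np.array([0.01, 0.02, 0.015, 0.03])  # toy probe values
n = len(probe_values)
fpr = 1e-4
mean, s = probe_values.mean(), probe_values.std()
# Prediction interval for a single new probe value; ppf(fpr) < 0 for small fpr
margin = stats.t(df=n - 1).ppf(fpr) * s * np.sqrt(1 + 1 / n)
low_t, up_t = mean + margin, mean - margin
print(low_t, up_t)  # relevance bounds must exceed up_t to count as relevant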

Classes

FeatureClassifier

class FeatureClassifier(
    probes_low,
    probes_up,
    fpr=0.0001,
    verbose=0
)

Methods

classify
def classify(
    self,
    relevance_bounds
)

Parameters

relevance_bounds : numpy.ndarray
    two-dimensional array with relevance bounds; the first column corresponds to minrel and the second to maxrel

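A minimal sketch of the thresholding logic in classify, with made-up thresholds standing in for the probe statistics:

import numpy as np

upper_threshold_maxrel = 0.05  # assumed probe threshold for the upper bounds
upper_threshold_minrel = 0.02  # assumed probe threshold for the lower bounds
relevance_bounds = np.array(
    [[0.00, 0.01],   # below both thresholds -> irrelevant (0)
     [0.00, 0.10],   # only maxrel exceeds   -> weakly relevant (1)
     [0.04, 0.10]]   # both exceed           -> strongly relevant (2)
)
weakly = relevance_bounds[:, 1] > upper_threshold_maxrel
strongly = relevance_bounds[:, 0] > upper_threshold_minrel
prediction = np.zeros(len(relevance_bounds), dtype=int)
prediction[weakly] = 1
prediction[np.logical_and(weakly, strongly)] = 2
print(prediction)  # [0 1 2]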

ProbeStatistic

class ProbeStatistic(
    lower_threshold: float,
    upper_threshold: float,
    n_probes: int
)

Collects the statistical threshold values derived from one kind of relevance bound (minrel or maxrel).

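Since ProbeStatistic is a plain attrs value container, constructing one is trivial; the numbers here are illustrative:

stat = ProbeStatistic(lower_threshold=0.01, upper_threshold=0.05, n_probes=30)
print(stat.upper_threshold)  # 0.05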

Class variables

lower_threshold
n_probes
upper_threshold

RelevanceBoundsIntervals

class RelevanceBoundsIntervals(
    data,
    problem_type: fri.model.base_type.ProblemType,
    best_init_model: fri.model.base_initmodel.InitModel,
    random_state,
    n_resampling,
    n_jobs,
    verbose,
    normalize=True
)

Methods

compute_multi_preset_relevance_bounds
def compute_multi_preset_relevance_bounds(
    self,
    preset,
    lupi_features=0
)

Compute all relevance bounds while several features are restricted to preset ranges.

Parameters

preset : dict
    mapping from feature index to its restricted range
lupi_features : int
    number of privileged (LUPI) features

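A hedged usage sketch; rbi stands for an already constructed RelevanceBoundsIntervals instance, and the preset values are illustrative:

# Pin feature 0 to the fixed value 0.1 and restrict feature 3 to [0.0, 0.2],
# then recompute all bounds under these presets
preset = {0: [0.1, 0.1], 3: [0.0, 0.2]}
rangevector = rbi.compute_multi_preset_relevance_bounds(preset)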
compute_probe_values
def compute_probe_values(
    self,
    dims,
    isUpper=True,
    parallel=None,
    presetModel=None
)
compute_relevance_bounds
def compute_relevance_bounds(
    self,
    dims,
    parallel=None,
    presetModel=None,
    solverargs=None
)
compute_single_preset_relevance_bounds
def compute_single_preset_relevance_bounds(
    self,
    i: int,
    signed_preset_i: Tuple[float, float]
)

Compute all relevance bounds while one feature is restricted to a preset range.

Parameters

i : int
    index of the restricted feature
signed_preset_i : Tuple[float, float]
    restricted range of feature i (set before optimization = preset)

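A hedged sketch along the same lines as above; rbi again stands for an existing instance:

# Pin feature 2 to a single value and recompute the remaining intervals
rangevector = rbi.compute_single_preset_relevance_bounds(2, [0.05, 0.05])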
get_normalized_intervals
def get_normalized_intervals(
    self,
    presetModel=None
)
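
The post-processing inside this method divides all bounds by the model's L1 factor and rounds numerical noise to zero; a minimal sketch of that normalization, with assumed values:

import numpy as np

L1 = 2.0  # assumed L1 factor of the initial model
bounds = np.array([[0.0, 1.0], [1e-13, 0.5]])
norm = bounds / L1
norm[norm <= 1e-11] = 0  # round numerical noise down to exactly zero
print(norm)  # [[0.   0.5 ] [0.   0.25]]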
get_normalized_lupi_intervals
def get_normalized_lupi_intervals(
    self,
    lupi_features,
    presetModel=None
)
grouping
def grouping(
    self,
    interval,
    cutoff_threshold=0.55,
    method='single'
)

Find feature clusters based on observed variance when changing feature contributions

Parameters

cutoff_threshold : float, optional
    Cutoff value for the flat clustering step; decides at which height in the dendrogram the cut is made to determine groups.
method : str, optional
    Linkage method used in the hierarchical clustering.

Returns

tuple
    (feature_clusters_, linkage_): flat cluster labels and the linkage matrix

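A minimal sketch of the clustering cut this method performs, using toy relevance-variance points and scipy's default euclidean metric in place of the module's custom distance:

import numpy as np
from scipy.cluster.hierarchy import fcluster, linkage
from scipy.spatial.distance import pdist

# Toy relevance-variance points for 4 features (illustrative values)
feature_points = np.array([[0.0, 0.1], [0.0, 0.12], [1.0, 0.9], [1.1, 1.0]])
dist_mat = pdist(feature_points)
link = linkage(dist_mat, method="single", optimal_ordering=True)
threshold = 0.55 * np.max(link[:, 2])  # cut at 55% of the largest merge distance
clusters = fcluster(link, threshold, criterion="distance")
print(clusters)  # e.g. [1 1 2 2]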