Source code for metric_learn.scml

"""
Sparse Compositional Metric Learning (SCML)
"""

from __future__ import print_function, absolute_import, division
import numpy as np
from .base_metric import _TripletsClassifierMixin, MahalanobisMixin
from ._util import components_from_metric
from sklearn.base import TransformerMixin
from .constraints import Constraints
from sklearn.preprocessing import normalize
from sklearn.neighbors import NearestNeighbors
from sklearn.cluster import KMeans
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.utils import check_array, check_random_state
import warnings


class _BaseSCML(MahalanobisMixin):

  _tuple_size = 3   # constraints are triplets
  _authorized_basis = ['triplet_diffs']

  def __init__(self, beta=1e-5, basis='triplet_diffs', n_basis=None,
               gamma=5e-3, max_iter=10000, output_iter=500, batch_size=10,
               verbose=False, preprocessor=None, random_state=None):
    self.beta = beta
    self.basis = basis
    self.n_basis = n_basis
    self.gamma = gamma
    self.max_iter = max_iter
    self.output_iter = output_iter
    self.batch_size = batch_size
    self.verbose = verbose
    self.preprocessor = preprocessor
    self.random_state = random_state
    super(_BaseSCML, self).__init__(preprocessor)

  def _fit(self, triplets, basis=None, n_basis=None):
    """
    Optimization procedure to find a sparse vector of weights to
    construct the metric from the basis set. This is based on the
    dual averaging method.
    """

    if not isinstance(self.max_iter, int):
      raise ValueError("max_iter should be an integer, instead it is of type"
                       " %s" % type(self.max_iter))
    if not isinstance(self.output_iter, int):
      raise ValueError("output_iter should be an integer, instead it is of "
                       "type %s" % type(self.output_iter))
    if not isinstance(self.batch_size, int):
      raise ValueError("batch_size should be an integer, instead it is of type"
                       " %s" % type(self.batch_size))

    if self.output_iter > self.max_iter:
      raise ValueError("The value of output_iter must be equal or smaller than"
                       " max_iter.")

    # Currently prepare_inputs makes triplets contain points and not indices
    triplets = self._prepare_inputs(triplets, type_of_inputs='tuples')

    # TODO:
    # This algorithm is built to work with indices, but in order to be
    # compliant with the current handling of inputs it is converted
    # back to indices by the following function. This should be improved
    # in the future.
    triplets, X = self._to_index_points(triplets)

    if basis is None:
      basis, n_basis = self._initialize_basis(triplets, X)

    dist_diff = self._compute_dist_diff(triplets, X, basis)

    n_triplets = triplets.shape[0]

    # weight vector
    w = np.zeros((1, n_basis))
    # avarage obj gradient wrt weights
    avg_grad_w = np.zeros((1, n_basis))

    # l2 norm in time of all obj gradients wrt weights
    ada_grad_w = np.zeros((1, n_basis))
    # slack for not dividing by zero
    delta = 0.001

    best_obj = np.inf

    rng = check_random_state(self.random_state)
    rand_int = rng.randint(low=0, high=n_triplets,
                           size=(self.max_iter, self.batch_size))
    for iter in range(self.max_iter):

      idx = rand_int[iter]

      slack_val = 1 + np.matmul(dist_diff[idx, :], w.T)
      slack_mask = np.squeeze(slack_val > 0, axis=1)

      grad_w = np.sum(dist_diff[idx[slack_mask], :],
                      axis=0, keepdims=True)/self.batch_size
      avg_grad_w = (iter * avg_grad_w + grad_w) / (iter+1)

      ada_grad_w = np.sqrt(np.square(ada_grad_w) + np.square(grad_w))

      scale_f = -(iter+1) / (self.gamma * (delta + ada_grad_w))

      # proximal operator with negative trimming equivalent
      w = scale_f * np.minimum(avg_grad_w + self.beta, 0)

      if (iter + 1) % self.output_iter == 0:
        # regularization part of obj function
        obj1 = np.sum(w)*self.beta

        # Every triplet distance difference in the space given by L
        # plus a slack of one
        slack_val = 1 + np.matmul(dist_diff, w.T)
        # Mask of places with positive slack
        slack_mask = slack_val > 0

        # loss function of learning task part of obj function
        obj2 = np.sum(slack_val[slack_mask])/n_triplets

        obj = obj1 + obj2
        if self.verbose:
          count = np.sum(slack_mask)
          print("[%s] iter %d\t obj %.6f\t num_imp %d" %
                (self.__class__.__name__, (iter+1), obj, count))

        # update the best
        if obj < best_obj:
          best_obj = obj
          best_w = w

    if self.verbose:
      print("max iteration reached.")

    # return L matrix yielded from best weights
    self.n_iter_ = iter
    self.components_ = self._components_from_basis_weights(basis, best_w)

    return self

  def _compute_dist_diff(self, triplets, X, basis):
    """
    Helper function to compute the distance difference of every triplet in the
    space yielded by the basis set.
    """
    # Transformation of data by the basis set
    XB = np.matmul(X, basis.T)

    n_triplets = triplets.shape[0]
    # get all positive and negative pairs with lowest index first
    # np.array (2*n_triplets,2)
    triplets_pairs_sorted = np.sort(np.vstack((triplets[:, [0, 1]],
                                               triplets[:, [0, 2]])),
                                    kind='stable')
    # calculate all unique pairs and their indices
    uniqPairs, indices = np.unique(triplets_pairs_sorted, return_inverse=True,
                                   axis=0)
    # calculate L2 distance acording to bases only for unique pairs
    dist = np.square(XB[uniqPairs[:, 0], :] - XB[uniqPairs[:, 1], :])

    # return the diference of distances between all positive and negative
    # pairs
    return dist[indices[:n_triplets]] - dist[indices[n_triplets:]]

  def _components_from_basis_weights(self, basis, w):
    """
    Get components matrix (L) from computed mahalanobis matrix.
    """

    # get rid of inactive bases
    # TODO: Maybe have a tolerance over zero?
    active_idx, = w > 0
    w = w[..., active_idx]
    basis = basis[active_idx, :]

    n_basis, n_features = basis.shape

    if n_basis < n_features:  # if metric is low-rank
      warnings.warn("The number of bases with nonzero weight is less than the "
                    "number of features of the input, in consequence the "
                    "learned transformation reduces the dimension to %d."
                    % n_basis)
      return np.sqrt(w.T)*basis  # equivalent to np.diag(np.sqrt(w)).dot(basis)

    else:   # if metric is full rank
      return components_from_metric(np.matmul(basis.T, w.T*basis))

  def _to_index_points(self, triplets):
    shape = triplets.shape
    X, triplets = np.unique(np.vstack(triplets), return_inverse=True, axis=0)
    triplets = triplets.reshape(shape[:2])
    return triplets, X

  def _initialize_basis(self, triplets, X):
    """ Checks if the basis array is well constructed or constructs it based
    on one of the available options.
    """
    n_features = X.shape[1]

    if isinstance(self.basis, np.ndarray):
      # TODO: should copy?
      basis = check_array(self.basis, copy=True)
      if basis.shape[1] != n_features:
        raise ValueError('The dimensionality ({}) of the provided bases must'
                         ' match the dimensionality of the data '
                         '({}).'.format(basis.shape[1], n_features))
    elif self.basis not in self._authorized_basis:
      raise ValueError(
          "`basis` must be one of the options '{}' "
          "or an array of shape (n_basis, n_features)."
          .format("', '".join(self._authorized_basis)))
    if self.basis == 'triplet_diffs':
      basis, n_basis = self._generate_bases_dist_diff(triplets, X)

    return basis, n_basis

  def _generate_bases_dist_diff(self, triplets, X):
    """ Constructs the basis set from the differences of positive and negative
    pairs from the triplets constraints.

    The basis set is constructed iteratively by taking n_features triplets,
    then adding and substracting respectively all the outerproducts of the
    positive and negative pairs, and finally selecting the eigenvectors
    of this matrix with positive eigenvalue. This is done until n_basis are
    selected.
    """
    n_features = X.shape[1]
    n_triplets = triplets.shape[0]

    if self.n_basis is None:
      # TODO: Get a good default n_basis directive
      n_basis = n_features*80
      warnings.warn('As no value for `n_basis` was selected, the number of '
                    'basis will be set to n_basis= %d' % n_basis)
    elif isinstance(self.n_basis, int):
      n_basis = self.n_basis
    else:
      raise ValueError("n_basis should be an integer, instead it is of type %s"
                       % type(self.n_basis))

    if n_features > n_triplets:
      raise ValueError(
        "Number of features (%s) is greater than the number of triplets(%s).\n"
        "Consider using dimensionality reduction or using another basis "
        "generation scheme." % (n_features, n_triplets))

    basis = np.zeros((n_basis, n_features))

    # get all positive and negative pairs with lowest index first
    # np.array (2*n_triplets,2)
    triplets_pairs_sorted = np.sort(np.vstack((triplets[:, [0, 1]],
                                               triplets[:, [0, 2]])),
                                    kind='stable')
    # calculate all unique pairs and their indices
    uniqPairs, indices = np.unique(triplets_pairs_sorted, return_inverse=True,
                                   axis=0)
    # calculate differences only for unique pairs
    diff = X[uniqPairs[:, 0], :] - X[uniqPairs[:, 1], :]

    diff_pos = diff[indices[:n_triplets], :]
    diff_neg = diff[indices[n_triplets:], :]

    rng = check_random_state(self.random_state)

    start = 0
    finish = 0
    while finish != n_basis:
      # Select triplets to yield diff
      select_triplet = rng.choice(n_triplets, size=n_features, replace=False)

      # select n_features positive differences
      d_pos = diff_pos[select_triplet, :]

      # select n_features negative differences
      d_neg = diff_neg[select_triplet, :]

      # Yield matrix
      diff_sum = d_pos.T.dot(d_pos) - d_neg.T.dot(d_neg)

      # Calculate eigenvalue and eigenvectors
      w, v = np.linalg.eigh(diff_sum.T.dot(diff_sum))

      # Add eigenvectors with positive eigenvalue to basis set
      pos_eig_mask = w > 0
      start = finish
      finish += pos_eig_mask.sum()

      try:
        basis[start:finish, :] = v[pos_eig_mask]
      except ValueError:
        # if finish is greater than n_basis
        basis[start:, :] = v[pos_eig_mask][:n_basis-start]
        break

      # TODO: maybe add a warning in case there are no added bases, this could
      # be caused by a bad triplet set. This would cause an infinite loop

    return basis, n_basis


[docs] class SCML(_BaseSCML, _TripletsClassifierMixin): """Sparse Compositional Metric Learning (SCML) `SCML` learns an squared Mahalanobis distance from triplet constraints by optimizing sparse positive weights assigned to a set of :math:`K` rank-one PSD bases. This can be formulated as an optimization problem with only :math:`K` parameters, that can be solved with an efficient stochastic composite scheme. Read more in the :ref:`User Guide <scml>`. .. warning:: SCML is still a bit experimental, don't hesitate to report if something fails/doesn't work as expected. Parameters ---------- beta: float (default=1e-5) L1 regularization parameter. basis : string or array-like, optional (default='triplet_diffs') Set of bases to construct the metric. Possible options are 'triplet_diffs', and an array-like of shape (n_basis, n_features). 'triplet_diffs' The basis set is constructed iteratively from differences between points of `n_features` positive or negative pairs randomly sampled from the triplets constraints. Requires the number of training triplets to be great or equal to `n_features`. array-like A matrix of shape (n_basis, n_features), that will be used as the basis set for the metric construction. n_basis : int, optional Number of basis to be yielded. In case it is not set it will be set based on `basis`. If no value is selected a default will be computed based on the input. gamma: float (default = 5e-3) Learning rate for the optimization algorithm. max_iter : int (default = 10000) Number of iterations for the algorithm. output_iter : int (default = 5000) Number of iterations to check current weights performance and output this information in case verbose is True. verbose : bool, optional If True, prints information while learning. preprocessor : array-like, shape=(n_samples, n_features) or callable The preprocessor to call to get triplets from indices. If array-like, triplets will be formed like this: X[indices]. random_state : int or numpy.RandomState or None, optional (default=None) A pseudo random number generator object or a seed for it if int. Attributes ---------- components_ : `numpy.ndarray`, shape=(n_features, n_features) The linear transformation ``L`` deduced from the learned Mahalanobis metric (See function `_components_from_basis_weights`.) Examples -------- >>> from metric_learn import SCML >>> triplets = [[[1.2, 7.5], [1.3, 1.5], [6.2, 9.7]], >>> [[1.3, 4.5], [3.2, 4.6], [5.4, 5.4]], >>> [[3.2, 7.5], [3.3, 1.5], [8.2, 9.7]], >>> [[3.3, 4.5], [5.2, 4.6], [7.4, 5.4]]] >>> scml = SCML() >>> scml.fit(triplets) References ---------- .. [1] Y. Shi, A. Bellet and F. Sha. `Sparse Compositional Metric Learning. <http://researchers.lille.inria.fr/abellet/papers/aaai14.pdf>`_. \ (AAAI), 2014. .. [2] Adapted from original `Matlab implementation. \ <https://github.com/bellet/SCML>`_. See Also -------- metric_learn.SCML_Supervised : The supervised version of the algorithm. :ref:`supervised_version` : The section of the project documentation that describes the supervised version of weakly supervised estimators. """
[docs] def fit(self, triplets): """Learn the SCML model. Parameters ---------- triplets : array-like, shape=(n_constraints, 3, n_features) or \ (n_constraints, 3) 3D array-like of triplets of points or 2D array of triplets of indicators. Triplets are assumed to be ordered such that: d(triplets[i, 0],triplets[i, 1]) < d(triplets[i, 0], triplets[i, 2]). Returns ------- self : object Returns the instance. """ return self._fit(triplets)
[docs] class SCML_Supervised(_BaseSCML, TransformerMixin): """Supervised version of Sparse Compositional Metric Learning (SCML) `SCML_Supervised` creates triplets by taking `k_genuine` neighbours of the same class and `k_impostor` neighbours from different classes for each point and then runs the SCML algorithm on these triplets. Read more in the :ref:`User Guide <scml>`. .. warning:: SCML is still a bit experimental, don't hesitate to report if something fails/doesn't work as expected. Parameters ---------- beta: float (default=1e-5) L1 regularization parameter. basis : string or an array-like, optional (default='lda') Set of bases to construct the metric. Possible options are 'lda', and an array-like of shape (n_basis, n_features). 'lda' The `n_basis` basis set is constructed from the LDA of significant local regions in the feature space via clustering, for each region center k-nearest neighbors are used to obtain the LDA scalings, which correspond to the locally discriminative basis. array-like A matrix of shape (n_basis, n_features), that will be used as the basis set for the metric construction. n_basis : int, optional Number of basis to be yielded. In case it is not set it will be set based on `basis`. If no value is selected a default will be computed based on the input. gamma: float (default = 5e-3) Learning rate for the optimization algorithm. max_iter : int (default = 100000) Number of iterations for the algorithm. output_iter : int (default = 5000) Number of iterations to check current weights performance and output this information in case verbose is True. verbose : bool, optional If True, prints information while learning. preprocessor : array-like, shape=(n_samples, n_features) or callable The preprocessor to call to get triplets from indices. If array-like, triplets will be formed like this: X[indices]. random_state : int or numpy.RandomState or None, optional (default=None) A pseudo random number generator object or a seed for it if int. Attributes ---------- components_ : `numpy.ndarray`, shape=(n_features, n_features) The linear transformation ``L`` deduced from the learned Mahalanobis metric (See function `_components_from_basis_weights`.) Examples -------- >>> from metric_learn import SCML_Supervised >>> from sklearn.datasets import load_iris >>> iris_data = load_iris() >>> X = iris_data['data'] >>> Y = iris_data['target'] >>> scml = SCML_Supervised(random_state=33) >>> scml.fit(X, Y) SCML_Supervised(random_state=33) >>> scml.score_pairs([[X[0], X[1]], [X[0], X[2]]]) array([1.84640733, 1.55984363]) >>> scml.get_metric()(X[0], X[1]) 1.8464073327922157 References ---------- .. [1] Y. Shi, A. Bellet and F. Sha. `Sparse Compositional Metric Learning. <http://researchers.lille.inria.fr/abellet/papers/aaai14.pdf>`_. \ (AAAI), 2014. .. [2] Adapted from original `Matlab implementation. \ <https://github.com/bellet/SCML>`_. See Also -------- metric_learn.SCML : The weakly supervised version of this algorithm. """ # Add supervised authorized basis construction options _authorized_basis = _BaseSCML._authorized_basis + ['lda']
[docs] def __init__(self, k_genuine=3, k_impostor=10, beta=1e-5, basis='lda', n_basis=None, gamma=5e-3, max_iter=10000, output_iter=500, batch_size=10, verbose=False, preprocessor=None, random_state=None): self.k_genuine = k_genuine self.k_impostor = k_impostor _BaseSCML.__init__(self, beta=beta, basis=basis, n_basis=n_basis, max_iter=max_iter, output_iter=output_iter, batch_size=batch_size, verbose=verbose, preprocessor=preprocessor, random_state=random_state)
[docs] def fit(self, X, y): """Create constraints from labels and learn the SCML model. Parameters ---------- X : (n x d) matrix Input data, where each row corresponds to a single instance. y : (n) array-like Data labels. Returns ------- self : object Returns the instance. """ X, y = self._prepare_inputs(X, y, ensure_min_samples=2) basis, n_basis = self._initialize_basis_supervised(X, y) if not isinstance(self.k_genuine, int): raise ValueError("k_genuine should be an integer, instead it is of type" " %s" % type(self.k_genuine)) if not isinstance(self.k_impostor, int): raise ValueError("k_impostor should be an integer, instead it is of " "type %s" % type(self.k_impostor)) constraints = Constraints(y) triplets = constraints.generate_knntriplets(X, self.k_genuine, self.k_impostor) triplets = X[triplets] return self._fit(triplets, basis, n_basis)
def _initialize_basis_supervised(self, X, y): """ Constructs the basis set following one of the supervised options in case one is selected. """ if isinstance(self.basis, str) and self.basis == 'lda': basis, n_basis = self._generate_bases_LDA(X, y) else: basis, n_basis = None, None return basis, n_basis def _generate_bases_LDA(self, X, y): """ Generates bases for the 'lda' option. The basis set is constructed using Linear Discriminant Analysis of significant local regions in the feature space via clustering, for each region center k-nearest neighbors are used to obtain the LDA scalings, which correspond to the locally discriminative basis. Currently this is done at two scales `k={10,20}` if `n_feature < 50` or else `k={20,50}`. """ labels, class_count = np.unique(y, return_counts=True) n_class = len(labels) n_features = X.shape[1] # Number of basis yielded from each LDA num_eig = min(n_class-1, n_features) if self.n_basis is None: # TODO: Get a good default n_basis directive n_basis = min(20*n_features, X.shape[0]*2*num_eig - 1) warnings.warn('As no value for `n_basis` was selected, the number of ' 'basis will be set to n_basis= %d' % n_basis) elif isinstance(self.n_basis, int): n_basis = self.n_basis else: raise ValueError("n_basis should be an integer, instead it is of type %s" % type(self.n_basis)) # Number of clusters needed for 2 scales given the number of basis # yielded by every LDA n_clusters = int(np.ceil(n_basis/(2 * num_eig))) if n_basis < n_class: warnings.warn("The number of basis is less than the number of classes, " "which may lead to poor discriminative performance.") elif n_basis >= X.shape[0]*2*num_eig: raise ValueError("Not enough samples to generate %d LDA bases, n_basis" "should be smaller than %d" % (n_basis, X.shape[0]*2*num_eig)) kmeans = KMeans(n_clusters=n_clusters, n_init=10, random_state=self.random_state, algorithm='elkan').fit(X) cX = kmeans.cluster_centers_ n_scales = 2 if n_features > 50: scales = [20, 50] else: scales = [10, 20] k_class = np.vstack((np.minimum(class_count, scales[0]), np.minimum(class_count, scales[1]))) idx_set = [np.zeros((n_clusters, sum(k_class[0, :])), dtype=np.int64), np.zeros((n_clusters, sum(k_class[1, :])), dtype=np.int64)] start_finish_indices = np.hstack((np.zeros((2, 1), np.int64), k_class)).cumsum(axis=1) neigh = NearestNeighbors() for c in range(n_class): sel_c = np.where(y == labels[c]) # get k_class same class neighbors neigh.fit(X=X[sel_c]) # Only take the neighbors once for the biggest scale neighbors = neigh.kneighbors(X=cX, n_neighbors=k_class[-1, c], return_distance=False) # add index set of neighbors for every cluster center for both scales for s, k in enumerate(k_class[:, c]): start, finish = start_finish_indices[s, c:c+2] idx_set[s][:, start:finish] = np.take(sel_c, neighbors[:, :k]) # Compute basis for every cluster in both scales basis = np.zeros((n_basis, n_features)) lda = LinearDiscriminantAnalysis() start_finish_indices = np.hstack((np.vstack((0, n_clusters * num_eig)), np.full((2, n_clusters), num_eig))).cumsum(axis=1) for s in range(n_scales): for c in range(n_clusters): lda.fit(X[idx_set[s][c, :]], y[idx_set[s][c, :]]) start, finish = start_finish_indices[s, c:c+2] normalized_scalings = normalize(lda.scalings_.T) try: basis[start: finish, :] = normalized_scalings except ValueError: # handle tail basis[start:, :] = normalized_scalings[:n_basis-start] break return basis, n_basis