Source code for metric_learn.mmc

"""Mahalanobis Metric for Clustering (MMC)"""
import numpy as np
from sklearn.base import TransformerMixin
from sklearn.utils.validation import assert_all_finite

from .base_metric import _PairsClassifierMixin, MahalanobisMixin
from .constraints import Constraints, wrap_pairs
from ._util import components_from_metric, _initialize_metric_mahalanobis
import warnings


class _BaseMMC(MahalanobisMixin):

  _tuple_size = 2  # constraints are pairs

  def __init__(self, max_iter=100, max_proj=10000, tol=1e-3,
               init='identity', diagonal=False,
               diagonal_c=1.0, verbose=False, preprocessor=None,
               random_state=None,
               convergence_threshold='deprecated'):
    if convergence_threshold != 'deprecated':
      warnings.warn('"convergence_threshold" parameter has been '
                    ' renamed to "tol". It has been deprecated in'
                    ' version 0.6.3 and will be removed in 0.7.0'
                    '', FutureWarning)
      tol = convergence_threshold
    # Keep the deprecated attribute set so that get_params() does not fail:
    self.convergence_threshold = 'deprecated'
    self.max_iter = max_iter
    self.max_proj = max_proj
    self.tol = tol
    self.init = init
    self.diagonal = diagonal
    self.diagonal_c = diagonal_c
    self.verbose = verbose
    self.random_state = random_state
    super(_BaseMMC, self).__init__(preprocessor)

  def _fit(self, pairs, y):
    pairs, y = self._prepare_inputs(pairs, y,
                                    type_of_inputs='tuples')

    self.A_ = _initialize_metric_mahalanobis(pairs, self.init,
                                             random_state=self.random_state,
                                             matrix_name='init')

    if self.diagonal:
      return self._fit_diag(pairs, y)
    else:
      return self._fit_full(pairs, y)

  def _fit_full(self, pairs, y):
    """Learn full metric using MMC.

    Parameters
    ----------
    pairs : array-like, shape=(n_constraints, 2, n_features)
      Array of pairs, with each row corresponding to two points.
    y : array-like, of shape (n_constraints,)
      Labels of constraints: 1 for similar pairs, -1 for dissimilar.
    """
    num_dim = pairs.shape[2]

    error2 = 1e10
    eps = 0.01        # error-bound of iterative projection on C1 and C2
    A = self.A_

    pos_pairs, neg_pairs = pairs[y == 1], pairs[y == -1]

    # Create weight vector from similar samples
    pos_diff = pos_pairs[:, 0, :] - pos_pairs[:, 1, :]
    w = np.einsum('ij,ik->jk', pos_diff, pos_diff).ravel()
    # `w` is the sum of all outer products of the rows in `pos_diff`.
    # The above `einsum` is equivalent to the much more inefficient:
    # w = np.apply_along_axis(
    #         lambda x: np.outer(x, x).ravel(),
    #         1,
    #         pos_diff
    #     ).sum(axis=0)
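    # A tiny sanity check of that identity (illustrative only, not
    # executed; hypothetical input):
    #   d = np.array([[1., 2.], [3., 4.]])
    #   np.allclose(np.einsum('ij,ik->jk', d, d),
    #               np.outer(d[0], d[0]) + np.outer(d[1], d[1]))  # -> True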
    # upper bound `t` for the similarity constraint, derived from the
    # initial metric (the division by 100 is a heuristic scaling)
    t = w.dot(A.ravel()) / 100.0

    w_norm = np.linalg.norm(w)
    w1 = w / w_norm  # make `w` a unit vector
    t1 = t / w_norm  # distance from origin to `w^T*x=t` plane

    cycle = 1
    alpha = 0.1  # initial step size along gradient
    # gradient of the similarity constraint function:
    grad1 = self._fS1(pos_pairs, A)
    # gradient of the dissimilarity constraint function:
    grad2 = self._fD1(neg_pairs, A)
    # normalized component of grad1 orthogonal to grad2 (note that inside
    # the main loop below the arguments are swapped, so that M becomes the
    # gradient of fD1 orthogonal to fS1):
    M = self._grad_projection(grad1, grad2)

    A_old = A.copy()

    for cycle in range(self.max_iter):

      # projection of constraints C1 and C2
      satisfy = False

      for it in range(self.max_proj):

        # First constraint:
        # f(A) = \sum_{i,j \in S} d_ij' A d_ij <= t              (1)
        # (1) can be rewritten as a linear constraint: w^T x = t,
        # where x is the unrolled matrix of A,
        # w is also an unrolled matrix of W where
        # W_{kl}= \sum_{i,j \in S}d_ij^k * d_ij^l
        x0 = A.ravel()
        if w.dot(x0) <= t:
          x = x0
        else:
          x = x0 + (t1 - w1.dot(x0)) * w1
          A[:] = x.reshape(num_dim, num_dim)
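        # (Geometric note: the branch above is the Euclidean projection of
        #  x0 onto the half-space {x : w^T x <= t}; e.g. with w1 = [1, 0]
        #  and t1 = 1, the point x0 = [2, 3] maps to
        #  [2, 3] + (1 - 2) * [1, 0] = [1, 3].)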

        # Second constraint:
        # A must remain positive semi-definite (A >= 0).
        # Project A onto the PSD cone by clipping negative eigenvalues.
        l, V = np.linalg.eigh((A + A.T) / 2)
        A[:] = np.dot(V * np.maximum(0, l[None, :]), V.T)
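        # (Illustration: if (A + A.T) / 2 has eigenvalues [2, -1], the
        #  spectrum is clamped to [2, 0] while the eigenvectors are kept,
        #  which yields the nearest PSD matrix in Frobenius norm.)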

        fDC2 = w.dot(A.ravel())
        error2 = (fDC2 - t) / t
        if error2 < eps:
          satisfy = True
          break

      # Third constraint: gradient ascent on the dissimilarity objective,
      # i.e. maximize g(A) subject to g(A) >= 1,
      # where g(A) = fD(A) = \sum_{ij \in D} sqrt(d_ij' A d_ij)

      obj_previous = self._fD(neg_pairs, A_old)  # g(A_old)
      obj = self._fD(neg_pairs, A)               # g(A)

      if satisfy and (obj > obj_previous or cycle == 0):

        # If projection of 1 and 2 is successful, and such projection
        # improves objective function, slightly increase learning rate
        # and update from the current A.
        alpha *= 1.05
        A_old[:] = A
        # note the swapped roles here: grad1 is the dissimilarity gradient
        # and grad2 the similarity gradient, so M is the dissimilarity
        # ascent direction orthogonal to the similarity constraint
        grad2 = self._fS1(pos_pairs, A)
        grad1 = self._fD1(neg_pairs, A)
        M = self._grad_projection(grad1, grad2)
        A += alpha * M

      else:

        # If projection of 1 and 2 failed, or obj <= obj_previous due
        # to projection of 1 and 2, shrink learning rate and re-update
        # from the previous A.
        alpha /= 2
        A[:] = A_old + alpha * M

      delta = np.linalg.norm(alpha * M) / np.linalg.norm(A_old)
      if delta < self.tol:
        break
      if self.verbose:
        print('mmc iter: %d, conv = %f, projections = %d' %
              (cycle, delta, it + 1))

    if delta > self.tol:
      self.converged_ = False
      if self.verbose:
        print('mmc did not converge, conv = %f' % (delta,))
    else:
      self.converged_ = True
      if self.verbose:
        print('mmc converged at iter %d, conv = %f' % (cycle, delta))
    self.A_[:] = A_old
    self.n_iter_ = cycle

    self.components_ = components_from_metric(self.A_)
    return self

  def _fit_diag(self, pairs, y):
    """Learn diagonal metric using MMC.

    Parameters
    ----------
    pairs : array-like, shape=(n_constraints, 2, n_features)
      Array of pairs, with each row corresponding to two points.
    y : array-like, of shape (n_constraints,)
      Labels of constraints: 1 for similar pairs, -1 for dissimilar.
    """
    num_dim = pairs.shape[2]
    pos_pairs, neg_pairs = pairs[y == 1], pairs[y == -1]
    s_sum = np.sum((pos_pairs[:, 0, :] - pos_pairs[:, 1, :]) ** 2, axis=0)

    it = 0
    error = 1.0
    eps = 1e-6
    reduction = 2.0
    w = np.diag(self.A_).copy()

    while error > self.tol and it < self.max_iter:

      fD0, fD_1st_d, fD_2nd_d = self._D_constraint(neg_pairs, w)
      obj_initial = np.dot(s_sum, w) + self.diagonal_c * fD0
      fS_1st_d = s_sum  # first derivative of the similarity constraints

      # gradient of the objective:
      gradient = fS_1st_d - self.diagonal_c * fD_1st_d
      # Hessian of the objective:
      hessian = -self.diagonal_c * fD_2nd_d + eps * np.eye(num_dim)
      step = np.dot(np.linalg.inv(hessian), gradient)

      # Newton-Raphson update:
      # line search over the step size lambda
      lambd = 1  # initial step-size
      w_tmp = np.maximum(0, w - lambd * step)
      obj = (np.dot(s_sum, w_tmp) + self.diagonal_c *
             self._D_objective(neg_pairs, w_tmp))
      assert_all_finite(obj)
      obj_previous = np.inf  # just to get the while-loop started

      inner_it = 0
      while obj < obj_previous:
        obj_previous = obj
        w_previous = w_tmp.copy()
        lambd /= reduction
        w_tmp = np.maximum(0, w - lambd * step)
        obj = (np.dot(s_sum, w_tmp) + self.diagonal_c *
               self._D_objective(neg_pairs, w_tmp))
        inner_it += 1
        assert_all_finite(obj)

      w[:] = w_previous
      error = np.abs((obj_previous - obj_initial) / obj_previous)
      if self.verbose:
        print('mmc iter: %d, conv = %f' % (it, error))
      it += 1

    self.A_ = np.diag(w)

    self.components_ = components_from_metric(self.A_)
    return self
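  # Usage sketch for the diagonal variant (illustrative, with hypothetical
  # `pairs` and `y`): constructing the estimator with `diagonal=True`
  # learns only a per-feature scaling, e.g.
  #   MMC(diagonal=True, diagonal_c=1.0).fit(pairs, y)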

  def _fD(self, neg_pairs, A):
    r"""The value of the dissimilarity constraint function.

    f = f(\sum_{ij \in D} distance(x_i, x_j))
    i.e. distance can be L1:  \sqrt{(x_i-x_j)A(x_i-x_j)'}
    """
    diff = neg_pairs[:, 0, :] - neg_pairs[:, 1, :]
    return np.log(np.sum(np.sqrt(np.sum(np.dot(diff, A) * diff, axis=1))) +
                  1e-6)

  def _fD1(self, neg_pairs, A):
    r"""The gradient of the dissimilarity constraint function w.r.t. A.

    For example, let distance by L1 norm:
    f = f(\sum_{ij \in D} \sqrt{(x_i-x_j)A(x_i-x_j)'})
    df/dA_{kl} = f'* d(\sum_{ij \in D} \sqrt{(x_i-x_j)^k*(x_i-x_j)^l})/dA_{kl}

    Note that d_ij*A*d_ij' = tr(d_ij*A*d_ij') = tr(d_ij'*d_ij*A)
    so, d(d_ij*A*d_ij')/dA = d_ij'*d_ij
        df/dA = f'(\sum_{ij \in D} \sqrt{tr(d_ij'*d_ij*A)})
                * 0.5*(\sum_{ij \in D} (1/sqrt{tr(d_ij'*d_ij*A)})*(d_ij'*d_ij))
    """
    diff = neg_pairs[:, 0, :] - neg_pairs[:, 1, :]
    # outer products of all rows in `diff`
    M = np.einsum('ij,ik->ijk', diff, diff)
    # faster version of: dist = np.sqrt(np.sum(M * A[None,:,:], axis=(1,2)))
    dist = np.sqrt(np.einsum('ijk,jk', M, A))
    # faster version of: sum_deri = np.sum(M /
    # (2 * (dist[:,None,None] + 1e-6)), axis=0)
    sum_deri = np.einsum('ijk,i->jk', M, 0.5 / (dist + 1e-6))
    sum_dist = dist.sum()
    return sum_deri / (sum_dist + 1e-6)

  def _fS1(self, pos_pairs, A):
    r"""The gradient of the similarity constraint function w.r.t. A.

    f = \sum_{ij}(x_i-x_j)A(x_i-x_j)' = \sum_{ij}d_ij*A*d_ij'
    df/dA = d(d_ij*A*d_ij')/dA

    Note that d_ij*A*d_ij' = tr(d_ij*A*d_ij') = tr(d_ij'*d_ij*A)
    so, d(d_ij*A*d_ij')/dA = d_ij'*d_ij
    """
    diff = pos_pairs[:, 0, :] - pos_pairs[:, 1, :]
    # sum of outer products of all rows in `diff`:
    return np.einsum('ij,ik->jk', diff, diff)

  def _grad_projection(self, grad1, grad2):
    grad2 = grad2 / np.linalg.norm(grad2)
    gtemp = grad1 - np.sum(grad1 * grad2) * grad2
    gtemp /= np.linalg.norm(gtemp)
    return gtemp
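  # A minimal worked example of `_grad_projection` (illustrative,
  # hypothetical inputs): with grad1 = [1, 1] and grad2 = [1, 0], grad2 is
  # already unit-norm, gtemp = [1, 1] - 1 * [1, 0] = [0, 1], and the
  # returned direction is [0, 1] after normalization.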

  def _D_objective(self, neg_pairs, w):
    """Value of the dissimilarity constraint term for a diagonal metric
    given as a vector `w` (cf. `_D_constraint`)."""
    return np.log(np.sum(np.sqrt(np.sum(((neg_pairs[:, 0, :] -
                                          neg_pairs[:, 1, :]) ** 2) *
                                        w[None, :], axis=1) + 1e-6)))

  def _D_constraint(self, neg_pairs, w):
    """Compute the value, 1st derivative, second derivative (Hessian) of
    a dissimilarity constraint function gF(sum_ij distance(d_ij A d_ij))
    where A is a diagonal matrix (in the form of a column vector 'w').
    """
    diff = neg_pairs[:, 0, :] - neg_pairs[:, 1, :]
    diff_sq = diff * diff
    dist = np.sqrt(diff_sq.dot(w))
    sum_deri1 = np.einsum('ij,i', diff_sq, 0.5 / np.maximum(dist, 1e-6))
    sum_deri2 = np.einsum(
        'ij,ik->jk',
        diff_sq,
        diff_sq / (-4 * np.maximum(1e-6, dist**3))[:, None]
    )
    sum_dist = dist.sum()
    return (
        np.log(sum_dist),
        sum_deri1 / sum_dist,
        sum_deri2 / sum_dist -
        np.outer(sum_deri1, sum_deri1) / (sum_dist * sum_dist)
    )
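  # (Derivation sketch for `_D_constraint`: with dist_i = sqrt(diff_sq_i . w),
  #  d(dist_i)/dw = diff_sq_i / (2 * dist_i), so the gradient of
  #  log(sum_i dist_i) is sum_deri1 / sum_dist, and the Hessian follows from
  #  the quotient rule, giving the returned expression.)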


class MMC(_BaseMMC, _PairsClassifierMixin):
  """Mahalanobis Metric for Clustering (MMC)

  MMC minimizes the sum of squared distances between similar points, while
  enforcing the sum of distances between dissimilar ones to be greater than
  one. This leads to a convex and, thus, local-minima-free optimization
  problem that can be solved efficiently.
  However, the algorithm involves the computation of eigenvalues, which is
  the main speed-bottleneck. Since it has initially been designed for
  clustering applications, one of the implicit assumptions of MMC is that
  all classes form a compact set, i.e., follow a unimodal distribution,
  which restricts the possible use-cases of this method. However, it is one
  of the earliest and a still often cited technique.

  Read more in the :ref:`User Guide <mmc>`.

  Parameters
  ----------
  max_iter : int, optional (default=100)
    Maximum number of iterations of the optimization procedure.

  max_proj : int, optional (default=10000)
    Maximum number of projection steps.

  tol : float, optional (default=1e-3)
    Convergence threshold for the optimization procedure.

  init : string or numpy array, optional (default='identity')
    Initialization of the Mahalanobis matrix. Possible options are
    'identity', 'covariance', 'random', and a numpy array of shape
    (n_features, n_features).

    'identity'
      An identity matrix of shape (n_features, n_features).

    'covariance'
      The (pseudo-)inverse of the covariance matrix.

    'random'
      The initial Mahalanobis matrix will be a random SPD matrix of shape
      `(n_features, n_features)`, generated using
      `sklearn.datasets.make_spd_matrix`.

    numpy array
      An SPD matrix of shape (n_features, n_features), that will be used
      as such to initialize the metric.

  diagonal : bool, optional (default=False)
    If True, a diagonal metric will be learned, i.e., a simple scaling of
    dimensions. The initialization will then be the diagonal coefficients
    of the matrix given as 'init'.

  diagonal_c : float, optional (default=1.0)
    Weight of the dissimilarity constraint for diagonal metric learning.
    Ignored if ``diagonal=False``.

  verbose : bool, optional (default=False)
    If True, prints information while learning.

  preprocessor : array-like, shape=(n_samples, n_features) or callable
    The preprocessor to call to get tuples from indices. If array-like,
    tuples will be gotten like this: X[indices].

  random_state : int or numpy.RandomState or None, optional (default=None)
    A pseudo random number generator object or a seed for it if int. If
    ``init='random'``, ``random_state`` is used to initialize the random
    transformation.

  convergence_threshold : Renamed to tol. Deprecated since version 0.6.3
    and will be removed in 0.7.0.

  Attributes
  ----------
  n_iter_ : `int`
    The number of iterations the solver has run.

  components_ : `numpy.ndarray`, shape=(n_features, n_features)
    The linear transformation ``L`` deduced from the learned Mahalanobis
    metric (See function `components_from_metric`.)

  threshold_ : `float`
    If the distance metric between two points is lower than this threshold,
    points will be classified as similar, otherwise they will be classified
    as dissimilar.

  Examples
  --------
  >>> from metric_learn import MMC
  >>> pairs = [[[1.2, 7.5], [1.3, 1.5]],
  >>>          [[6.4, 2.6], [6.2, 9.7]],
  >>>          [[1.3, 4.5], [3.2, 4.6]],
  >>>          [[6.2, 5.5], [5.4, 5.4]]]
  >>> y = [1, 1, -1, -1]
  >>> # in this task we want points where the first feature is close to be
  >>> # closer to each other, no matter how close the second feature is
  >>> mmc = MMC()
  >>> mmc.fit(pairs, y)

  References
  ----------
  .. [1] Xing, Jordan, Russell, Ng. `Distance metric learning with
         application to clustering with side-information
         <http://papers.nips.cc/paper/2164-distance-metric-learning-with-\
application-to-clustering-with-side-information.pdf>`_. NIPS 2002.

  See Also
  --------
  metric_learn.MMC : The original weakly-supervised algorithm.

  :ref:`supervised_version` : The section of the project documentation
    that describes the supervised version of weakly supervised estimators.
  """

  def fit(self, pairs, y, calibration_params=None):
    """Learn the MMC model.

    The threshold will be calibrated on the trainset using the parameters
    `calibration_params`.

    Parameters
    ----------
    pairs : array-like, shape=(n_constraints, 2, n_features) or \
           (n_constraints, 2)
      3D Array of pairs with each row corresponding to two points,
      or 2D array of indices of pairs if the metric learner uses a
      preprocessor.

    y : array-like, of shape (n_constraints,)
      Labels of constraints. Should be -1 for dissimilar pair, 1 for
      similar.

    calibration_params : `dict` or `None`
      Dictionary of parameters to give to `calibrate_threshold` for the
      threshold calibration step done at the end of `fit`. If `None` is
      given, `calibrate_threshold` will use the default parameters.

    Returns
    -------
    self : object
      Returns the instance.
    """
    calibration_params = (calibration_params if calibration_params is not
                          None else dict())
    self._validate_calibration_params(**calibration_params)
    self._fit(pairs, y)
    self.calibrate_threshold(pairs, y, **calibration_params)
    return self
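
# A short usage sketch for threshold calibration (illustrative; assumes the
# keyword arguments of `calibrate_threshold` from the pairs-classifier API,
# e.g. a 'strategy' key):
#
#   mmc = MMC(preprocessor=X)
#   mmc.fit(pairs_indices, y_pairs,
#           calibration_params={'strategy': 'f_beta', 'beta': 1.})
#   predictions = mmc.predict(pairs_indices)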

class MMC_Supervised(_BaseMMC, TransformerMixin):
  """Supervised version of Mahalanobis Metric for Clustering (MMC)

  `MMC_Supervised` creates pairs of similar samples by taking same-class
  samples, and pairs of dissimilar samples by taking different-class
  samples. It then passes these pairs to `MMC` for training.

  Parameters
  ----------
  max_iter : int, optional (default=100)
    Maximum number of iterations of the optimization procedure.

  max_proj : int, optional (default=10000)
    Maximum number of projection steps.

  tol : float, optional (default=1e-6)
    Convergence threshold for the optimization procedure.

  n_constraints : int, optional (default=None)
    Number of constraints to generate. If None, default to
    `20 * num_classes**2`.

  init : string or numpy array, optional (default='identity')
    Initialization of the Mahalanobis matrix. Possible options are
    'identity', 'covariance', 'random', and a numpy array of shape
    (n_features, n_features).

    'identity'
      An identity matrix of shape (n_features, n_features).

    'covariance'
      The (pseudo-)inverse of the covariance matrix.

    'random'
      The initial Mahalanobis matrix will be a random SPD matrix of shape
      `(n_features, n_features)`, generated using
      `sklearn.datasets.make_spd_matrix`.

    numpy array
      A numpy array of shape (n_features, n_features), that will be used
      as such to initialize the metric.

  diagonal : bool, optional (default=False)
    If True, a diagonal metric will be learned, i.e., a simple scaling of
    dimensions. The initialization will then be the diagonal coefficients
    of the matrix given as 'init'.

  diagonal_c : float, optional (default=1.0)
    Weight of the dissimilarity constraint for diagonal metric learning.
    Ignored if ``diagonal=False``.

  verbose : bool, optional (default=False)
    If True, prints information while learning.

  preprocessor : array-like, shape=(n_samples, n_features) or callable
    The preprocessor to call to get tuples from indices. If array-like,
    tuples will be formed like this: X[indices].

  random_state : int or numpy.RandomState or None, optional (default=None)
    A pseudo random number generator object or a seed for it if int. If
    ``init='random'``, ``random_state`` is used to initialize the random
    Mahalanobis matrix. In any case, `random_state` is also used to
    randomly sample constraints from labels.

  num_constraints : Renamed to n_constraints. Deprecated since version
    0.6.3 and will be removed in 0.7.0.

  convergence_threshold : Renamed to tol. Deprecated since version 0.6.3
    and will be removed in 0.7.0.

  Examples
  --------
  >>> from metric_learn import MMC_Supervised
  >>> from sklearn.datasets import load_iris
  >>> iris_data = load_iris()
  >>> X = iris_data['data']
  >>> Y = iris_data['target']
  >>> mmc = MMC_Supervised(n_constraints=200)
  >>> mmc.fit(X, Y)

  Attributes
  ----------
  n_iter_ : `int`
    The number of iterations the solver has run.

  components_ : `numpy.ndarray`, shape=(n_features, n_features)
    The linear transformation ``L`` deduced from the learned Mahalanobis
    metric (See function `components_from_metric`.)
  """

  def __init__(self, max_iter=100, max_proj=10000, tol=1e-6,
               n_constraints=None, init='identity', diagonal=False,
               diagonal_c=1.0, verbose=False, preprocessor=None,
               random_state=None, num_constraints='deprecated',
               convergence_threshold='deprecated'):
    _BaseMMC.__init__(self, max_iter=max_iter, max_proj=max_proj,
                      tol=tol, init=init, diagonal=diagonal,
                      diagonal_c=diagonal_c, verbose=verbose,
                      preprocessor=preprocessor,
                      random_state=random_state,
                      convergence_threshold=convergence_threshold)
    if num_constraints != 'deprecated':
      warnings.warn('"num_constraints" parameter has been renamed to'
                    ' "n_constraints". It has been deprecated in'
                    ' version 0.6.3 and will be removed in 0.7.0',
                    FutureWarning)
      self.n_constraints = num_constraints
    else:
      self.n_constraints = n_constraints
    # Avoid the get_params test from failing (all params passed to
    # __init__ should be set on the instance):
    self.num_constraints = 'deprecated'

  def fit(self, X, y):
    """Create constraints from labels and learn the MMC model.

    Parameters
    ----------
    X : (n x d) matrix
      Input data, where each row corresponds to a single instance.

    y : (n) array-like
      Data labels.
    """
    X, y = self._prepare_inputs(X, y, ensure_min_samples=2)
    n_constraints = self.n_constraints
    if n_constraints is None:
      num_classes = len(np.unique(y))
      n_constraints = 20 * num_classes**2

    c = Constraints(y)
    pos_neg = c.positive_negative_pairs(n_constraints,
                                        random_state=self.random_state)
    pairs, y = wrap_pairs(X, pos_neg)
    return _BaseMMC._fit(self, pairs, y)
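

if __name__ == '__main__':
  # Minimal smoke test (illustrative sketch): learn a supervised MMC metric
  # on the iris dataset and embed the data with the learned transformation.
  from sklearn.datasets import load_iris

  iris_data = load_iris()
  X, Y = iris_data['data'], iris_data['target']
  mmc = MMC_Supervised(n_constraints=200, random_state=42)
  mmc.fit(X, Y)
  # `transform` applies the learned `components_` matrix to the data.
  X_embedded = mmc.transform(X)
  print('embedded shape:', X_embedded.shape)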