Source code for metric_learn.lmnn

"""
Large Margin Nearest Neighbor Metric learning (LMNN)
"""
import numpy as np
from collections import Counter
from sklearn.metrics import euclidean_distances
from sklearn.base import TransformerMixin
import warnings

from ._util import _initialize_components, _check_n_components
from .base_metric import MahalanobisMixin


class LMNN(MahalanobisMixin, TransformerMixin):
  """Large Margin Nearest Neighbor (LMNN)

  LMNN learns a Mahalanobis distance metric in the kNN classification
  setting. The learned metric attempts to keep close k-nearest neighbors
  from the same class, while keeping examples from different classes
  separated by a large margin. This algorithm makes no assumptions about
  the distribution of the data.

  Read more in the :ref:`User Guide <lmnn>`.

  Parameters
  ----------
  init : string or numpy array, optional (default='auto')
    Initialization of the linear transformation. Possible options are
    'auto', 'pca', 'identity', 'random', and a numpy array of shape
    (n_features_a, n_features_b).

    'auto'
      Depending on ``n_components``, the most reasonable initialization
      will be chosen. If ``n_components <= n_classes`` we use 'lda', as
      it uses labels information. If not, but
      ``n_components < min(n_features, n_samples)``, we use 'pca', as
      it projects data in meaningful directions (those of higher
      variance). Otherwise, we just use 'identity'.

    'pca'
      ``n_components`` principal components of the inputs passed
      to :meth:`fit` will be used to initialize the transformation.
      (See `sklearn.decomposition.PCA`)

    'lda'
      ``min(n_components, n_classes)`` most discriminative
      components of the inputs passed to :meth:`fit` will be used to
      initialize the transformation. (If ``n_components > n_classes``,
      the rest of the components will be zero.) (See
      `sklearn.discriminant_analysis.LinearDiscriminantAnalysis`)

    'identity'
      If ``n_components`` is strictly smaller than the
      dimensionality of the inputs passed to :meth:`fit`, the identity
      matrix will be truncated to the first ``n_components`` rows.

    'random'
      The initial transformation will be a random array of shape
      `(n_components, n_features)`. Each value is sampled from the
      standard normal distribution.

    numpy array
      n_features_b must match the dimensionality of the inputs passed to
      :meth:`fit` and n_features_a must be less than or equal to that.
      If ``n_components`` is not None, n_features_a must match it.

  n_neighbors : int, optional (default=3)
    Number of neighbors to consider, not including self-edges.

  min_iter : int, optional (default=50)
    Minimum number of iterations of the optimization procedure.

  max_iter : int, optional (default=1000)
    Maximum number of iterations of the optimization procedure.

  learn_rate : float, optional (default=1e-7)
    Learning rate of the optimization procedure.

  convergence_tol : float, optional (default=0.001)
    Tolerance of the optimization procedure. If the objective value varies
    less than `convergence_tol`, we consider the algorithm has converged
    and stop it.

  verbose : bool, optional (default=False)
    Whether to print the progress of the optimization procedure.

  regularization : float, optional (default=0.5)
    Relative weight between pull and push terms, with 0.5 meaning equal
    weight.

  preprocessor : array-like, shape=(n_samples, n_features) or callable
    The preprocessor to call to get tuples from indices. If array-like,
    tuples will be formed like this: X[indices].

  n_components : int or None, optional (default=None)
    Dimensionality of reduced space (if None, defaults to dimension of X).

  random_state : int or numpy.RandomState or None, optional (default=None)
    A pseudo random number generator object or a seed for it if int. If
    ``init='random'``, ``random_state`` is used to initialize the random
    transformation. If ``init='pca'``, ``random_state`` is passed as an
    argument to PCA when initializing the transformation.

  k : Renamed to n_neighbors. Will be deprecated in 0.7.0.

  Attributes
  ----------
  n_iter_ : `int`
    The number of iterations the solver has run.

  components_ : `numpy.ndarray`, shape=(n_components, n_features)
    The learned linear transformation ``L``.

  Examples
  --------

  >>> import numpy as np
  >>> from metric_learn import LMNN
  >>> from sklearn.datasets import load_iris
  >>> iris_data = load_iris()
  >>> X = iris_data['data']
  >>> Y = iris_data['target']
  >>> lmnn = LMNN(n_neighbors=5, learn_rate=1e-6)
  >>> lmnn.fit(X, Y)

  References
  ----------
  .. [1] K. Q. Weinberger, J. Blitzer, L. K. Saul. `Distance Metric
         Learning for Large Margin Nearest Neighbor Classification
         <http://papers.nips.cc/paper/2795-distance-metric-learning-for-large-margin-nearest-neighbor-classification>`_.
         NIPS 2005.
  """

  def __init__(self, init='auto', n_neighbors=3, min_iter=50, max_iter=1000,
               learn_rate=1e-7, regularization=0.5, convergence_tol=0.001,
               verbose=False, preprocessor=None,
               n_components=None, random_state=None, k='deprecated'):
    self.init = init
    if k != 'deprecated':
      warnings.warn('"k" parameter has been renamed to'
                    ' "n_neighbors". It has been deprecated in'
                    ' version 0.6.3 and will be removed in 0.7.0',
                    FutureWarning)
      n_neighbors = k
    self.k = 'deprecated'  # To avoid no_attribute error
    self.n_neighbors = n_neighbors
    self.min_iter = min_iter
    self.max_iter = max_iter
    self.learn_rate = learn_rate
    self.regularization = regularization
    self.convergence_tol = convergence_tol
    self.verbose = verbose
    self.n_components = n_components
    self.random_state = random_state
    super(LMNN, self).__init__(preprocessor)

  def fit(self, X, y):
    k = self.n_neighbors
    reg = self.regularization
    learn_rate = self.learn_rate

    X, y = self._prepare_inputs(X, y, dtype=float,
                                ensure_min_samples=2)
    num_pts, d = X.shape
    output_dim = _check_n_components(d, self.n_components)
    unique_labels, label_inds = np.unique(y, return_inverse=True)
    if len(label_inds) != num_pts:
      raise ValueError('Must have one label per point.')
    self.labels_ = np.arange(len(unique_labels))

    self.components_ = _initialize_components(output_dim, X, y, self.init,
                                               self.verbose,
                                               random_state=self.random_state)
    required_k = np.bincount(label_inds).min()
    if self.n_neighbors > required_k:
      raise ValueError('not enough class labels for specified k'
                       ' (smallest class has %d)' % required_k)

    target_neighbors = self._select_targets(X, label_inds)

    # sum outer products
    dfG = _sum_outer_products(X, target_neighbors.flatten(),
                              np.repeat(np.arange(X.shape[0]), k))

    # initialize L
    L = self.components_

    # first iteration: we compute variables (including objective and gradient)
    # at initialization point
    G, objective, total_active = self._loss_grad(X, L, dfG, k,
                                                 reg, target_neighbors,
                                                 label_inds)

    it = 1  # we already made one iteration

    if self.verbose:
      print("iter | objective | objective difference | active constraints",
            "| learning rate")

    # main loop
    for it in range(2, self.max_iter):
      # then at each iteration, we try to find a value of L that has better
      # objective than the previous L, following the gradient:
      while True:
        # the next point next_L to try out is found by a gradient step
        L_next = L - learn_rate * G
        # we compute the objective at next point
        # we copy variables that can be modified by _loss_grad, because if we
        # retry we don't want to modify them several times
        (G_next, objective_next, total_active_next) = (
            self._loss_grad(X, L_next, dfG, k, reg, target_neighbors,
                            label_inds))
        assert not np.isnan(objective)
        delta_obj = objective_next - objective
        if delta_obj > 0:
          # if we did not find a better objective, we retry with an L closer
          # to the starting point, by decreasing the learning rate (making the
          # gradient step smaller)
          learn_rate /= 2
        else:
          # otherwise, if we indeed found a better obj, we get out of the loop
          break
      # when the better L is found (and the related variables), we set the
      # old variables to these new ones before next iteration and we
      # slightly increase the learning rate
      L = L_next
      G, objective, total_active = G_next, objective_next, total_active_next
      learn_rate *= 1.01

      if self.verbose:
        print(it, objective, delta_obj, total_active, learn_rate)

      # check for convergence
      if it > self.min_iter and abs(delta_obj) < self.convergence_tol:
        if self.verbose:
          print("LMNN converged with objective", objective)
        break
    else:
      if self.verbose:
        print("LMNN didn't converge in %d steps." % self.max_iter)

    # store the last L
    self.components_ = L
    self.n_iter_ = it
    return self
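
  # ``_loss_grad`` below evaluates the LMNN cost and its gradient w.r.t. L:
  # ``dfG`` (summed outer products of differences to target neighbors) gives
  # the "pull" term, weighted by ``regularization``, while the active impostor
  # constraints found inside contribute the "push" term, weighted by
  # ``1 - regularization`` (see ``G = dfG * reg + df * (1 - reg)``).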
  def _loss_grad(self, X, L, dfG, k, reg, target_neighbors, label_inds):
    # Compute pairwise distances under current metric
    Lx = L.dot(X.T).T

    # we need to find the furthest neighbor:
    Ni = 1 + _inplace_paired_L2(Lx[target_neighbors], Lx[:, None, :])
    furthest_neighbors = np.take_along_axis(target_neighbors,
                                            Ni.argmax(axis=1)[:, None], 1)
    impostors = self._find_impostors(furthest_neighbors.ravel(), X,
                                     label_inds, L)

    g0 = _inplace_paired_L2(*Lx[impostors])

    # we reorder the target neighbors
    g1, g2 = Ni[impostors]
    # compute the gradient
    total_active = 0
    df = np.zeros((X.shape[1], X.shape[1]))
    for nn_idx in reversed(range(k)):  # note: reverse not useful here
      act1 = g0 < g1[:, nn_idx]
      act2 = g0 < g2[:, nn_idx]
      total_active += act1.sum() + act2.sum()

      targets = target_neighbors[:, nn_idx]
      PLUS, pweight = _count_edges(act1, act2, impostors, targets)
      df += _sum_outer_products(X, PLUS[:, 0], PLUS[:, 1], pweight)

      in_imp, out_imp = impostors
      df -= _sum_outer_products(X, in_imp[act1], out_imp[act1])
      df -= _sum_outer_products(X, in_imp[act2], out_imp[act2])

    # do the gradient update
    assert not np.isnan(df).any()
    G = dfG * reg + df * (1 - reg)
    G = L.dot(G)
    # compute the objective function
    objective = total_active * (1 - reg)
    objective += G.flatten().dot(L.flatten())
    return 2 * G, objective, total_active

  def _select_targets(self, X, label_inds):
    target_neighbors = np.empty((X.shape[0], self.n_neighbors), dtype=int)
    for label in self.labels_:
      inds, = np.nonzero(label_inds == label)
      dd = euclidean_distances(X[inds], squared=True)
      np.fill_diagonal(dd, np.inf)
      nn = np.argsort(dd)[..., :self.n_neighbors]
      target_neighbors[inds] = inds[nn]
    return target_neighbors

  def _find_impostors(self, furthest_neighbors, X, label_inds, L):
    Lx = X.dot(L.T)
    margin_radii = 1 + _inplace_paired_L2(Lx[furthest_neighbors], Lx)
    impostors = []
    for label in self.labels_[:-1]:
      in_inds, = np.nonzero(label_inds == label)
      out_inds, = np.nonzero(label_inds > label)
      dist = euclidean_distances(Lx[out_inds], Lx[in_inds], squared=True)
      i1, j1 = np.nonzero(dist < margin_radii[out_inds][:, None])
      i2, j2 = np.nonzero(dist < margin_radii[in_inds])
      i = np.hstack((i1, i2))
      j = np.hstack((j1, j2))
      if i.size > 0:
        # get unique (i,j) pairs using index trickery
        shape = (i.max() + 1, j.max() + 1)
        tmp = np.ravel_multi_index((i, j), shape)
        i, j = np.unravel_index(np.unique(tmp), shape)
      impostors.append(np.vstack((in_inds[j], out_inds[i])))
    if len(impostors) == 0:
      # No impostors detected
      return impostors
    return np.hstack(impostors)
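

# Module-level helpers used by LMNN.fit and LMNN._loss_grad:
# ``_inplace_paired_L2`` computes row-wise squared L2 distances (modifying its
# first argument in place), ``_count_edges`` tallies the active
# (point, target-neighbor) pairs together with their multiplicities, and
# ``_sum_outer_products`` accumulates the (optionally weighted) sum of outer
# products (x_a - x_b)(x_a - x_b)^T over the given index pairs.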
def _inplace_paired_L2(A, B):
  '''Equivalent to ((A-B)**2).sum(axis=-1), but modifies A in place.'''
  A -= B
  return np.einsum('...ij,...ij->...i', A, A)


def _count_edges(act1, act2, impostors, targets):
  imp = impostors[0, act1]
  c = Counter(zip(imp, targets[imp]))
  imp = impostors[1, act2]
  c.update(zip(imp, targets[imp]))
  if c:
    active_pairs = np.array(list(c.keys()))
  else:
    active_pairs = np.empty((0, 2), dtype=int)
  return active_pairs, np.array(list(c.values()))


def _sum_outer_products(data, a_inds, b_inds, weights=None):
  Xab = data[a_inds] - data[b_inds]
  if weights is not None:
    return np.dot(Xab.T, Xab * weights[:, None])
  return np.dot(Xab.T, Xab)
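

# Minimal usage sketch: fit LMNN on the iris dataset and apply the learned
# linear transformation. The hyperparameter values below are illustrative,
# not recommended settings.
if __name__ == '__main__':
  from sklearn.datasets import load_iris

  iris = load_iris()
  X, y = iris['data'], iris['target']

  lmnn = LMNN(n_neighbors=5, learn_rate=1e-6, verbose=False)
  lmnn.fit(X, y)

  # project the data into the learned space; nearest-neighbor queries in this
  # space should respect the large-margin constraints optimized above
  X_lmnn = lmnn.transform(X)
  print(X_lmnn.shape)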