Skip to content

Instantly share code, notes, and snippets.

@tam17aki
Forked from mblondel/kernel_kmeans.py
Last active April 7, 2023 07:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tam17aki/7877c030e7f23e3bad480a8f226a79dc to your computer and use it in GitHub Desktop.
Save tam17aki/7877c030e7f23e3bad480a8f226a79dc to your computer and use it in GitHub Desktop.
Kernel K-means.
# -*- coding: utf-8 -*-
"""Clustering with Kernel K-means."""
# Author: Mathieu Blondel <mathieu@mblondel.org>
# Author: Akira Tamamori
# License: BSD 3 clause
import matplotlib.pyplot as plt
import numpy as np
from sklearn.base import BaseEstimator, ClusterMixin
from sklearn.datasets import make_circles
from sklearn.metrics.pairwise import pairwise_kernels
from sklearn.utils import check_random_state
class KernelKMeans(BaseEstimator, ClusterMixin):
"""
(Weighted) Kernel K-means.
Reference
---------
Kernel k-means, Spectral Clustering and Normalized Cuts.
Inderjit S. Dhillon, Yuqiang Guan, Brian Kulis.
KDD 2004.
A Unified View of Kernel K-means, Spectral Clustering and Graph Cuts.
Inderjit S. Dhillon.
Computer Science Department, University of Texas at Austin, 2004.
Parameters
----------
n_clusters : int, optional (default=3)
Number of clusters.
kernel : string {'linear', 'poly', 'rbf', 'sigmoid',
'cosine', 'precomputed'}, optional (default='rbf')
Kernel used for PCA.
gamma : {'scale', 'auto'} or float, default='scale'
Kernel coefficient for 'rbf', 'poly' and 'sigmoid'.
- if ``gamma='scale'`` (default) is passed then it uses
1 / (n_features * X.var()) as value of gamma,
- if 'auto', uses 1 / n_features
- if float, must be non-negative
- if None, set to 1 / n_features. - if None
degree : int, optional (default=3)
Degree for poly kernels. Ignored by other kernels.
coef0 : float, optional (default=1)
Independent term in poly and sigmoid kernels.
Ignored by other kernels.
tol : float, optional (default=0)
Convergence tolerance for arpack.
If 0, optimal value will be chosen by arpack.
max_iter : int, optional (default=None)
Maximum number of iterations for arpack.
If None, optimal value will be chosen by arpack.
kernel_params : dict, optional (default=None)
Parameters (keyword arguments) and
values for kernel passed as callable object.
Ignored by other kernels.
random_state : int, RandomState instance or None, optional (default=None)
If int, random_state is the seed used by the random number generator;
If RandomState instance, random_state is the random number generator;
If None, the random number generator is the RandomState instance
used by np.random.
"""
def __init__(
self,
n_clusters=3,
kernel="rbf",
gamma=None,
degree=3,
coef0=1,
tol=1e-3,
max_iter=50,
kernel_params=None,
random_state=None,
verbose=0,
):
self.n_clusters = n_clusters
self.max_iter = max_iter
self.tol = tol
self.random_state = random_state
self.kernel = kernel
self.gamma = gamma
self.degree = degree
self.coef0 = coef0
self.kernel_params = kernel_params
self.verbose = verbose
if isinstance(self.gamma, str): # "linear", "rbf", "poly"
if self.gamma == "scale":
X_var = X.var()
self.gamma = 1.0 / (X.shape[1] * X_var) if X_var != 0 else 1.0
elif self.gamma == "auto":
self.gamma = 1.0 / X.shape[1]
else:
raise ValueError(
"When 'gamma' is a string, it should be either 'scale' or "
f"'auto'. Got '{self.gamma}' instead."
)
@property
def _pairwise(self):
return self.kernel == "precomputed"
def _get_kernel(self, X, Y=None):
if callable(self.kernel):
params = self.kernel_params or {}
else:
params = {"gamma": self.gamma, "degree": self.degree, "coef0": self.coef0}
return pairwise_kernels(X, Y, metric=self.kernel, filter_params=True, **params)
def fit(self, X, y=None, sample_weight=None):
n_samples = X.shape[0]
K = self._get_kernel(X)
sw = sample_weight if sample_weight else np.ones(n_samples)
self.sample_weight_ = sw
rs = check_random_state(self.random_state)
self.labels_ = rs.randint(self.n_clusters, size=n_samples)
dist = np.zeros((n_samples, self.n_clusters))
self.within_distances_ = np.zeros(self.n_clusters)
for it in range(self.max_iter):
dist.fill(0)
self._compute_dist(K, dist, self.within_distances_, update_within=True)
labels_old = self.labels_
self.labels_ = dist.argmin(axis=1)
# Compute the number of samples whose cluster did not change
# since last iteration.
n_same = np.sum((self.labels_ - labels_old) == 0)
if 1 - float(n_same) / n_samples < self.tol:
if self.verbose:
print("Converged at iteration", it + 1)
break
self.X_fit_ = X
return self
def _compute_dist(self, K, dist, within_distances, update_within):
"""Compute a n_samples x n_clusters distance matrix using the
kernel trick."""
sw = self.sample_weight_
for j in range(self.n_clusters):
mask = self.labels_ == j
if np.sum(mask) == 0:
raise ValueError("Empty cluster found, try smaller n_cluster.")
denom = sw[mask].sum()
denomsq = denom * denom
if update_within:
KK = K[np.ix_(mask, mask)]
dist_j = np.sum(np.outer(sw[mask], sw[mask]) * KK / denomsq)
within_distances[j] = dist_j
dist[:, j] += dist_j
else:
dist[:, j] += within_distances[j]
dist[:, j] -= 2 * np.sum(sw[mask] * K[:, mask], axis=1) / denom
dist[:, :] += np.tile(np.expand_dims(np.diag(K), 1), (1, self.n_clusters))
def predict(self, X):
K = self._get_kernel(X, self.X_fit_)
n_samples = X.shape[0]
dist = np.zeros((n_samples, self.n_clusters))
self._compute_dist(K, dist, self.within_distances_, update_within=False)
return dist.argmin(axis=1)
if __name__ == "__main__":
n_clusters = 2
n_samples = 500
max_iter = 100
random_state = 0
X, y = make_circles(
n_samples=n_samples, factor=0.1, noise=0.05, random_state=random_state
)
for i in range(n_clusters):
ind = y == i
plt.scatter(X[ind, 0], X[ind, 1])
plt.show()
km = KernelKMeans(
n_clusters=n_clusters,
max_iter=max_iter,
gamma="scale",
random_state=random_state,
verbose=1,
)
pred = km.fit_predict(X)
for i in range(n_clusters):
ind = pred == i
plt.scatter(X[ind, 0], X[ind, 1])
plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment