# Copyright Mathieu Blondel December 2011
# License: BSD 3 clause

import numpy as np
import matplotlib.pyplot as plt

from sklearn.base import BaseEstimator
from sklearn.utils import check_random_state
from sklearn.cluster import MiniBatchKMeans
from sklearn.cluster import KMeans as KMeansGood
from sklearn.metrics.pairwise import euclidean_distances, manhattan_distances
from sklearn.datasets import make_blobs
##############################################################################
# Generate sample data
np.random.seed(0)

batch_size = 45
centers = [[1, 1], [-1, -1], [1, -1]]
n_clusters = len(centers)
X, labels_true = make_blobs(n_samples=1200, centers=centers, cluster_std=0.3)
class KMeans(BaseEstimator):

    def __init__(self, k, max_iter=100, random_state=0, tol=1e-4):
        self.k = k
        self.max_iter = max_iter
        self.random_state = random_state
        self.tol = tol

    def _e_step(self, X):
        self.labels_ = euclidean_distances(X, self.cluster_centers_,
                                           squared=True).argmin(axis=1)

    def _average(self, X):
        return X.mean(axis=0)

    def _m_step(self, X):
        X_center = None
        for center_id in range(self.k):
            center_mask = self.labels_ == center_id
            if not np.any(center_mask):
                # The centroid of empty clusters is set to the center of
                # everything
                if X_center is None:
                    X_center = self._average(X)
                self.cluster_centers_[center_id] = X_center
            else:
                self.cluster_centers_[center_id] = \
                    self._average(X[center_mask])

    def fit(self, X, y=None):
        n_samples = X.shape[0]
        vdata = np.mean(np.var(X, 0))

        # Pick k samples at random as the initial centers.
        random_state = check_random_state(self.random_state)
        self.labels_ = random_state.permutation(n_samples)[:self.k]
        self.cluster_centers_ = X[self.labels_]

        for i in range(self.max_iter):
            centers_old = self.cluster_centers_.copy()

            self._e_step(X)
            self._m_step(X)

            if np.sum((centers_old - self.cluster_centers_) ** 2) < self.tol * vdata:
                break

        return self
class KMedians(KMeans):

    def _e_step(self, X):
        self.labels_ = manhattan_distances(X, self.cluster_centers_).argmin(axis=1)

    def _average(self, X):
        return np.median(X, axis=0)
class FuzzyKMeans(KMeans):

    def __init__(self, k, m=2, max_iter=100, random_state=0, tol=1e-4):
        """
        m > 1: fuzziness parameter
        The closer m is to 1, the closer the result is to hard k-means.
        The bigger m is, the fuzzier the memberships (in the limit, all
        centers converge to the global centroid).
        """
        self.k = k
        assert m > 1  # memberships are undefined for m <= 1
        self.m = m
        self.max_iter = max_iter
        self.random_state = random_state
        self.tol = tol

    def _e_step(self, X):
        # Memberships: u_ik is proportional to
        # (1 / ||x_i - c_k||^2) ** (1 / (m - 1)), each row normalized to one.
        D = 1.0 / euclidean_distances(X, self.cluster_centers_, squared=True)
        D **= 1.0 / (self.m - 1)
        D /= np.sum(D, axis=1)[:, np.newaxis]

        # shape: n_samples x k
        self.fuzzy_labels_ = D
        self.labels_ = self.fuzzy_labels_.argmax(axis=1)

    def _m_step(self, X):
        weights = self.fuzzy_labels_ ** self.m
        # shape: n_clusters x n_features
        self.cluster_centers_ = np.dot(X.T, weights).T
        self.cluster_centers_ /= weights.sum(axis=0)[:, np.newaxis]

    def fit(self, X, y=None):
        n_samples, n_features = X.shape
        vdata = np.mean(np.var(X, 0))

        # Start from random memberships, then derive the initial centers.
        random_state = check_random_state(self.random_state)
        self.fuzzy_labels_ = random_state.rand(n_samples, self.k)
        self.fuzzy_labels_ /= self.fuzzy_labels_.sum(axis=1)[:, np.newaxis]
        self._m_step(X)

        for i in range(self.max_iter):
            centers_old = self.cluster_centers_.copy()

            self._e_step(X)
            self._m_step(X)

            if np.sum((centers_old - self.cluster_centers_) ** 2) < self.tol * vdata:
                break

        return self
kmeans = KMeans(k=3)
kmeans.fit(X)

kmedians = KMedians(k=3)
kmedians.fit(X)

fuzzy_kmeans = FuzzyKMeans(k=3, m=2)
fuzzy_kmeans.fit(X)

fig = plt.figure()
colors = ['#4EACC5', '#FF9C34', '#4E9A06']
objects = (kmeans, kmedians, fuzzy_kmeans)

for i, obj in enumerate(objects):
    ax = fig.add_subplot(1, len(objects), i + 1)
    for k, col in zip(range(obj.k), colors):
        my_members = obj.labels_ == k
        cluster_center = obj.cluster_centers_[k]
        ax.plot(X[my_members, 0], X[my_members, 1], 'w',
                markerfacecolor=col, marker='.')
        ax.plot(cluster_center[0], cluster_center[1], 'o', markerfacecolor=col,
                markeredgecolor='k', markersize=6)
    ax.set_title(obj.__class__.__name__)

plt.show()
Shouldn't it be:
I could not plot the data using your code. Shouldn't it be
instead of
?
How about the same code for images? How do I do it?
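One way to apply the same code to an image is color quantization: treat each pixel as a point in RGB space and cluster those points. A minimal sketch under that reading, reusing the KMeans class above; the filename photo.png and k=8 are illustrative assumptions, not from the gist:

import matplotlib.pyplot as plt

img = plt.imread("photo.png")  # hypothetical file; shape (height, width, 3)
h, w, c = img.shape
pixels = img.reshape(-1, c)    # one row per pixel, one column per channel

km = KMeans(k=8)               # the KMeans class defined above
km.fit(pixels)

# Replace each pixel by its cluster centroid and restore the image shape.
quantized = km.cluster_centers_[km.labels_].reshape(h, w, c)
plt.imshow(quantized)
plt.show()

KMedians works the same way and is somewhat more robust to outlier pixels.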
So sklearn KMeans (KMeansGood here) initializes the k centroids n_init times and returns the result from the run where inertia_ ("sum of squared distances of samples to their closest cluster center") is smallest. It does not seem like KMedians() does this, correct? Would it be possible to implement this functionality? (I realize I could write the wrapper to do it.)
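Correct, the classes here run a single initialization. A best-of-n_init wrapper is only a few lines; this is a sketch rather than part of the gist, scoring each run by squared Euclidean inertia (for KMedians, summed Manhattan distances would match its objective more closely):

import numpy as np
from sklearn.metrics.pairwise import euclidean_distances

def fit_best_of(make_estimator, X, n_init=10):
    # Fit n_init independently seeded runs and keep the lowest-inertia one.
    best, best_inertia = None, np.inf
    for seed in range(n_init):
        est = make_estimator(seed).fit(X)
        d = euclidean_distances(X, est.cluster_centers_, squared=True)
        inertia = d[np.arange(X.shape[0]), est.labels_].sum()
        if inertia < best_inertia:
            best, best_inertia = est, inertia
    return best

kmedians = fit_best_of(lambda seed: KMedians(k=3, random_state=seed), X)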
FuzzyKMeans only supports .fit(), not .fit_predict()?
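As written, yes: only fit() is defined, and BaseEstimator does not add fit_predict for you. Since fit() stores hard labels in labels_, adding it is a two-liner; a sketch (the subclass name is made up):

class FuzzyKMeansWithPredict(FuzzyKMeans):

    def fit_predict(self, X, y=None):
        # scikit-learn convention: fit the model, then return hard labels.
        return self.fit(X).labels_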
Try pairwise_distances_argmin from sklearn.
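For context, pairwise_distances_argmin computes the same argmin as the _e_step above but works in chunks, so the full n_samples x k distance matrix is never materialized. A sketch of how it could be dropped in (the subclass is illustrative):

from sklearn.metrics import pairwise_distances_argmin

class ChunkedKMeans(KMeans):

    def _e_step(self, X):
        # Equivalent to argmin over euclidean_distances, but memory-friendly.
        self.labels_ = pairwise_distances_argmin(X, self.cluster_centers_)

The KMedians version would pass metric='manhattan'.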
It seems like as of scikit-learn v0.24.0, make_blobs() is in the datasets module, not datasets.samples_generator, so the make_blobs import needs to be updated.
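For anyone hitting this: sklearn.datasets.samples_generator was deprecated in scikit-learn 0.22 and is gone in 0.24, so on recent versions the fix is a one-line change (the code above already uses the new location):

# Old location (worked on scikit-learn < 0.22):
#   from sklearn.datasets.samples_generator import make_blobs
# Current location:
from sklearn.datasets import make_blobs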
What is the reference for this specific fuzzy k-means implementation?
Thanks,