Skip to content

Instantly share code, notes, and snippets.

@martinkersner
Created July 23, 2017 12:34
Show Gist options
  • Save martinkersner/6415e7306925bd0918cfba42e9754116 to your computer and use it in GitHub Desktop.
Save martinkersner/6415e7306925bd0918cfba42e9754116 to your computer and use it in GitHub Desktop.
k-Means clustering
#!/usr/bin/env python
# Martin Kersner, m.kersner@gmail.com
# 2017/07/23
from __future__ import division
import sys
import numpy as np
import scipy
# Modified version of https://stackoverflow.com/a/5468119
def kmeanspp(X, K):
C = X[0:1, :] # the first cluster equals to the first data point
for k in range(1, K):
D2 = scipy.array([min([scipy.inner(C[c_idx, :]-x,C[c_idx, :]-x) for c_idx in range(C.shape[0])]) for x in X])
probs = D2/D2.sum()
cumprobs = probs.cumsum()
r = scipy.rand()
for j,p in enumerate(cumprobs):
if r < p:
i = j
break
C = np.concatenate((C, [X[i, :]]), axis=0)
return C
class KMeans:
def __init__(self, n_clusters=8, init="k-means++", n_init=10, max_iter=300, tol=0.0001):
if n_clusters < 2:
raise ValueError("n_clusters < 2")
self.n_clusters = n_clusters
self.init = init
self.n_init = n_init
self.max_iter = max_iter
self.tol = tol
self.cluster_centers_ = np.array([])
self.labels_ = []
# sum of distances of samples to their closest cluster center
self.inertia_ = sys.maxint
def l2norm(self, X, axis):
return np.sqrt(np.sum(np.power(X, 2), axis=axis))
def dist(self, A, B, axis=None, disttype="euclidean"):
if disttype == "euclidean":
return self.l2norm(A-B, axis=axis)
elif disttype == "manhattan":
return np.sum(np.abs(A-B))
elif disttype == "cosine":
return np.sum(A*B)/(self.l2norm(A)*self.l2norm(B))
else:
raise ValueError("Unknown similarity distance type.")
def fit(self, X):
n_data = X.shape[0]
dim_data = X.shape[1]
dist_tmp = np.zeros((n_data, self.n_clusters))
for rit in range(self.n_init): # replication
# initialize first k centroids
cluster_centers_tmp = kmeanspp(X, self.n_clusters)
cluster_centers_prev = np.zeros([self.n_clusters, dim_data])
for nit in range(self.max_iter):
# assignment step
for xi in range(n_data):
for ci in range(self.n_clusters):
dist_tmp[xi, ci] = self.dist(X[xi, :], cluster_centers_tmp[ci, :])
labels_tmp = np.argmin(dist_tmp, axis=1)
# update step
cluster_centers_tmp = np.zeros([self.n_clusters, dim_data])
inertia_tmp = 0
for ci in range(self.n_clusters):
cluster_centers_tmp[ci, :] = np.mean(X[labels_tmp == ci, :], axis=0)
inertia_tmp += np.sum(dist_tmp[labels_tmp == ci, ci])
# termination condition
if self.dist(cluster_centers_prev, cluster_centers_tmp) < self.tol:
break
cluster_centers_prev = cluster_centers_tmp
if inertia_tmp < self.inertia_:
self.cluster_centers_ = cluster_centers_tmp
self.labels_ = labels_tmp
self.inertia_ = inertia_tmp
return self
def predict(self, X):
return [np.argmin(self.dist(self.cluster_centers_, X[xi, :], axis=1)) for xi in range(X.shape[0])]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment