leetcode-ml
import numpy as np


def euclidean_distance(x1, x2):
    return np.sqrt(np.sum((x1 - x2) ** 2))
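
# Quick sanity check (an illustrative addition, not part of the original gist):
# the distance between (0, 0) and (3, 4) should be 5.
assert euclidean_distance(np.array([0, 0]), np.array([3, 4])) == 5.0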


class Kmeans:
    def __init__(self, K=3, max_iters=100):
        """An iterative, unsupervised clustering algorithm.

        We partition a dataset into K non-overlapping clusters such that the
        total within-cluster variance, summed over all K clusters, is as small
        as possible. In the end, each sample is assigned to the cluster with
        the nearest centroid.
        """
        self.K = K
        self.max_iters = max_iters
        # initialize clusters and centroids
        self.clusters = [[] for _ in range(self.K)]  # list of sample indices for each cluster
        self.centroids = []  # list of mean feature vectors, one per cluster
    def fit(self, X):
        """Iteratively optimize the clusters.

        After initializing the centroids, each iteration has three steps:
        step 1: update clusters by assigning each sample to its nearest centroid
        step 2: update centroids as the mean of each current cluster
        step 3: check for convergence
        """
        self.X = X
        self.n_samples, self.n_features = X.shape
        # initialization: randomly pick K samples from the dataset as centroids
        random_sample_idxs = np.random.choice(self.n_samples, self.K, replace=False)
        self.centroids = [self.X[idx] for idx in random_sample_idxs]
        # iteratively optimize the clusters
        for _ in range(self.max_iters):
            # step 1: update clusters
            self.clusters = self._get_clusters(self.centroids)
            # step 2: update centroids
            old_centroids = self.centroids
            self.centroids = self._get_centroids(self.clusters)
            # step 3: check for convergence
            if self._is_converged(old_centroids, self.centroids):
                break
    def predict(self, X):
        # note: returns labels for the data seen in fit; X keeps a sklearn-like signature
        return self._get_cluster_labels(self.clusters)
    def _get_clusters(self, centroids):
        """Given the current centroids, assign each sample to its closest centroid."""
        clusters = [[] for _ in range(self.K)]
        for idx, sample in enumerate(self.X):
            centroid_idx = self._get_closest_centroid(sample, centroids)  # in range 0 to K-1
            clusters[centroid_idx].append(idx)
        return clusters
    def _get_centroids(self, clusters):
        """Return the mean feature vector of each cluster."""
        centroids = []
        for ls_sample_idx in clusters:
            cluster_mean = np.mean(self.X[ls_sample_idx], axis=0)
            centroids.append(cluster_mean)
        return centroids
    def _is_converged(self, old_centroids, cur_centroids):
        """Check convergence by summing, over all clusters, the distance between
        each new centroid and its old centroid. If the sum is zero, no centroid
        moved, so the algorithm has converged. This could be refined, e.g. by
        stopping early once the total movement falls below a small tolerance.
        """
        distances = [euclidean_distance(old_centroids[i], cur_centroids[i]) for i in range(self.K)]
        return sum(distances) == 0
    def _get_closest_centroid(self, sample, centroids):
        """Return the index of the centroid closest to the given sample,
        using Euclidean distance as the distance measure.
        """
        distances = [euclidean_distance(sample, centroid) for centroid in centroids]
        closest_idx = np.argmin(distances)
        return closest_idx
    def _get_cluster_labels(self, clusters):
        """Give each sample the label of the cluster it was assigned to.
        For example, with K=3, labels range over 0, 1, 2.
        """
        labels = np.empty(self.n_samples, dtype=int)
        for cluster_idx, ls_sample_idx in enumerate(clusters):
            for sample_idx in ls_sample_idx:
                labels[sample_idx] = cluster_idx
        return labels
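

# A minimal usage sketch (an assumption, not part of the original gist): three
# synthetic 2-D Gaussian blobs, fit with K=3, then read back the labels.
if __name__ == "__main__":
    np.random.seed(42)
    X = np.vstack([
        np.random.randn(50, 2) + np.array([0.0, 0.0]),
        np.random.randn(50, 2) + np.array([5.0, 5.0]),
        np.random.randn(50, 2) + np.array([0.0, 5.0]),
    ])
    km = Kmeans(K=3, max_iters=100)
    km.fit(X)
    labels = km.predict(X)
    print("centroids:", km.centroids)
    print("first 10 labels:", labels[:10])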