Created
January 18, 2021 07:14
-
-
Save liyunrui/a6c6a564eb22fa080ffa286f86c22d60 to your computer and use it in GitHub Desktop.
leetcode-ml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def eucliean_distance(x1, x2):
    """Return the Euclidean (L2) distance between vectors x1 and x2."""
    # norm of the difference == sqrt of the summed squared differences
    return np.linalg.norm(np.subtract(x1, x2))
class Kmeans:
    def __init__(self, K=3, max_iters=100):
        """K-means clustering (Lloyd's algorithm).

        An iterative unsupervised algorithm: partition a dataset into K
        non-overlapping clusters so that the total within-cluster variance
        (sum of squared distances to each cluster's centroid) is as small
        as possible. In the end, each sample is assigned to the cluster
        with the nearest centroid.

        Parameters
        ----------
        K : int
            Number of clusters.
        max_iters : int
            Maximum number of assign/update iterations before stopping
            even if the centroids have not converged.
        """
        self.K = K
        self.max_iters = max_iters
        # clusters[i]: list of sample indices currently assigned to cluster i
        self.clusters = [[] for _ in range(self.K)]
        # centroids[i]: mean feature vector of cluster i
        self.centroids = []

    def fit(self, X):
        """Fit the model on X, an array of shape (n_samples, n_features).

        step 0: randomly pick K distinct samples as the initial centroids
        Then, up to max_iters times:
        step 1: assign every sample to its closest centroid
        step 2: recompute each centroid as the mean of its cluster
        step 3: stop early once no centroid has moved
        """
        self.X = X
        self.n_samples, self.n_features = X.shape
        # step 0: initialise centroids with K distinct random samples
        random_sample_idxs = np.random.choice(self.n_samples, self.K, replace=False)
        self.centroids = [self.X[idx] for idx in random_sample_idxs]
        for _ in range(self.max_iters):
            # step 1: update clusters from the current centroids
            self.clusters = self._get_clusters(self.centroids)
            # step 2: update centroids from the new clusters
            old_centroids = self.centroids
            self.centroids = self._get_centroids(self.clusters)
            # step 3: check for convergence
            if self._is_converged(old_centroids, self.centroids):
                break

    def predict(self, X):
        """Return the cluster label (0..K-1) for each sample in X.

        Bug fix: the original ignored X entirely and always returned the
        training-data labels. Now every sample of X is assigned to its
        nearest fitted centroid. For X equal to the training data the
        result is unchanged, because at convergence the stored cluster
        assignment IS the nearest-centroid assignment.
        """
        X = np.asarray(X)
        labels = np.empty(X.shape[0])
        for i, sample in enumerate(X):
            labels[i] = self._get_closest_centroid(sample, self.centroids)
        return labels

    def _get_clusters(self, centroids):
        """Assign each sample in self.X to the closest of `centroids`.

        Returns a list of K lists of sample indices.
        """
        clusters = [[] for _ in range(self.K)]
        for idx, sample in enumerate(self.X):
            centroid_idx = self._get_closest_centroid(sample, centroids)  # 0..K-1
            clusters[centroid_idx].append(idx)
        return clusters

    def _get_centroids(self, clusters):
        """Return the mean feature vector of each cluster.

        Robustness fix: if a cluster has lost all of its samples, keep its
        previous centroid instead of taking the mean of an empty slice
        (which would yield NaN and poison every later iteration).
        """
        centroids = []
        for cluster_idx, sample_idxs in enumerate(clusters):
            if len(sample_idxs) == 0:
                centroids.append(self.centroids[cluster_idx])
            else:
                centroids.append(np.mean(self.X[sample_idxs], axis=0))
        return centroids

    def _is_converged(self, old_centroids, cur_centroids):
        """Return True when no centroid moved between two iterations.

        We sum the distances each centroid travelled; a total of exactly
        zero means every centroid is fixed, i.e. the assignment cannot
        change any further. This could be relaxed with a tolerance for
        earlier stopping.
        """
        distances = [
            np.linalg.norm(old_centroids[i] - cur_centroids[i]) for i in range(self.K)
        ]
        return sum(distances) == 0

    def _get_closest_centroid(self, sample, centroids):
        """Return the index of the centroid closest to `sample`.

        Within-cluster variance is measured with Euclidean distance.
        """
        distances = [np.linalg.norm(sample - centroid) for centroid in centroids]
        return np.argmin(distances)

    def _get_cluster_lables(self, clusters):
        """Map a cluster-index structure to a per-sample label array.

        For example, with K=3 labels range over 0, 1, 2. Kept (including
        its original name) for backward compatibility with existing callers.
        """
        labels = np.empty(self.n_samples)
        for cluster_idx, sample_idxs in enumerate(clusters):
            for sample_index in sample_idxs:
                labels[sample_index] = cluster_idx
        return labels
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment