Last active
February 25, 2021 16:13
-
-
Save jponf/ff1836ee91e798b2112e4b8f93550693 to your computer and use it in GitHub Desktop.
A GMeans implementation that I used in some projects back in 2017
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import numpy as np | |
import sklearn.utils | |
from scipy.stats import anderson, kstest, normaltest, shapiro | |
from sklearn.cluster import KMeans | |
# GMeans class | |
############################################################################### | |
class GMeans(object): | |
"""Implementation of the G-Means algorithm for learning k in a k-means | |
clustering. | |
Hamerly, Greg and Elkan, Charles. Learning the K in K-means. NIPS 2003 | |
:param int n_init: Number of time the G-means algorithm will be run with | |
different centroid seeds. The final results will be the best output | |
of `n_init` consecutive runs in terms of inertia. | |
:param int min_samples_to_split: Minimum number of samples in a cluster | |
to be considered for further partitioning. This value cannot be less | |
than 2. | |
:param int min_samples_per_cluster: Once a G-means clustering has been | |
computed, all clusters with less than `min_samples_per_cluster` will | |
be redistributed until all the remaining clusters have at least | |
`min_samples_per_cluster`. | |
:param int strictness: How strict should the Anderson-Darling test for | |
normality be (0 is least strict and 4 is most strict). | |
:param int kmeans_n_init: Number of time the K-means algorithm will be run | |
with different centroid seeds. The final results will be the best | |
output of n_init consecutive runs in terms of inertia. | |
:param int kmeans_max_iter: Maximum number of iterations, for K-means, over | |
the dataset to split before stopping independently of any early | |
stopping criterion heuristics. | |
:param int kmeans_init_n_clusters: The number of clusters to form on the | |
first K-means run. This parameter can make the algorithm converge | |
faster, but should be left unchanged unless you have reasons not to. | |
:param [int, RandomState, None] random_state: If int, random_state is the | |
seed used by the random number generator; If RandomState instance, | |
random_state is the random number generator; If None, the random number | |
generator is the RandomState instance used by np.random. | |
""" | |
def __init__(self, n_init=1, min_samples_to_split=2, | |
min_samples_per_cluster=1, strictness=4, | |
kmeans_n_init=10, kmeans_max_iter=300, | |
kmeans_init_n_clusters=1, random_state=None): | |
if n_init < 1: | |
raise ValueError("n_init must be > 0") | |
if min_samples_to_split < 2: | |
raise ValueError("min_samples_to_split must be >= 2") | |
if min_samples_per_cluster < 1: | |
raise ValueError("min_samples_per_cluster must be >= 1") | |
if not 0 <= strictness <= 4: | |
raise ValueError("strictness must be between [0, 4]") | |
self.n_init = n_init | |
self.min_samples_to_split = min_samples_to_split | |
self.min_samples_per_cluster = min_samples_per_cluster | |
self.strictness = strictness | |
self.kmeans_n_init = kmeans_n_init | |
self.kmeans_max_iter = kmeans_max_iter | |
self.kmeans_init_n_clusters = kmeans_init_n_clusters | |
self.random_state = sklearn.utils.check_random_state(random_state) | |
self._k_means = None | |
@property | |
def cluster_centers_(self): | |
"""Coordinates of cluster centers.""" | |
return self._k_means.cluster_centers_ if self._k_means else [] | |
@property | |
def labels_(self): | |
"""Labels of each point.""" | |
return self._k_means.labels_ if self._k_means else [] | |
@property | |
def inertia_(self): | |
"""Sum of squared distances of samples to their closest cluster | |
center.""" | |
return self._k_means.inertia_ if self._k_means else [] | |
@property | |
def k_means_(self): | |
"""The sklearn.clusters.KMeans object generated at the end of the | |
G-means algorithm. | |
Use this object to access sklearn specific functionality not exposed | |
by GMeans. | |
""" | |
return self._k_means | |
def fit(self, data): | |
"""Compute G-means clustering. | |
:param data: Training instances to cluster. | |
""" | |
data = sklearn.utils.check_array(data, dtype=np.float64) | |
best_inertia = np.inf | |
for _ in range(self.n_init): | |
k_means = _build_gmeans(self, data) | |
if k_means.inertia_ < best_inertia: | |
self._k_means = k_means | |
best_inertia = k_means.inertia_ | |
return self | |
def fit_predict(self, data): | |
"""Compute cluster centers and predict cluster index for each sample. | |
Convenience method; equivalent to calling fit(X) followed by predict(X). | |
:param data: Training instances to cluster. | |
:return: Index of the cluster each sample belongs to. | |
""" | |
return self.fit(data).labels_ | |
def predict(self, data): | |
"""Predict the closest cluster each sample in data belongs to. | |
:param data: New data to predict. | |
:return: Index of the cluster each sample belongs to. | |
""" | |
return self._k_means.predict(data) | |
# GMeans computation | |
############################################################################## | |
# Add support for mini-batch kmeans (better run-time performance) | |
def _build_gmeans(conf, data): | |
change, cluster_centers = True, [] | |
_2means = KMeans(2, max_iter=conf.kmeans_max_iter, | |
n_init=conf.kmeans_n_init, | |
random_state=conf.random_state) | |
kmeans = KMeans(n_clusters=conf.kmeans_init_n_clusters, | |
max_iter=conf.kmeans_max_iter, | |
n_init=conf.kmeans_n_init, | |
random_state=conf.random_state) | |
kmeans.fit(data) | |
while change: | |
change, cluster_centers = False, [] | |
for i, c_center in enumerate(kmeans.cluster_centers_): | |
c_data = data[kmeans.labels_ == i] | |
if len(c_data) < conf.min_samples_to_split or len(c_data) < 2: | |
cluster_centers.append(c_center) | |
else: | |
predictions = _2means.fit_predict(c_data) | |
bins = np.bincount(predictions) | |
# Don't split only one cluster has elements | |
if len(bins) <= 1: | |
cluster_centers.append(c_center) | |
# Test if split is gaussian (discard split) | |
elif _test_2means_is_gaussian(conf, _2means, c_data, | |
test="anderson"): | |
cluster_centers.append(c_center) | |
# Split is not gaussian (keep it) | |
else: | |
cluster_centers.extend(_2means.cluster_centers_) | |
change = True | |
if change: # refinement | |
kmeans = KMeans(n_init=1, n_clusters=len(cluster_centers), | |
max_iter=conf.kmeans_max_iter, | |
init=np.array(cluster_centers)) | |
kmeans.fit(data) | |
# Make sure all clusters meet the user defined number of elements | |
bins = np.bincount(kmeans.labels_) | |
min_size = np.min(bins) | |
while min_size < conf.min_samples_per_cluster: | |
cluster_centers = [c for c, b in zip(cluster_centers, bins) | |
if b > min_size] | |
kmeans = KMeans(n_init=1, n_clusters=len(cluster_centers), | |
max_iter=conf.kmeans_max_iter, | |
init=np.array(cluster_centers)) | |
kmeans.fit(data) | |
bins = np.bincount(kmeans.labels_) | |
min_size = np.min(bins) | |
return kmeans | |
ALPHA_STRICTNESS = {0: 0.15, 1: 0.1, 2: 0.05, 3: 0.025, 4: 0.01} | |
def _test_2means_is_gaussian(conf, split, data, test="anderson"): | |
centroid0, centroid1 = split.cluster_centers_ | |
vec = centroid0 - centroid1 | |
vec_norm = np.linalg.norm(vec, ord=2) | |
scalar_proj = np.inner(vec, data) / vec_norm | |
mean = np.mean(scalar_proj) | |
std = np.std(scalar_proj) | |
scalar_proj = (scalar_proj - mean) / std | |
alpha = ALPHA_STRICTNESS.get(conf.strictness, 0.01) | |
# fix test | |
if test == "agostino" and len(scalar_proj) < 20: | |
test = "anderson" | |
elif test == "shapiro" and len(scalar_proj) < 3: | |
test = "anderson" | |
if test == "anderson": | |
statistic_v, critical_v, _ = anderson(scalar_proj, dist="norm") | |
return statistic_v < critical_v[conf.strictness] | |
elif test == "kstest": | |
_, p_value = kstest(scalar_proj, "norm") | |
return p_value > alpha | |
elif test == "shapiro": | |
_, p_value = shapiro(scalar_proj) | |
return p_value > alpha | |
elif test == "agostino": | |
_, p_value = normaltest(scalar_proj) | |
return p_value > alpha | |
raise RuntimeError("Unknown normality test '{}'".format(test)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment