A GMeans implementation that I used in some projects back in 2017
# -*- coding: utf-8 -*-

import numpy as np
import sklearn.utils
from scipy.stats import anderson, kstest, normaltest, shapiro
from sklearn.cluster import KMeans


# GMeans class
###############################################################################
class GMeans(object):
    """Implementation of the G-means algorithm for learning k in a k-means
    clustering.

    Hamerly, Greg and Elkan, Charles. Learning the k in k-means. NIPS 2003.

    :param int n_init: Number of times the G-means algorithm will be run with
        different centroid seeds. The final result will be the best output
        of `n_init` consecutive runs in terms of inertia.
    :param int min_samples_to_split: Minimum number of samples a cluster must
        have to be considered for further partitioning. This value cannot be
        less than 2.
    :param int min_samples_per_cluster: Once a G-means clustering has been
        computed, all clusters with fewer than `min_samples_per_cluster`
        samples will be redistributed until all the remaining clusters have
        at least `min_samples_per_cluster` samples.
    :param int strictness: How strict the Anderson-Darling test for normality
        should be (0 is least strict and 4 is most strict).
    :param int kmeans_n_init: Number of times the K-means algorithm will be
        run with different centroid seeds. The final result will be the best
        output of `kmeans_n_init` consecutive runs in terms of inertia.
    :param int kmeans_max_iter: Maximum number of K-means iterations over the
        dataset to split before stopping, independently of any early stopping
        criterion heuristics.
    :param int kmeans_init_n_clusters: The number of clusters to form on the
        first K-means run. This parameter can make the algorithm converge
        faster, but should be left unchanged unless you have reasons not to.
    :param [int, RandomState, None] random_state: If int, random_state is the
        seed used by the random number generator; if RandomState instance,
        random_state is the random number generator; if None, the random
        number generator is the RandomState instance used by np.random.
    """
    def __init__(self, n_init=1, min_samples_to_split=2,
                 min_samples_per_cluster=1, strictness=4,
                 kmeans_n_init=10, kmeans_max_iter=300,
                 kmeans_init_n_clusters=1, random_state=None):
        if n_init < 1:
            raise ValueError("n_init must be > 0")
        if min_samples_to_split < 2:
            raise ValueError("min_samples_to_split must be >= 2")
        if min_samples_per_cluster < 1:
            raise ValueError("min_samples_per_cluster must be >= 1")
        if not 0 <= strictness <= 4:
            raise ValueError("strictness must be in the range [0, 4]")

        self.n_init = n_init
        self.min_samples_to_split = min_samples_to_split
        self.min_samples_per_cluster = min_samples_per_cluster
        self.strictness = strictness
        self.kmeans_n_init = kmeans_n_init
        self.kmeans_max_iter = kmeans_max_iter
        self.kmeans_init_n_clusters = kmeans_init_n_clusters
        self.random_state = sklearn.utils.check_random_state(random_state)
        self._k_means = None
    @property
    def cluster_centers_(self):
        """Coordinates of cluster centers."""
        return self._k_means.cluster_centers_ if self._k_means else []

    @property
    def labels_(self):
        """Labels of each point."""
        return self._k_means.labels_ if self._k_means else []

    @property
    def inertia_(self):
        """Sum of squared distances of samples to their closest cluster
        center."""
        return self._k_means.inertia_ if self._k_means else []

    @property
    def k_means_(self):
        """The sklearn.cluster.KMeans object generated at the end of the
        G-means algorithm.

        Use this object to access sklearn-specific functionality not exposed
        by GMeans.
        """
        return self._k_means
    def fit(self, data):
        """Compute G-means clustering.

        :param data: Training instances to cluster.
        """
        data = sklearn.utils.check_array(data, dtype=np.float64)
        best_inertia = np.inf
        for _ in range(self.n_init):
            k_means = _build_gmeans(self, data)
            if k_means.inertia_ < best_inertia:
                self._k_means = k_means
                best_inertia = k_means.inertia_
        return self

    def fit_predict(self, data):
        """Compute cluster centers and predict cluster index for each sample.

        Convenience method; equivalent to calling fit(data) followed by
        predict(data).

        :param data: Training instances to cluster.
        :return: Index of the cluster each sample belongs to.
        """
        return self.fit(data).labels_

    def predict(self, data):
        """Predict the closest cluster each sample in data belongs to.

        :param data: New data to predict.
        :return: Index of the cluster each sample belongs to.
        """
        return self._k_means.predict(data)


# GMeans computation
##############################################################################

# TODO: Add support for mini-batch k-means (better run-time performance).
def _build_gmeans(conf, data):
    change, cluster_centers = True, []
    _2means = KMeans(n_clusters=2, max_iter=conf.kmeans_max_iter,
                     n_init=conf.kmeans_n_init,
                     random_state=conf.random_state)
    kmeans = KMeans(n_clusters=conf.kmeans_init_n_clusters,
                    max_iter=conf.kmeans_max_iter,
                    n_init=conf.kmeans_n_init,
                    random_state=conf.random_state)
    kmeans.fit(data)

    while change:
        change, cluster_centers = False, []
        for i, c_center in enumerate(kmeans.cluster_centers_):
            c_data = data[kmeans.labels_ == i]
            if len(c_data) < conf.min_samples_to_split or len(c_data) < 2:
                cluster_centers.append(c_center)
            else:
                predictions = _2means.fit_predict(c_data)
                bins = np.bincount(predictions)
                # Don't split if only one of the two clusters has elements
                if len(bins) <= 1:
                    cluster_centers.append(c_center)
                # The cluster looks Gaussian (discard the split)
                elif _test_2means_is_gaussian(conf, _2means, c_data,
                                              test="anderson"):
                    cluster_centers.append(c_center)
                # The cluster does not look Gaussian (keep the split)
                else:
                    cluster_centers.extend(_2means.cluster_centers_)
                    change = True
        if change:  # refinement
            kmeans = KMeans(n_init=1, n_clusters=len(cluster_centers),
                            max_iter=conf.kmeans_max_iter,
                            init=np.array(cluster_centers))
            kmeans.fit(data)

    # Make sure all clusters meet the user-defined minimum number of elements
    bins = np.bincount(kmeans.labels_, minlength=kmeans.n_clusters)
    min_size = np.min(bins)
    while min_size < conf.min_samples_per_cluster:
        # Drop the smallest cluster(s) and let K-means redistribute their
        # samples among the remaining centers
        cluster_centers = [c for c, b in zip(kmeans.cluster_centers_, bins)
                           if b > min_size]
        kmeans = KMeans(n_init=1, n_clusters=len(cluster_centers),
                        max_iter=conf.kmeans_max_iter,
                        init=np.array(cluster_centers))
        kmeans.fit(data)
        bins = np.bincount(kmeans.labels_, minlength=kmeans.n_clusters)
        min_size = np.min(bins)
    return kmeans


# Significance level for each strictness value; these match the significance
# levels of the critical values reported by scipy's Anderson-Darling test
ALPHA_STRICTNESS = {0: 0.15, 1: 0.1, 2: 0.05, 3: 0.025, 4: 0.01}


def _test_2means_is_gaussian(conf, split, data, test="anderson"):
    # Project the data onto the vector that connects the two centroids and
    # standardize the projections before testing them for normality
    centroid0, centroid1 = split.cluster_centers_
    vec = centroid0 - centroid1
    vec_norm = np.linalg.norm(vec, ord=2)
    scalar_proj = np.inner(vec, data) / vec_norm
    mean = np.mean(scalar_proj)
    std = np.std(scalar_proj)
    scalar_proj = (scalar_proj - mean) / std

    alpha = ALPHA_STRICTNESS.get(conf.strictness, 0.01)

    # Fall back to the Anderson-Darling test when there are too few samples
    # for the requested test
    if test == "agostino" and len(scalar_proj) < 20:
        test = "anderson"
    elif test == "shapiro" and len(scalar_proj) < 3:
        test = "anderson"

    if test == "anderson":
        statistic_v, critical_v, _ = anderson(scalar_proj, dist="norm")
        return statistic_v < critical_v[conf.strictness]
    elif test == "kstest":
        _, p_value = kstest(scalar_proj, "norm")
        return p_value > alpha
    elif test == "shapiro":
        _, p_value = shapiro(scalar_proj)
        return p_value > alpha
    elif test == "agostino":
        _, p_value = normaltest(scalar_proj)
        return p_value > alpha
    raise RuntimeError("Unknown normality test '{}'".format(test))
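

# Example usage: a minimal sketch on synthetic data. The blob count, sample
# size and seeds below are arbitrary illustrative choices, not values from
# the original projects.
if __name__ == "__main__":
    from sklearn.datasets import make_blobs

    # Three well-separated Gaussian blobs; G-means should recover k = 3
    # without being told the number of clusters in advance
    data, _ = make_blobs(n_samples=600, centers=3, cluster_std=1.0,
                         random_state=42)

    gmeans = GMeans(min_samples_per_cluster=5, random_state=42)
    labels = gmeans.fit_predict(data)

    print("clusters found:", len(gmeans.cluster_centers_))
    print("inertia:", gmeans.inertia_)
    print("first 10 labels:", labels[:10])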