Skip to content

Instantly share code, notes, and snippets.

@tam17aki
Last active August 22, 2023 11:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tam17aki/f5c1d7dbfa396f6192ebdcd25ed50c91 to your computer and use it in GitHub Desktop.
Save tam17aki/f5c1d7dbfa396f6192ebdcd25ed50c91 to your computer and use it in GitHub Desktop.
A demo script of "Differential Entropic Clustering of Multivariate Gaussians" in Python.
"""A demo script of "Differential Entropic Clustering of Multivariate Gaussians".
Copyright (C) 2023 by Akira TAMAMORI
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import argparse
import numpy as np
from numpy import linalg as LA
from progressbar import progressbar as prg
from scipy.spatial import distance
from sklearn.metrics import accuracy_score, homogeneity_score
from sklearn.metrics.cluster import (adjusted_mutual_info_score,
adjusted_rand_score, contingency_matrix,
normalized_mutual_info_score, rand_score)
def check_positive_float(value):
"""Check positivity of float value.
https://stackoverflow.com/questions/14117415
https://stackoverflow.com/questions/64980270
"""
try:
value = float(value) # string to float
if value <= 0.0:
raise argparse.ArgumentTypeError(f"{value} is not a positive float.")
except ValueError as exc:
raise Exception(f"{value} is not a float.") from exc
return value
parser = argparse.ArgumentParser()
parser.add_argument("--data_dim", type=int, help="dimensionality of data.", default=4)
parser.add_argument(
"--n_trial",
type=int,
help="number of traials when changing random seed",
default=50,
)
parser.add_argument("--n_epoch", type=int, default=50)
parser.add_argument("--n_clusters", type=int, help="number of clusters.", default=4)
parser.add_argument(
"--n_train", type=int, help="number of distributions for training.", default=200
)
parser.add_argument(
"--n_test", type=int, help="number of distributions for test.", default=100
)
parser.add_argument(
"--n_points",
type=int,
help="number of points per single empirical distribution.",
default=30,
)
parser.add_argument(
"--simplex_scale", type=float, help="scale of simplex.", default=1.0
)
parser.add_argument(
"--cov_floor",
type=check_positive_float,
help="positive floor of sample covariance matrices.",
default=1.0,
)
def uniform_simplex(n_vertex, seed):
"""Sample uniformly random vector in the unit n-simplex."""
rng = np.random.default_rng(seed)
k = rng.exponential(scale=1.0, size=n_vertex)
return rng.permutation(k / sum(k))
def purity_score(y_true, y_pred):
"""Compute purity score."""
contingency_mat = contingency_matrix(y_true, y_pred)
return np.sum(np.amax(contingency_mat, axis=0)) / np.sum(contingency_mat)
def init_stats():
"""Initialize stats dictionary."""
stats = {
"ri": [], # Rand Index
"ari": [], # Adjusted Rand Index
"ami": [], # Adjusted Mutual Information
"nmi": [], # Normalized Mutual Information
"purity": [], # Purity Score
"homo": [], # Homogeneity Score
"acc": [], # Accuracy (classification)
}
return stats
def append_stats(stats, new_stats):
"""Append new stats to dictionary."""
stats["ri"].append(new_stats["ri"])
stats["ari"].append(new_stats["ari"])
stats["ami"].append(new_stats["ami"])
stats["nmi"].append(new_stats["nmi"])
stats["purity"].append(new_stats["purity"])
stats["homo"].append(new_stats["homo"])
stats["acc"].append(new_stats["acc"])
def print_stats(stats):
"""Print stats."""
means = {
"ri": None, # Rand Index
"ari": None, # Adjusted Rand Index
"ami": None, # Adjusted Mutual Information
"nmi": None, # Normalized Mutual Information
"purity": None, # Purity Score
"homo": None, # Homogeneity Score
"acc": None, # Accuracy (classification)
}
stds = {
"ri": None, # Rand Index
"ari": None, # Adjusted Rand Index
"ami": None, # Adjusted Mutual Information
"nmi": None, # Normalized Mutual Information
"purity": None, # Purity Score
"homo": None, # Homogeneity Score
"acc": None, # Accuracy (classification)
}
means["ri"] = np.mean(np.array(stats["ri"]))
means["ari"] = np.mean(np.array(stats["ari"]))
means["ami"] = np.mean(np.array(stats["ami"]))
means["nmi"] = np.mean(np.array(stats["nmi"]))
means["purity"] = np.mean(np.array(stats["purity"]))
means["homo"] = np.mean(np.array(stats["homo"]))
means["acc"] = np.mean(np.array(stats["acc"]))
stds["ri"] = np.std(np.array(stats["ri"]))
stds["ari"] = np.std(np.array(stats["ari"]))
stds["ami"] = np.std(np.array(stats["ami"]))
stds["nmi"] = np.std(np.array(stats["nmi"]))
stds["purity"] = np.std(np.array(stats["purity"]))
stds["homo"] = np.std(np.array(stats["homo"]))
stds["acc"] = np.std(np.array(stats["acc"]))
print(
"\nClustering:\n"
f"Rand Index = {means['ri']:.06f} ± {stds['ri']:.06f}\n"
f"Adjusted Rand Index = {means['ari']:.06f} ± {stds['ari']:.06f}\n"
f"Adjusted Mutual Information = {means['ami']:.06f} ± {stds['ami']:.06f}\n"
f"Normalized Mutual Information = {means['nmi']:.06f} ± {stds['nmi']:.06f}\n"
f"Purity Score = {means['purity']:.06f} ± {stds['purity']:.06f}\n"
f"Homogenity Score = {means['homo']:.06f} ± {stds['homo']:.06f}\n"
"\nClassification:\n"
f"Accuracy = {means['acc']:.06f} ± {stds['acc']:.06f}\n"
)
def comp_burg_div(mat_x, mat_y):
"""Compute Burg matrix divergence."""
dim = mat_x.shape[0]
mat_y_inv = LA.inv(mat_y)
mat = mat_x @ mat_y_inv
burg_div = np.trace(mat) - np.log(LA.det(mat)) - dim
return burg_div
def comp_maha_dist(vec_x, vec_y, cov):
"""Compute Mahalanobis distance."""
return distance.mahalanobis(vec_x, vec_y, LA.inv(cov))
class EntropicClustering:
"""Differential entropic clustering.
Jason V. Davis and Inderjit Dhillon, "Differential entropic
clustering of multivariate Gaussians," In Advances in Neural
Information Processing Systems (NIPS), 2006.
"""
def __init__(self, cfg, seed):
"""Initialize class."""
self.n_clusters = cfg.n_clusters
self.n_samples = cfg.n_train
self.n_epoch = cfg.n_epoch
rng = np.random.default_rng(seed)
self.assign = rng.choice(self.n_clusters, self.n_samples) # cluster assignments
self.stats = {"mean": None, "cov": None}
self.cluster_means = [None] * self.n_clusters
self.cluster_covs = [None] * self.n_clusters
def _estep(self):
"""Assign each Gaussian to the closest cluster representative Gaussian."""
for i in range(self.n_samples):
assign = []
for j in range(self.n_clusters):
burg_div = comp_burg_div(self.stats["cov"][i], self.cluster_covs[j])
maha_dist = comp_maha_dist(
self.stats["mean"][i], self.cluster_means[j], self.cluster_covs[j]
)
assign.append(burg_div + maha_dist)
self.assign[i] = np.argmin(np.array(assign))
def _mstep(self):
"""Update cluster means and cluster covariances."""
for j in range(self.n_clusters):
idx = np.argwhere(self.assign == j).squeeze()
n_count = len(np.where(self.assign == j)[0])
if n_count == 0:
continue
if n_count == 1:
idx = [idx]
cluster_means = np.zeros_like(self.stats["mean"][0])
for i in idx:
cluster_means += self.stats["mean"][i]
self.cluster_means[j] = cluster_means / n_count
cluster_covs = np.zeros_like(self.stats["cov"][0])
for i in idx:
diff = self.stats["mean"][i] - self.cluster_means[j]
cluster_covs += self.stats["cov"][i] + diff * diff.T
self.cluster_covs[j] = cluster_covs / n_count
def fit(self, means, covs):
"""Fit module.
Args:
means: list of mean vectors.
covs: list of covariance matrices.
"""
self.stats = {"mean": means, "cov": covs}
for _ in range(self.n_epoch):
# perform differential entropic clustering
self._mstep() # update cluster means/covariances
self._estep() # update assignment
def predict(self, means, covs):
"""Predict assignments.
Args:
means: list of mean vectors.
covs: list of covariance matrices.
"""
n_samples = len(means)
assignment = [0] * n_samples
for i in range(n_samples):
assign = []
for j in range(self.n_clusters):
burg_div = comp_burg_div(covs[i], self.cluster_covs[j])
maha_dist = comp_maha_dist(
means[i], self.cluster_means[j], self.cluster_covs[j]
)
assign.append(burg_div + maha_dist)
assignment[i] = np.argmin(np.array(assign))
return np.array(assignment)
def get_dataset(cfg, seed):
"""Instantiate dataset."""
rng = np.random.default_rng(seed)
dim = cfg.data_dim
mean_vec = [
cfg.simplex_scale * uniform_simplex(dim, seed) for _ in range(cfg.n_clusters)
]
cov_scale = cfg.cov_floor + rng.choice(dim, cfg.n_clusters)
train_label = rng.choice(cfg.n_clusters, cfg.n_train)
point_list_train = [
mean_vec[train_label[i]]
+ np.sqrt(cov_scale[train_label[i]])
* rng.standard_normal(size=(cfg.n_points, dim))
for i in range(cfg.n_train)
]
train_stats = {"mean": None, "cov": None}
train_stats["mean"] = [np.mean(point, axis=0) for point in point_list_train]
train_stats["cov"] = [np.cov(point, rowvar=False) for point in point_list_train]
test_label = rng.choice(cfg.n_clusters, cfg.n_test)
point_list_test = [
mean_vec[test_label[i]]
+ np.sqrt(cov_scale[test_label[i]])
* rng.standard_normal(size=(cfg.n_points, dim))
for i in range(cfg.n_test)
]
test_stats = {"mean": None, "cov": None}
test_stats["mean"] = [np.mean(point, axis=0) for point in point_list_test]
test_stats["cov"] = [np.cov(point, rowvar=False) for point in point_list_test]
return train_stats, train_label, test_stats, test_label
def calc_accuracy(labels, stats, module):
"""Compute various accuracy metrics.
Args:
labels: labels of reference.
stats (dict): dictionary of mean vectors and covariance matrices.
module (EntropicClustering): clustering module.
"""
pred = module.predict(stats["mean"], stats["cov"])
return {
"ri": rand_score(labels, pred),
"ari": adjusted_rand_score(labels, pred),
"ami": adjusted_mutual_info_score(labels, pred),
"nmi": normalized_mutual_info_score(labels, pred),
"purity": purity_score(labels, pred),
"homo": homogeneity_score(labels, pred),
"acc": accuracy_score(labels, pred),
}
def main(cfg):
"""Perform training and calculate metric accuracies."""
# perform training while changing random seed for dataset
metrics = {"train": init_stats(), "test": init_stats()}
for seed in prg(range(cfg.n_trial)):
train_stats, train_label, test_stats, test_label = get_dataset(cfg, seed)
module = EntropicClustering(cfg, seed)
module.fit(train_stats["mean"], train_stats["cov"])
append_stats(metrics["train"], calc_accuracy(train_label, train_stats, module))
append_stats(metrics["test"], calc_accuracy(test_label, test_stats, module))
print_stats(metrics["train"])
print_stats(metrics["test"])
if __name__ == "__main__":
config = parser.parse_args()
main(config)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment