Created
March 29, 2024 10:39
-
-
Save kumanna/e4787c68421c11cc14415e4becf6c824 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Linde-Buzo-Gray / Generalized Lloyd algorithm implementation in Python *3*. | |
Heuristic process that can be used to generate cluster points from a big amount of multidimensional vectors. | |
""" | |
import numpy as np | |
import math | |
from functools import reduce | |
from collections import defaultdict | |
_size_data = 0 | |
_dim = 0 | |
def generate_codebook(data, size_codebook, epsilon=0.00001): | |
""" | |
Generate codebook of size <size_codebook> with convergence value <epsilon>. Will return a tuple with the | |
generated codebook, a vector with absolute weights and a vector with relative weights (the weight denotes how many | |
vectors for <data> are in the proximity of the codevector. | |
:param data: input data with N k-dimensional vectors | |
:param size_codebook: codebook size. Because the codevectors are split on each iteration, this should be a | |
power-of-2-value | |
:param epsilon: convergence value | |
:return tuple of: codebook of size <size_codebook>, absolute weights, relative weights | |
""" | |
global _size_data, _dim | |
_size_data = len(data) | |
assert _size_data > 0 | |
_dim = len(data[0]) | |
assert _dim > 0 | |
codebook = [] | |
codebook_abs_weights = [_size_data] | |
codebook_rel_weights = [1.0] | |
# calculate initial codevector: average vector of whole input data | |
c0 = avg_vec_of_vecs(data, _dim, _size_data) | |
codebook.append(c0) | |
# calculate the average distortion | |
avg_dist = avg_distortion_c0(c0, data) | |
# split codevectors until we have have enough | |
while len(codebook) < size_codebook: | |
codebook, codebook_abs_weights, codebook_rel_weights, avg_dist = split_codebook(data, codebook, | |
epsilon, avg_dist) | |
return codebook, codebook_abs_weights, codebook_rel_weights | |
def split_codebook(data, codebook, epsilon, initial_avg_dist): | |
""" | |
Split the codebook so that each codevector in the codebook is split into two. | |
:param data: input data | |
:param codebook: input codebook. its codevectors will be split into two. | |
:param epsilon: convergence value | |
:param initial_avg_dist: initial average distortion | |
:return Tuple with new codebook, codebook absolute weights and codebook relative weights | |
""" | |
# split codevectors | |
new_codevectors = [] | |
for c in codebook: | |
# the new codevectors c1 and c2 will moved by epsilon and -epsilon so to be apart from each other | |
c1 = new_codevector(c, epsilon) | |
c2 = new_codevector(c, -epsilon) | |
new_codevectors.extend((c1, c2)) | |
codebook = new_codevectors | |
len_codebook = len(codebook) | |
abs_weights = [0] * len_codebook | |
rel_weights = [0.0] * len_codebook | |
# print('> splitting to size', len_codebook) | |
# try to reach a convergence by minimizing the average distortion. this is done by moving the codevectors step by | |
# step to the center of the points in their proximity | |
avg_dist = 0 | |
err = epsilon + 1 | |
num_iter = 0 | |
while err > epsilon: | |
# find closest codevectors for each vector in data (find the proximity of each codevector) | |
closest_c_list = [None] * _size_data # list that contains the nearest codevector for each input data vector | |
vecs_near_c = defaultdict(list) # list with codevector index -> input data vector mapping | |
vec_idxs_near_c = defaultdict(list) # list with codevector index -> input data index mapping | |
for i, vec in enumerate(data): # for each input vector | |
min_dist = None | |
closest_c_index = None | |
for i_c, c in enumerate(codebook): # for each codevector | |
d = euclid_squared(vec, c) | |
if min_dist is None or d < min_dist: # found new closest codevector | |
min_dist = d | |
closest_c_list[i] = c | |
closest_c_index = i_c | |
vecs_near_c[closest_c_index].append(vec) | |
vec_idxs_near_c[closest_c_index].append(i) | |
# update codebook: recalculate each codevector so that it sits in the center of the points in their proximity | |
for i_c in range(len_codebook): # for each codevector index | |
vecs = vecs_near_c.get(i_c) or [] # get its proximity input vectors | |
num_vecs_near_c = len(vecs) | |
if num_vecs_near_c > 0: | |
new_c = avg_vec_of_vecs(vecs, _dim) # calculate the new center | |
codebook[i_c] = new_c # update in codebook | |
for i in vec_idxs_near_c[i_c]: # update in input vector index -> codevector mapping list | |
closest_c_list[i] = new_c | |
# update the weights | |
abs_weights[i_c] = num_vecs_near_c | |
rel_weights[i_c] = num_vecs_near_c / _size_data | |
# recalculate average distortion value | |
prev_avg_dist = avg_dist if avg_dist > 0 else initial_avg_dist | |
avg_dist = avg_distortion_c_list(closest_c_list, data) | |
# recalculate the new error value | |
err = (prev_avg_dist - avg_dist) / prev_avg_dist | |
# print(closest_c_list) | |
# print('> iteration', num_iter, 'avg_dist', avg_dist, 'prev_avg_dist', prev_avg_dist, 'err', err) | |
num_iter += 1 | |
return codebook, abs_weights, rel_weights, avg_dist | |
def avg_vec_of_vecs(vecs, dim=None, size=None): | |
""" | |
Calculate average vector (center vector) for input vectors <vecs>. | |
:param vecs: input vectors | |
:param dim: dimension of <vecs> if it was already calculated | |
:param size: size of <vecs> if it was already calculated | |
:return average vector (center vector) for input vectors <vecs> | |
""" | |
size = size or len(vecs) | |
dim = dim or len(vecs[0]) | |
avg_vec = [0.0] * 3 | |
for vec in vecs: | |
p = np.cos(vec[0]), np.sin(vec[0]) * np.cos(vec[1]), np.sin(vec[0]) * np.sin(vec[1]) | |
avg_vec[0] += p[0] / size | |
avg_vec[1] += p[1] / size | |
avg_vec[2] += p[2] / size | |
n = np.linalg.norm(avg_vec) | |
avg_vec[0], avg_vec[1], avg_vec[2] = avg_vec[0] / n, avg_vec[1] / n, avg_vec[2] / n | |
w = np.arccos(avg_vec[0]) | |
p = np.arctan2(avg_vec[2], avg_vec[1]) | |
return [w, p] | |
def new_codevector(c, e): | |
""" | |
Create a new codevector based on <c> but moved by factor <e> | |
:param c: base codevector | |
:param e: move factor | |
:return new codevector | |
""" | |
c = [x * (1.0 + e) for x in c] | |
c[0] = np.abs(c[0]) | |
return c | |
def avg_distortion_c0(c0, data, size=None): | |
""" | |
Average distortion of <c0> in relation to <data> (i.e. how good does <c0> describe <data>?). | |
:param c0: comparison vector | |
:param data: sample data | |
:param size: size of <data> if it was already calculated | |
:return average distortion | |
""" | |
size = size or _size_data | |
return reduce(lambda s, d: s + d / size, | |
(euclid_squared(c0, vec) | |
for vec in data), | |
0.0) | |
def avg_distortion_c_list(c_list, data, size=None): | |
""" | |
Average distortion between input samples <data> and a list <c_list> that contains a codevector for each point in | |
<data>. | |
:param c_list: list that contains a codevector for each point in <data> | |
:param data: input samples | |
:param size: Size of <data> if it was already calculated | |
:return: | |
""" | |
size = size or _size_data | |
return reduce(lambda s, d: s + d / size, | |
(euclid_squared(c_i, data[i]) | |
for i, c_i in enumerate(c_list)), | |
0.0) | |
def euclid_squared(a, b): | |
return np.sin(b[0] - a[0])**2 + np.sin(2 * a[0]) * np.sin(2 * b[0]) * np.sin((b[1] - a[1]) / 2)**2 | |
def code2mat(c): | |
V = np.array([[np.cos(c[0]), np.sin(c[0]) * np.exp(1j * c[1])], | |
[-np.sin(c[0]) * np.exp(-1j * c[1]), np.cos(c[0])]]) | |
return V | |
if __name__ == "__main__": | |
testdata = [] | |
N = 10000 | |
for i in range(N): | |
H = np.random.randn(2,2) + 1j * np.random.randn(2,2) | |
U, S, Vh = np.linalg.svd(H) | |
V = Vh.conj().T | |
D = np.diag(np.exp(-1j * np.angle(np.diag(V)))) | |
V = V @ D | |
U = U @ D | |
U @ np.diag(S) @ V.conj().T - H | |
w = np.arccos(np.real(V[0,0])) | |
phi = np.angle(V[0,1]) | |
testdata.append([w, phi]) | |
cb_size = 4 | |
print('generating codebook for size', cb_size) | |
cb, cb_abs_w, cb_rel_w = generate_codebook(testdata, cb_size, 0.01) | |
print('output:') | |
correction = code2mat(cb[0]).T.conj() | |
for i, c in enumerate(cb): | |
print('> %s, abs_weight=%d, rel_weight=%f' % (c, cb_abs_w[i], cb_rel_w[i])) | |
print(code2mat(c) @ correction) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment