@0x0L
Last active November 12, 2021 19:59
Clustering algorithms
import heapq
import numpy as np
def generic_linkage(D, method='single'):
    """Agglomerative clustering with cached nearest-neighbor candidates kept in a
    priority queue (GENERIC_LINKAGE from Müllner, arXiv:1109.2378)."""
    N = D.shape[0]
    S = set(range(N))                   # active cluster labels
    size = {i: 1 for i in range(N)}     # cluster sizes

    # Pairwise dissimilarities, keyed by (i, j) with i < j.
    d = {}
    for i in range(N):
        for j in range(i + 1, N):
            d[(i, j)] = D[i, j]

    # For every label x except the largest, cache a candidate nearest neighbor
    # among the labels > x and its distance; the heap Q is keyed on these
    # (possibly stale) candidate distances.
    Q = []
    n_nghbr = {}
    mindist = {}
    for x in range(N - 1):
        nearest = min(range(x + 1, N), key=lambda y: d[(x, y)])
        delta = d[(x, nearest)]
        n_nghbr[x] = nearest
        mindist[x] = delta
        heapq.heappush(Q, (delta, x))

    L = []
    for i in range(1, N):
        # Pop candidates until the cached distance is still valid.
        delta, a = heapq.heappop(Q)
        b = n_nghbr[a]
        while delta != d[(a, b)]:
            # Stale entry: recompute a's nearest neighbor, push it back, retry.
            b = min({x for x in S if x > a}, key=lambda x: d[(a, x)])
            delta = d[(a, b)]
            n_nghbr[a] = b
            mindist[a] = delta
            heapq.heappush(Q, (delta, a))
            delta, a = heapq.heappop(Q)
            b = n_nghbr[a]

        L.append((a, b, delta, size[a] + size[b]))

        # Merge a into b and update distances with the Lance-Williams formula.
        S -= {a}
        for x in S - {b}:
            d[_ord(x, b)] = _LINKAGE[method](
                d[_ord(a, x)], d[_ord(b, x)], d[(a, b)],
                size[a], size[b], size[x])
        size[b] += size[a]

        # Repair candidate pointers that referenced the removed cluster a.
        for x in {x for x in S if x < a}:
            if n_nghbr[x] == a:
                n_nghbr[x] = b

        # The merged cluster b may now be the nearest neighbor of smaller labels;
        # lower their cached distances where needed.
        for x in {x for x in S if x < b}:
            if d[(x, b)] < mindist[x]:
                n_nghbr[x] = b
                mindist[x] = d[(x, b)]
                Q = [(mindist[x], x) if q[1] == x else q for q in Q]
                heapq.heapify(Q)

        # Recompute b's own candidate among the larger labels.
        if b < N - 1:
            n_nghbr[b] = min({x for x in S if x > b}, key=lambda x: d[(b, x)])
            mindist[b] = d[(b, n_nghbr[b])]
            Q = [(mindist[b], b) if q[1] == b else q for q in Q]
            heapq.heapify(Q)

    # Relabel clusters to follow SciPy's linkage-matrix convention:
    # the cluster formed at step i gets label N + i.
    L = np.array(L)
    for i in range(len(L)):
        L[i + 1:, :2][L[i + 1:, :2] == L[i][1]] = N
        N += 1
    return L
Reference: Daniel Müllner, "Modern hierarchical, agglomerative clustering algorithms", arXiv:1109.2378v1.
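A minimal usage sketch (my own toy data, not part of the original gist; it assumes the helper definitions further down, _LINKAGE and _ord, are already loaded): generic_linkage returns a matrix in SciPy's linkage format, so it can be handed straight to dendrogram.

from scipy.cluster.hierarchy import dendrogram
from scipy.spatial.distance import pdist, squareform

rng = np.random.default_rng(42)
X_demo = rng.normal(size=(20, 2))      # 20 random 2-d points
D_demo = squareform(pdist(X_demo))     # dense pairwise distance matrix

Z_demo = generic_linkage(D_demo, method='average')
dendrogram(Z_demo);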
# Lance-Williams update formulas: the dissimilarity between the cluster obtained
# by merging a and b and any other cluster x, expressed via d(a, x), d(b, x),
# d(a, b) and the cluster sizes.

def _single(d_ax, d_bx, d_ab, s_a, s_b, s_x):
    return min(d_ax, d_bx)

def _complete(d_ax, d_bx, d_ab, s_a, s_b, s_x):
    return max(d_ax, d_bx)

def _weighted(d_ax, d_bx, d_ab, s_a, s_b, s_x):
    return 0.5 * (d_ax + d_bx)

def _average(d_ax, d_bx, d_ab, s_a, s_b, s_x):
    d = s_a * d_ax + s_b * d_bx
    d /= s_a + s_b
    return d

def _ward(d_ax, d_bx, d_ab, s_a, s_b, s_x):
    d = (s_a + s_x) * d_ax**2
    d += (s_b + s_x) * d_bx**2
    d -= s_x * d_ab**2
    d /= s_a + s_b + s_x
    return d**0.5

def _centroid(d_ax, d_bx, d_ab, s_a, s_b, s_x):
    d = (s_a * d_ax**2 + s_b * d_bx**2) / (s_a + s_b)
    d -= s_a * s_b * d_ab**2 / (s_a + s_b)**2
    return d**0.5

def _median(d_ax, d_bx, d_ab, s_a, s_b, s_x):
    d = d_ax**2 / 2 + d_bx**2 / 2 - d_ab**2 / 4
    return d**0.5

_LINKAGE = {
    'single': _single,
    'complete': _complete,
    'average': _average,
    'weighted': _weighted,
    'ward': _ward,
    'centroid': _centroid,
    'median': _median,
}
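A quick illustrative check (my own example, not from the gist): for 'average' linkage, the Lance-Williams update above should reproduce the mean pairwise distance between the merged cluster and a third cluster computed directly.

from scipy.spatial.distance import cdist

rng = np.random.default_rng(0)
A = rng.normal(size=(3, 2))   # cluster a, size 3
B = rng.normal(size=(2, 2))   # cluster b, size 2
C = rng.normal(size=(4, 2))   # cluster x, size 4

d_ax, d_bx, d_ab = cdist(A, C).mean(), cdist(B, C).mean(), cdist(A, B).mean()
d_direct = cdist(np.vstack([A, B]), C).mean()   # average linkage recomputed from scratch
assert np.isclose(_average(d_ax, d_bx, d_ab, len(A), len(B), len(C)), d_direct)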
def _ord(a, b):
    # Canonical (smaller, larger) key for the pairwise-distance dictionary.
    return min(a, b), max(a, b)
def primitive_clustering(D, method='single'):
    """Naive agglomerative clustering: repeatedly merge the globally closest
    pair of clusters and update distances with the Lance-Williams formulas."""
    N = D.shape[0]
    S = set(range(N))
    size = {i: 1 for i in range(N)}

    d = {}
    for i in range(N):
        for j in range(i + 1, N):
            d[(i, j)] = D[i, j]

    L = []
    n = N  # label of the next cluster to be created (SciPy convention)
    for ite in range(N - 1):
        # Find and merge the closest pair of active clusters.
        a, b = min(d, key=lambda k: d[k])
        size[n] = size[a] + size[b]
        L.append((a, b, d[a, b], size[n]))

        # Distances from the new cluster n to every other active cluster.
        update = {
            (x, n): _LINKAGE[method](d[_ord(a, x)], d[_ord(b, x)], d[(a, b)],
                                     size[a], size[b], size[x])
            for x in S if x != a and x != b
        }
        # Drop every entry involving a or b, then add the new distances.
        d = {(i, j): v for (i, j), v in d.items()
             if i != a and j != a and i != b and j != b}
        d |= update

        S -= {a, b}
        S |= {n}
        n += 1
    return np.array(L)
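A tiny worked example with hand-picked numbers (mine, for illustration): four points on a line at 0, 1, 4 and 6 under single linkage. Labels 4 and 5 in the output denote the clusters formed by the first two merges, matching SciPy's linkage-matrix convention.

D4 = np.array([[0., 1., 4., 6.],
               [1., 0., 3., 5.],
               [4., 3., 0., 2.],
               [6., 5., 2., 0.]])
print(primitive_clustering(D4, 'single'))
# [[0. 1. 1. 2.]   <- merge points 0 and 1 at distance 1
#  [2. 3. 2. 2.]   <- merge points 2 and 3 at distance 2
#  [4. 5. 3. 4.]]  <- merge the two pairs at distance 3 (single linkage: |1 - 4|)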
from scipy.cluster.hierarchy import linkage, dendrogram, optimal_leaf_ordering
from scipy.spatial.distance import pdist, squareform
from sklearn import datasets

# Demo: Gaussian blobs in a high-dimensional space, clustered with median linkage.
X, y = datasets.make_blobs(centers=3, n_features=1000)
D = squareform(pdist(X))
dendrogram(primitive_clustering(D, 'median'));
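As a rough sanity check (a sketch, reusing X and D from above), SciPy's own linkage should produce the same merge heights on data without ties, and the imported optimal_leaf_ordering can reorder the leaves before plotting.

Z_ref = linkage(X, method='median')
Z_med = primitive_clustering(D, 'median')
print(np.allclose(np.sort(Z_ref[:, 2]), np.sort(Z_med[:, 2])))  # expected: True

dendrogram(optimal_leaf_ordering(Z_ref, pdist(X)));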