Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
import numpy as np
from scipy.sparse import csr_matrix
from sklearn.decomposition import TruncatedSVD
import sklearn.cluster as cl
import sys
import os
import threading
from multiprocessing import cpu_count
# your input (graph.bin) represents a set of pairwise counts (thing a, thing b, count)
# they should be just written out as integers as hash of thing a, hash of thing b, count
# the integers should all have the same data type
# this is whatever data type you serialized graph.bin as.
# eg: if you wrote your values with output_binary_int in ocaml or DataOutputStream.writeInt in java this would be np.dtype('>i4')
input_dtype = np.uint32
# load the data file
inp = np.fromfile('graph.bin', dtype=input_type).reshape((-1, 3))
# translate your item hashes to sequential ids
# this makes the sparse matrix we're going to be generating in a minute more efficient
# np.unique returns its values sorted (this is important)
keys = np.unique(np.concatenate((np.unique(inp[:, 0]), np.unique(inp[:, 1]))))
print(keys.shape, inp.shape)
ids_left = np.searchsorted(keys, inp[:, 0])
ids_right = np.searchsorted(keys, inp[:, 1])
vals = inp[:, 2].astype(np.float32)
affinity_mat = csr_matrix((vals, (ids_left, ids_right)), shape=(keys.shape[0], keys.shape[0]))
# compute the eigenvectors of our affinity matrix
eigen_dims = 128
if os.path.exists('eigenvecs.npy'):
eigenvecs = np.fromfile('eigenvecs.npy', dtype=np.float32).reshape((-1, eigen_dims))
svd = TruncatedSVD(n_components=eigen_dims)
eigenvecs = svd.transform(affinity_mat)
# find our clusters
# the number of clusters you pick is a guess
# you should err on the high side because it's much cheaper to merge clusters later than it is to split them
n_clusters = 1024
clusterer = cl.MiniBatchKMeans(n_clusters=n_clusters, batch_size=5000)
# if you're using openblas make sure to set the environment variable OPENBLAS_NUM_THREADS=1
# otherwise you'll waste half your cpu time on context switching
workers = []
n_workers = cpu_count()
partition_size = int(eigenvecs.shape[0] / n_workers)
partition_size -= partition_size % n_clusters
for off in range(0, eigenvecs.shape[0], partition_size):
thread = threading.Thread(, args=(eigenvecs[off:(off + partition_size)],))
thread.daemon = True
for worker in workers:
labels = clusterer.predict(eigenvecs)
np.concatenate([keys, labels]).astype(input_type).tofile('cluster_ids.npy')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment