import numpy as np
from scipy.sparse import csr_matrix
from sklearn.decomposition import TruncatedSVD
import sklearn.cluster as cl
import sys
import os
import threading
from multiprocessing import cpu_count
# your input (graph.bin) represents a set of pairwise counts (thing a, thing b, count)
# the triples should just be written out back to back as integers: hash of thing a, hash of thing b, count
# the integers should all have the same data type
# input_dtype is whatever data type you serialized graph.bin as
# eg: if you wrote your values with output_binary_int in ocaml or DataOutputStream.writeInt in java this would be np.dtype('>i4')
input_dtype = np.uint32
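# for reference, a compatible graph.bin could be produced from python itself with something like
# (where `triples` is a hypothetical list of (hash_a, hash_b, count) tuples):
#   np.asarray(triples, dtype=input_dtype).tofile('graph.bin')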
# load the data file
inp = np.fromfile('graph.bin', dtype=input_dtype).reshape((-1, 3))
# translate your item hashes to sequential ids
# this makes the sparse matrix we're going to be generating in a minute more efficient
# np.unique returns its values sorted (this is important)
keys = np.unique(np.concatenate((np.unique(inp[:, 0]), np.unique(inp[:, 1]))))
print(keys.shape, inp.shape)
ids_left = np.searchsorted(keys, inp[:, 0])
ids_right = np.searchsorted(keys, inp[:, 1])
vals = inp[:, 2].astype(np.float32)
print('sorted')
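# note: the csr_matrix constructor sums the values of duplicate (row, col) pairs, which is what we want for counts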
affinity_mat = csr_matrix((vals, (ids_left, ids_right)), shape=(keys.shape[0], keys.shape[0]))
print('sparse')
# compute the eigenvectors of our affinity matrix (via a truncated SVD)
eigen_dims = 128
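# note: eigenvecs.npy is a raw float32 dump written with tofile, not numpy's .npy format (despite the extension);
# delete the file to force the SVD to be recomputed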
if os.path.exists('eigenvecs.npy'):
    eigenvecs = np.fromfile('eigenvecs.npy', dtype=np.float32).reshape((-1, eigen_dims))
else:
    svd = TruncatedSVD(n_components=eigen_dims)
    svd.fit(affinity_mat)
    eigenvecs = svd.transform(affinity_mat)
    eigenvecs.astype(np.float32).tofile('eigenvecs.npy')
print('embedded')
# find our clusters
# the number of clusters you pick is a guess
# you should err on the high side because it's much cheaper to merge clusters later than it is to split them
n_clusters = 1024
clusterer = cl.MiniBatchKMeans(n_clusters=n_clusters, batch_size=5000)
# if you're using openblas make sure to set the environment variable OPENBLAS_NUM_THREADS=1
# otherwise you'll waste half your cpu time on context switching
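# eg (assuming this file is saved as cluster.py): OPENBLAS_NUM_THREADS=1 python cluster.py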
workers = []
n_workers = cpu_count()
# give each core a contiguous partition of the embedding,
# with the partition size rounded down to a multiple of n_clusters
partition_size = int(eigenvecs.shape[0] / n_workers)
partition_size -= partition_size % n_clusters
for off in range(0, eigenvecs.shape[0], partition_size):
    # partial_fit folds each partition into the shared cluster centers;
    # a plain fit here would discard the work done on the other partitions
    thread = threading.Thread(target=clusterer.partial_fit, args=(eigenvecs[off:(off + partition_size)],))
    thread.daemon = True
    thread.start()
    workers.append(thread)
for worker in workers:
    worker.join()
print('clustered')
clusterer.cluster_centers_.tofile('cluster_centers.npy')
labels = clusterer.predict(eigenvecs)
# write out the item hashes followed by their cluster labels (first half keys, second half labels)
np.concatenate([keys, labels]).astype(input_dtype).tofile('cluster_ids.npy')
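# as noted above, merging clusters afterwards is cheap: you can cluster the cluster centers
# themselves into a coarser set. a minimal sketch using the fitted centers still in memory
# (the coarse count of 256 and the use of AgglomerativeClustering are illustrative choices, not part of the pipeline above):
#   from sklearn.cluster import AgglomerativeClustering
#   coarse = AgglomerativeClustering(n_clusters=256).fit(clusterer.cluster_centers_)
#   coarse_labels = coarse.labels_[labels]  # per-item coarse cluster ids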