"""Compute large, sparse correlation matrices in parallel using dask."""
import dask
import dask.array as da
import dask.dataframe as dd
import numpy as np
import sparse
def corr_on_chunked(chunk1, chunk2, corr_thresh=0.9):
    """Return a sparse boolean mask of row pairs scoring above ``corr_thresh``.

    Computes the dot product of every row of ``chunk1`` with every row of
    ``chunk2`` and keeps only the comparison against the threshold.

    NOTE(review): the dot product equals the Pearson correlation only if
    the rows were already mean-centred and scaled to unit L2 norm by the
    caller -- confirm at each call site.

    Args:
        chunk1: 2-D array-like of shape (n1, k).
        chunk2: 2-D array-like of shape (n2, k).
        corr_thresh: Strict lower bound; entry (i, j) of the result is True
            when ``dot(chunk1[i], chunk2[j]) > corr_thresh``.

    Returns:
        A ``sparse.COO`` boolean array of shape (n1, n2).
    """
    return sparse.COO.from_numpy(np.dot(chunk1, chunk2.T) > corr_thresh)
def chunked_corr_sparse_dask(data, chunksize=5000, corr_thresh=0.9):
    """Compute a thresholded row-by-row correlation matrix in chunks.

    The rows of ``data`` are mean-centred and scaled to unit L2 norm, so the
    dot product of two rows is their Pearson correlation. The full
    (numrows x numrows) comparison is split into blocks of at most
    ``chunksize`` x ``chunksize`` rows; each block is evaluated as a
    separate dask task and stored as a sparse boolean array.

    Only the strict upper triangle (column index > row index) is returned.
    The original author describes the result as a sparse directed adjacency
    matrix (old->young); presumably rows are ordered oldest first -- confirm
    against the caller. Adapted from an external recipe whose reference is
    not present in this file.

    Args:
        data: 2-D array-like of shape (numrows, numfeatures). It is copied
            to a float array, so the caller's object is left unmodified.
        chunksize: Maximum number of rows per block along each axis.
        corr_thresh: Strict lower bound; pairs with correlation above this
            value are marked True.

    Returns:
        A CSR boolean matrix of shape (numrows, numrows) holding True at
        (i, j), i < j, where rows i and j correlate above ``corr_thresh``.
    """
    # Work on a float copy: in-place normalisation would otherwise modify the
    # caller's array and would fail outright for integer input.
    data = np.array(data, dtype=float)
    numrows = data.shape[0]
    data -= np.mean(data, axis=1)[:, None]  # subtract means from the input data
    data /= np.sqrt(np.sum(data**2, axis=1))[:, None]  # normalize the data

    # Wrap the per-block kernel so every block product becomes a lazy task.
    delayed_corr = dask.delayed(corr_on_chunked, pure=True)

    rows = []
    for r in range(0, numrows, chunksize):
        chunk1 = data[r:r + chunksize]
        cols = []
        for c in range(0, numrows, chunksize):
            chunk2 = data[c:c + chunksize]
            delayed_array = delayed_corr(chunk1, chunk2, corr_thresh=corr_thresh)
            cols.append(
                da.from_delayed(
                    delayed_array,
                    dtype=bool,
                    # Use the real block extent: the trailing blocks are
                    # smaller whenever numrows is not a multiple of chunksize.
                    shape=(chunk1.shape[0], chunk2.shape[0]),
                )
            )
        rows.append(da.hstack(cols))
    res = da.vstack(rows).compute()
    # Keep each unordered pair once and drop the trivial self-correlations.
    res = sparse.triu(res, k=1)
    return res.tocsr()
