Skip to content

Instantly share code, notes, and snippets.

@jbcrail
Last active May 15, 2017 21:15
Show Gist options
  • Save jbcrail/c559c51667b705bdfc1b72e97ace6a98 to your computer and use it in GitHub Desktop.
Save jbcrail/c559c51667b705bdfc1b72e97ace6a98 to your computer and use it in GitHub Desktop.
import dask
import dask.dataframe as dd
import numpy as np
import numba
import time
@numba.njit
def calc_hist(X, Y, xrange, yrange, out, w, h):
    """Accumulate a 2-D point-count histogram into ``out`` in place.

    The x axis is split into ``w`` equal-width bins over ``xrange`` and the
    y axis into ``h`` bins over ``yrange``. Each point ``(X[i], Y[i])``
    increments ``out[y_bin, x_bin]``, so ``out`` must have at least ``h``
    rows and ``w`` columns.

    Points outside the closed ranges, or with NaN coordinates, are skipped.
    A point lying exactly on the upper edge is counted in the last bin.
    If a range has zero width, every in-range point along that axis goes
    into bin 0.

    Parameters
    ----------
    X, Y : 1-D indexable sequences of equal length (point coordinates).
    xrange, yrange : ``(lo, hi)`` pairs giving the binned extent per axis.
    out : 2-D integer array, incremented in place.
    w, h : number of bins along x and y respectively.
    """
    x0, x1 = xrange
    y0, y1 = yrange
    dx = x1 - x0
    dy = y1 - y0
    for i in range(len(X)):
        x = X[i]
        y = Y[i]
        # Reject out-of-range points; every comparison with NaN is False,
        # so NaN coordinates are rejected here too.
        if not (x >= x0 and x <= x1 and y >= y0 and y <= y1):
            continue
        indx = int((x - x0) / dx * w) if dx > 0 else 0
        indy = int((y - y0) / dy * h) if dy > 0 else 0
        # x == x1 (or y == y1) maps to index w (or h), one past the last
        # bin; fold it into the last bin instead of writing out of bounds.
        if indx >= w:
            indx = w - 1
        if indy >= h:
            indy = h - 1
        out[indy, indx] += 1
def hist(d, xrange, yrange, w, h):
    """Return a 2-D point-count histogram of one dataframe partition.

    ``d`` must expose ``x`` and ``y`` columns whose ``.values`` are arrays
    (presumably a pandas partition from ``df.to_delayed()``; confirm with
    callers). The result has ``h`` rows (y bins) and ``w`` columns (x bins),
    matching the ``out[y_bin, x_bin]`` indexing done by ``calc_hist``.
    """
    # Zero-initialise: calc_hist only increments, so an uninitialised buffer
    # (np.empty) would add its garbage contents to the counts. The shape is
    # (rows=h, cols=w) to match calc_hist's out[indy, indx] indexing.
    out = np.zeros((h, w), dtype='int32')
    calc_hist(d.x.values, d.y.values, xrange, yrange, out, w, h)
    return out
def _aggregate(client, df, x_range, y_range, width, height):
    """Histogram every partition of ``df`` on the cluster and sum the results.

    Each partition is binned by ``hist`` as a delayed task. The per-partition
    arrays are gathered back and summed element-wise, so the result has the
    same shape as a single ``hist`` output.
    """
    tasks = [dask.delayed(hist)(part, x_range, y_range, width, height)
             for part in df.to_delayed()]
    partials = client.gather(client.compute(tasks))
    return np.sum(partials, axis=0)


if __name__ == '__main__':
    from dask import distributed

    # Benchmark several process/thread splits of the same local cluster.
    for workers, threads in [(8, 1), (4, 2), (2, 4)]:
        print((workers, threads))
        lc = distributed.LocalCluster(n_workers=workers, threads_per_worker=threads)
        c = distributed.Client(lc)
        try:
            t0 = time.time()
            df = dd.read_parquet('data/osm.snappy.parq')
            # NOTE: persisting is disabled, so the 'Persisted' interval below
            # only covers building the lazy read_parquet graph.
            #df = df.persist()
            #distributed.wait(df)
            t1 = time.time()
            print('Persisted', t1-t0)

            # Data extents, computed in a single pass over the dataset.
            xmin, xmax, ymin, ymax = c.gather(c.compute([
                df.x.values.min(), df.x.values.max(),
                df.y.values.min(), df.y.values.max(),
            ]))
            x_range = (xmin, xmax)
            y_range = (ymin, ymax)

            width = 600
            height = int(900 * 7.0 / 12)
            agg = _aggregate(c, df, x_range, y_range, width, height)
            t2 = time.time()
            print('Agg1', t2-t1)

            width = 3
            height = int(3 * 7.0 / 12)
            agg = _aggregate(c, df, x_range, y_range, width, height)
            t3 = time.time()
            print('Agg2', t3-t2)
        finally:
            # Always release the client and cluster, even if a step fails,
            # so the next configuration starts from a clean slate.
            c.close()
            lc.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment