Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Benchmark the dask distributed scheduler against the multiprocessing scheduler.
# note these depend on global variables in the ipython notebook
# make a top5 committers function
import time
from pprint import pprint
def top5_committers_benchmark(data_pattern):
bag = db.from_s3('githubarchive-data', data_pattern).map(json.loads)
pushes = bag.filter(lambda x: x['type'] == 'PushEvent')
commits = pushes.foldby(get_logins, binop, initial=0, combine=combine)
top5 = commits.topk(5, lambda x: x[1])
# time the default comptue and the distributed compute
default_start = time.time()
default_result = top5.compute()
default_time = time.time() - default_start
dist_start = time.time()
dist_result = top5.compute(get=dc.get)
dist_time = time.time() - dist_start
# assert we have the same result
assert default_result == dist_result
# size of the computed data
size = data_size(data_pattern)
# general details
print("To compute {0:.4f} GB of data the default scheduler took {1:.2f} seconds, the distributed scheduler took {2:.2f} seconds".format(size, default_time, dist_time))
# speedup default_time / dist_time
print("Distributed scheduler is {:.2f} times faster.".format(default_time / dist_time))
# single node bandwidth = size / default_time
print("Default scheduler compute bandwidth: {:.2f} MB/s".format(1e3 * size / default_time))
# dist bandwidth = size / dist_time
print("Distributed scheduler compute bandwidth: {:.2f} MB/s".format(1e3 * size / dist_time))
# dist node bandwidth per node = size / (time * node)
print("Compute bandwidth per node with distributed scheduler: {:.3f} MB/(s node)".format(1e3 * size / (dist_time * 50)))
print("Analysis results:")
pprint(dist_result)
def distributed_benchmark(data_pattern):
bag = db.from_s3('githubarchive-data', data_pattern).map(json.loads)
pushes = bag.filter(lambda x: x['type'] == 'PushEvent')
commits = pushes.foldby(get_logins, binop, initial=0, combine=combine)
top5 = commits.topk(5, lambda x: x[1])
dist_start = time.time()
dist_result = top5.compute(get=dc.get)
dist_time = time.time() - dist_start
# size of the computed data
size = data_size(data_pattern)
# general details
print("To compute {0:.4f} GB of data the distributed scheduler took {1:.2f} seconds".format(size, dist_time))
# dist bandwidth = size / dist_time
print("Distributed scheduler compute bandwidth: {:.2f} MB/s".format(1e3 * size / dist_time))
# dist node bandwidth per node = size / (time * node)
print("Compute bandwidth per node with distributed scheduler: {:.3f} MB/(s node)".format(1e3 * size / (dist_time * 50)))
print("Analysis results:")
pprint(dist_result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment