Last active
May 3, 2018 14:01
-
-
Save cowlicks/5e9c7ceceed0e490712d to your computer and use it in GitHub Desktop.
Benchmark the dask distributed scheduler against the multiprocessing scheduler.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# NOTE: these functions depend on global variables defined in the IPython notebook.
# make a top-5 committers benchmark function
import time | |
from pprint import pprint | |
def top5_committers_benchmark(data_pattern, num_nodes=50):
    """Benchmark the default (multiprocessing) scheduler against the
    distributed scheduler on a "top 5 committers" computation.

    Builds a dask bag over GitHub archive data on S3, computes the top 5
    committers by commit count with each scheduler, asserts both agree,
    and prints timing / bandwidth statistics.

    NOTE(review): relies on notebook globals -- ``db`` (dask.bag), ``json``,
    ``dc`` (distributed client/executor), ``get_logins``, ``binop``,
    ``combine``, and ``data_size`` -- which must be defined before calling.

    Parameters
    ----------
    data_pattern : str
        Key glob/pattern selecting objects in the ``githubarchive-data``
        S3 bucket.
    num_nodes : int, optional
        Number of worker nodes in the distributed cluster, used only for
        the per-node bandwidth report (default 50, the original cluster
        size).
    """
    bag = db.from_s3('githubarchive-data', data_pattern).map(json.loads)
    pushes = bag.filter(lambda x: x['type'] == 'PushEvent')
    commits = pushes.foldby(get_logins, binop, initial=0, combine=combine)
    top5 = commits.topk(5, lambda x: x[1])

    # time the default compute and the distributed compute
    default_start = time.time()
    default_result = top5.compute()
    default_time = time.time() - default_start

    dist_start = time.time()
    dist_result = top5.compute(get=dc.get)
    dist_time = time.time() - dist_start

    # both schedulers must produce the same answer
    assert default_result == dist_result

    # size of the computed data, in GB (presumably -- see data_size)
    size = data_size(data_pattern)

    # general details; 1e3 converts GB/s to MB/s
    print("To compute {0:.4f} GB of data the default scheduler took {1:.2f} seconds, the distributed scheduler took {2:.2f} seconds".format(size, default_time, dist_time))
    # speedup = default_time / dist_time
    print("Distributed scheduler is {:.2f} times faster.".format(default_time / dist_time))
    # single node bandwidth = size / default_time
    print("Default scheduler compute bandwidth: {:.2f} MB/s".format(1e3 * size / default_time))
    # dist bandwidth = size / dist_time
    print("Distributed scheduler compute bandwidth: {:.2f} MB/s".format(1e3 * size / dist_time))
    # dist bandwidth per node = size / (time * nodes)
    print("Compute bandwidth per node with distributed scheduler: {:.3f} MB/(s node)".format(1e3 * size / (dist_time * num_nodes)))
    print("Analysis results:")
    pprint(dist_result)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def distributed_benchmark(data_pattern, num_nodes=50):
    """Benchmark the distributed scheduler alone on the "top 5 committers"
    computation, printing timing and bandwidth statistics.

    Same pipeline as ``top5_committers_benchmark`` but skips the default
    scheduler run (useful when the multiprocessing run is too slow to
    repeat).

    NOTE(review): relies on notebook globals -- ``db`` (dask.bag), ``json``,
    ``dc`` (distributed client/executor), ``get_logins``, ``binop``,
    ``combine``, and ``data_size`` -- which must be defined before calling.

    Parameters
    ----------
    data_pattern : str
        Key glob/pattern selecting objects in the ``githubarchive-data``
        S3 bucket.
    num_nodes : int, optional
        Number of worker nodes in the distributed cluster, used only for
        the per-node bandwidth report (default 50, the original cluster
        size).
    """
    bag = db.from_s3('githubarchive-data', data_pattern).map(json.loads)
    pushes = bag.filter(lambda x: x['type'] == 'PushEvent')
    commits = pushes.foldby(get_logins, binop, initial=0, combine=combine)
    top5 = commits.topk(5, lambda x: x[1])

    dist_start = time.time()
    dist_result = top5.compute(get=dc.get)
    dist_time = time.time() - dist_start

    # size of the computed data, in GB (presumably -- see data_size)
    size = data_size(data_pattern)

    # general details; 1e3 converts GB/s to MB/s
    print("To compute {0:.4f} GB of data the distributed scheduler took {1:.2f} seconds".format(size, dist_time))
    # dist bandwidth = size / dist_time
    print("Distributed scheduler compute bandwidth: {:.2f} MB/s".format(1e3 * size / dist_time))
    # dist bandwidth per node = size / (time * nodes)
    print("Compute bandwidth per node with distributed scheduler: {:.3f} MB/(s node)".format(1e3 * size / (dist_time * num_nodes)))
    print("Analysis results:")
    pprint(dist_result)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment