Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import time
from distributed import Worker, Scheduler, Client, Nanny
import toolz
from tornado import gen
from tornado.ioloop import IOLoop
from dask import delayed
@delayed
def inc(x):
# time.sleep(0.01)
return x + 1
@delayed
def add(x, y):
return x + y
L = [inc(i) for i in range(2048)]
while len(L) > 1:
L = [add(a, b) for a, b in toolz.partition_all(2, L)]
[total] = L
@gen.coroutine
def f():
yield [w._start(0) for w in workers]
c = yield Client(s.address, loop=loop, asynchronous=True)
start = time.time()
yield c.compute(total)
stop = time.time()
print("Duration: %0.3f seconds" % (stop - start))
if __name__ == '__main__':
loop = IOLoop.current()
s = Scheduler(loop=loop)
s.start(0)
workers = [Nanny(s.ip, s.port, loop=loop) for i in range(3)]
loop.run_sync(f)
# pip install snakeviz
# python -m cProfile -o prof.out dask-scheduler-benchmark.py
# snakeviz prof.out
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment