Created
February 10, 2018 14:39
-
-
Save masdeseiscaracteres/80d302e7ce44ab06c0d9f4d783fc0ac3 to your computer and use it in GitHub Desktop.
Parallelizing with Dask & Dask Distributed
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from distributed import Client, as_completed | |
from dask import delayed | |
from time import sleep | |
import numpy as np | |
from pprint import pprint | |
# Define a time-consuming task | |
def foo(n):
    """Simulate a long-running job: sleep for *n* seconds, then return *n*."""
    print(f"Starting the {n:d}-second task")
    sleep(n)
    print(f"Ending the {n:d}-second task")
    return n
########################################################################################################################
# Goal: parallel execution of the above task for the following set of params:
########################################################################################################################
# Specify task details.
args_iterable = range(5, 15) # launch the task with 10 different arguments (5..14, i.e. 5- to 14-second jobs)
is_pure = False # specify task purity (https://toolz.readthedocs.io/en/latest/purity.html)
# optional: purity enables finer control of result caching (pure calls with equal args share one result)
c = Client(processes=False) # set up cluster: scheduler + workers (processes=False deploys them as threads)
# We could also connect to a remotely deployed cluster by passing its scheduler address to Client.
pprint(c.scheduler_info()) # print information about the cluster we are currently connected to
########################################################################################################################
# Solution 1: using the standard concurrent.futures interface (PEP-3148)
########################################################################################################################
# --- Send tasks to the cluster ---
send_mode = 'map'
if send_mode == 'submit':
    # One explicit submit() call per argument; execution starts immediately,
    # and pure functions get their results cached.
    fut_list = [c.submit(foo, arg, pure=is_pure) for arg in args_iterable]
elif send_mode == 'map':
    # map() submits the whole iterable of arguments in a single call.
    fut_list = c.map(foo, args_iterable, pure=is_pure)
# --- Receive task results ---
rec_mode = 'as_completed'
if rec_mode == 'gather':
    # Block until every task has finished, then collect all results at once.
    out = c.gather(fut_list)
    print('Result:', out)
elif rec_mode == 'result':
    # Block only until the first submitted task has finished.
    out = fut_list[0].result()
    print('Result:', out)
elif rec_mode == 'as_completed':
    # Stream results back in completion order rather than submission order.
    for fut, out in as_completed(fut_list, with_results=True):
        print('Result:', out)
########################################################################################################################
# Solution 2: define a computation DAG by chaining Delayed-type objects
########################################################################################################################
# Build one lazy task per argument and aggregate them into a single parent
# Delayed object; nothing executes yet at this point.
task_list = [delayed(foo, pure=is_pure)(arg) for arg in args_iterable]
parent = delayed(task_list)
# Execute tasks synchronously: compute() blocks until all child results are ready.
out = parent.compute()
print('Result:', out)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment