@masdeseiscaracteres
Created February 10, 2018 14:39
Parallelizing with Dask & Dask Distributed
from distributed import Client, as_completed
from dask import delayed
from time import sleep
from pprint import pprint
# Define a time-consuming task
def foo(n):
    print("Starting the {:d}-second task".format(n))
    sleep(n)
    print("Ending the {:d}-second task".format(n))
    return n
########################################################################################################################
# Goal: parallel execution of the above task for the following set of params:
########################################################################################################################
# Specify task details
args_iterable = range(5, 15)  # launch the task with 10 different arguments
is_pure = False  # specify task purity (https://toolz.readthedocs.io/en/latest/purity.html)
# optional: purity enables finer control over result caching
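# For illustration (hypothetical calls, using the client `c` created below):
# with pure=True, identical submissions map to the same deterministic key, so
# the result is computed once and cached:
#     c.submit(foo, 5, pure=True).key == c.submit(foo, 5, pure=True).key  # True
# With pure=False every submission gets a fresh key and re-executes.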
c = Client(processes=False)  # set up a local cluster (scheduler + workers); processes=False deploys workers as threads, processes=True as processes
# We can also connect to a remotely deployed cluster (if one is available)
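# For example (hypothetical scheduler address):
#     c = Client('tcp://192.0.2.1:8786')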
pprint(c.scheduler_info())  # print information about the cluster we are connected to
########################################################################################################################
# Solution 1: using the standard concurrent.futures interface (PEP-3148)
########################################################################################################################
# Send tasks
send_mode = 'map'
if send_mode == 'submit':
    fut_list = []
    for i in args_iterable:
        f = c.submit(foo, i, pure=is_pure)  # start executing foo(i) immediately; results of pure functions are cached
        fut_list.append(f)
elif send_mode == 'map':
    fut_list = c.map(foo, args_iterable, pure=is_pure)  # futures are returned in input order
# Receive task results
rec_mode = 'as_completed'
if rec_mode == 'gather':
    # Wait for all tasks to finish and gather the results in a list
    out = c.gather(fut_list)
    print('Result:', out)
elif rec_mode == 'result':
    # Block until the first future in the list finishes
    out = fut_list[0].result()
    print('Result:', out)
elif rec_mode == 'as_completed':
    # Retrieve each task result as it finishes
    for future, out in as_completed(fut_list, with_results=True):
        print('Result:', out)
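# Another option (a sketch; distributed also exports wait): block until all
# futures finish without collecting their results:
#     from distributed import wait
#     done, not_done = wait(fut_list)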
########################################################################################################################
# Solution 2: define a computation DAG by chaining Delayed-type objects
########################################################################################################################
# Build a list of tasks and aggregate them into a parent task object, but do not execute anything yet
task_list = []
for i in args_iterable:
    task_list.append(delayed(foo, pure=is_pure)(i))
parent = delayed(task_list)
# Execute tasks synchronously (interpreter blocks until all results are computed)
out = parent.compute()
print('Result:', out)
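# As an aside (assumes the optional graphviz dependency is installed), the
# task graph built above can be rendered before computing it:
#     parent.visualize(filename='dag.png')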