Skip to content

Instantly share code, notes, and snippets.

@mrocklin
Created July 13, 2017 13:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mrocklin/a0015dac84fc6a123a7206937ed671ca to your computer and use it in GitHub Desktop.
Save mrocklin/a0015dac84fc6a123a7206937ed671ca to your computer and use it in GitHub Desktop.
""" Tiny example for BNL showing multi-actor control """
from dask.distributed import Client, get_client, Variable, fire_and_forget
import numpy as np
import time
import random
def get_image_from_detector(shape=(2000, 2000)):
    """Return a simulated detector image.

    No real detector is read: the "image" is an array of uniform random
    floats in ``[0, 1)`` produced by ``np.random.random``.

    Parameters
    ----------
    shape : tuple of int, optional
        Dimensions of the image to generate.  Defaults to ``(2000, 2000)``,
        the size this example has always produced.

    Returns
    -------
    numpy.ndarray
        Float array with the requested shape.
    """
    return np.random.random(shape)
def collect_from_beam():
    """Continuously collect images and feed them into the cluster.

    Meant to be submitted as a long-running task: it obtains a client with
    ``get_client()`` and then loops forever, so it never returns.

    Each iteration:

    1. reads the current delay from the module-level ``sleep_time``
       variable and sleeps for that long;
    2. grabs an image with ``get_image_from_detector`` and scatters it;
    3. submits ``process_1`` and ``process_2`` on the scattered image and
       ``process_3`` on their two results;
    4. submits ``save_to_database`` for both the raw image and the merged
       result, handing both futures to ``fire_and_forget``.

    Relies on a global named ``sleep_time`` (created in the ``__main__``
    section of this script) existing before the function runs.
    """
    client = get_client()

    while True:
        # Re-read the delay on every pass rather than caching it, so the
        # value used is whatever the shared variable currently holds.
        delay = sleep_time.get()
        time.sleep(delay)

        local_image = get_image_from_detector()
        remote_image = client.scatter(local_image, direct=True)

        # Two independent processing steps, then a step that merges them.
        result_1 = client.submit(process_1, remote_image)
        result_2 = client.submit(process_2, remote_image)
        merged_result = client.submit(process_3, result_1, result_2)

        save_raw_image = client.submit(save_to_database, remote_image)
        save_final = client.submit(save_to_database, merged_result)

        # Make sure to run these tasks even if we have moved on locally:
        # the local names are rebound on the next iteration.
        fire_and_forget(save_raw_image)
        fire_and_forget(save_final)
# Some generic processing functions
def process_1(img):
    """Stand-in processing step: pause briefly and return *img* unchanged.

    The pause is a random duration of up to half a second, imitating work
    of variable length.
    """
    time.sleep(random.random() / 2)
    return img
def process_2(img):
    """Stand-in processing step: pause briefly and return ``img / 10``.

    The pause is a random duration of up to half a second, imitating work
    of variable length.
    """
    time.sleep(random.random() / 2)
    return img / 10
def process_3(img_1, img_2):
    """Stand-in merge step: pause briefly and return ``img_1 + img_2``.

    The pause is a random duration of up to half a second, imitating work
    of variable length.
    """
    time.sleep(random.random() / 2)
    return img_1 + img_2
def save_to_database(img):
    """Pretend to persist *img*: sleep for 0.1 seconds and return ``None``.

    Nothing is actually written anywhere; *img* is accepted but not used.
    """
    time.sleep(0.1)
if __name__ == '__main__':
    # Connect to an already-running scheduler on this machine.
    client = Client('localhost:8786')

    # Deliberately a module-level name: collect_from_beam looks up
    # ``sleep_time`` as a global to find its per-iteration delay.
    sleep_time = Variable()
    sleep_time.set(1)

    # Long running tasks that feed images into the cluster.
    # pure=False marks the two identical calls as impure so each gets its
    # own task; workers= restricts each task to the named worker.
    # Keep ``futures`` referenced so the tasks are not released.
    futures = [
        client.submit(collect_from_beam, pure=False, workers='beam-1'),
        client.submit(collect_from_beam, pure=False, workers='beam-2'),
    ]
@mrocklin
Copy link
Author

To set up a local cluster, run each of the following commands in its own terminal:

dask-scheduler
dask-worker localhost:8786 --name beam-1
dask-worker localhost:8786 --name beam-2
dask-worker localhost:8786 --nprocs 5

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment