Skip to content

Instantly share code, notes, and snippets.

@tacaswell
Forked from mrocklin/beamline2.py
Created August 6, 2017 00:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tacaswell/5ba3dd3ba9be4f35c18fe82d62e25e81 to your computer and use it in GitHub Desktop.
Save tacaswell/5ba3dd3ba9be4f35c18fe82d62e25e81 to your computer and use it in GitHub Desktop.
""" Tiny example for BNL showing multi-actor control """
from dask.distributed import Client, get_client, Variable, fire_and_forget
import numpy as np
import time
import random
def get_image_from_detector():
""" Collect image from detector, actually just produces random image """
return np.random.random((2000, 2000))
def collect_from_beam():
""" Long running task to collect images from beam continuously """
client = get_client()
while True:
delay = sleep_time.get()
time.sleep(delay)
local_image = get_image_from_detector()
remote_image = client.scatter(local_image, direct=True)
result_1 = client.submit(process_1, remote_image)
result_2 = client.submit(process_2, remote_image)
merged_result = client.submit(process_3, result_1, result_2)
save_raw_image = client.submit(save_to_database, remote_image)
save_final = client.submit(save_to_database, merged_result)
fire_and_forget(save_raw_image) # Make sure to run these tasks
fire_and_forget(save_final) # even if we have moved on locally
# Some generic processing functions
def process_1(img):
time.sleep(random.random() / 2)
return img
def process_2(img):
time.sleep(random.random() / 2)
return img / 10
def process_3(img_1, img_2):
time.sleep(random.random() / 2)
return img_1 + img_2
def save_to_database(img):
time.sleep(0.1)
if __name__ == '__main__':
client = Client('localhost:8786')
sleep_time = Variable()
sleep_time.set(1)
# Long running tasks that feed images into the cluster
futures = [client.submit(collect_from_beam, pure=False, workers='beam-1'),
client.submit(collect_from_beam, pure=False, workers='beam-2')]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment