Skip to content

Instantly share code, notes, and snippets.

@whalebot-helmsman
Created March 16, 2020 13:14
Show Gist options
  • Save whalebot-helmsman/a86aa313c550114b096e6a4c1e5da42a to your computer and use it in GitHub Desktop.
Save whalebot-helmsman/a86aa313c550114b096e6a4c1e5da42a to your computer and use it in GitHub Desktop.
from collections import defaultdict
import sys
import dask
from dask.distributed import Client
def s0(x):
print(f's0 {x}')
return x
def s1(x):
print(f's1 {x}')
if x == 1:
raise ValueError('Bad value')
return x + 3
def s2(x):
print(f's2 {x}')
return x * 2
class Chained:
def __init__(self):
self._0 = None
self._1 = None
self._2 = None
def __str__(self):
return '::'.join(self._0, self._1, self._2)
def delayed(client, tasks):
ds0 = dask.delayed(s0)
ds1 = dask.delayed(s1)
ds2 = dask.delayed(s2)
t = defaultdict(Chained)
for i in range(3):
current = t[i]
current._0 = ds0(i)
current._1 = ds1(current._0)
current._2 = ds2(current._1)
t[4]._2 = ds2(t[0]._0)
to_compute = [j._2 for i, j in t.items() if i in tasks or not tasks]
client.compute(to_compute, sync=True)
def futures(client, tasks):
t = defaultdict(Chained)
for i in range(3):
if not tasks or (i in tasks) or (i == 0 and 4 in tasks):
current = t[i]
current._0 = client.submit(s0, i)
current._1 = client.submit(s1, current._0)
current._2 = client.submit(s2, current._1)
if 4 in tasks:
t[4]._2 = client.submit(s2, t[0]._0)
to_compute = [j._2 for i, j in t.items() if j._2 is not None]
client.gather(to_compute, errors='skip')
def main():
client = Client(n_workers=1, threads_per_worker=1)
tasks = [int(i) for i in sys.argv[2:]]
if sys.argv[1] == 'futures':
futures(client, tasks)
elif sys.argv[1] == 'delayed':
delayed(client, tasks)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment