Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Dask microbenchmark
#!/usr/bin/env python
from __future__ import absolute_import, division, print_function
import sys
import time
import dask
import numpy as np
import argparse
def _positive_int(text):
    """argparse ``type`` callback: parse ``text`` as an integer >= 1."""
    # A ValueError from int() is caught by argparse and reported as a
    # normal usage error, so it does not need to be handled here.
    value = int(text)
    if value < 1:
        raise argparse.ArgumentTypeError(
            'must be a positive integer, got %d' % value)
    return value


# Command-line interface: the task graph is a grid of `steps` rows, each
# containing `width` tasks.  Non-positive sizes are rejected up front because
# they would otherwise fail later (nothing to join, or a division by zero
# when reporting the time per task).
parser = argparse.ArgumentParser()
parser.add_argument('--steps', type=_positive_int, required=True,
                    help='number of rows (time steps) in the task graph')
parser.add_argument('--width', type=_positive_int, required=True,
                    help='number of tasks in each row of the task graph')
parser.add_argument('--scheduler', required=False,
                    help='address of a dask.distributed scheduler; '
                         'if omitted, the default local scheduler is used')
args = parser.parse_args()

if args.scheduler:
    # Imported lazily so dask.distributed is only needed when requested.
    from dask.distributed import Client
    # The client is not referenced again: per the dask.distributed docs,
    # constructing a Client registers it as the default scheduler used by
    # later .compute() calls.
    client = Client(args.scheduler)
@dask.delayed
def execute_point(*inputs):
    """Return the sum of all positional inputs (0 when none are given).

    Decorated with ``dask.delayed``, so each call in this script adds one
    task to the graph instead of computing the sum immediately.
    """
    total = 0
    for value in inputs:
        total = total + value
    return total
@dask.delayed
def join(*args):
    """Accept any number of inputs, ignore them, and return None.

    Used as a single sink task: computing it forces every task passed in
    as an argument to be computed first.
    """
    return None
def execute_graph(steps, width):
    """Build and run a ``steps`` x ``width`` grid of delayed tasks.

    Row 0 consists of ``width`` tasks with no inputs.  In every later row,
    the task at position ``point`` takes as inputs the tasks at positions
    ``point - 1``, ``point`` and ``point + 1`` of the previous row (fewer at
    the two edges).  The last row is fed into a single ``join`` task, which
    is then computed; this call blocks until that computation finishes.

    Args:
        steps: Number of rows to build.  If ``steps <= 0`` no point tasks
            are created and only an empty ``join`` is computed.
        width: Number of tasks per row.

    Returns:
        None.
    """
    # Start from an empty list rather than None so that `join(*last_row)`
    # below is still valid when the loop body never runs (steps <= 0).
    last_row = []
    for step in range(steps):
        if step == 0:
            # The first row has no predecessors.
            row = [execute_point() for _ in range(width)]
        else:
            # Three-point stencil over the previous row.  The left edge is
            # clamped explicitly (a negative start would wrap around); the
            # right edge needs no clamp because slicing past the end of a
            # list simply stops at the end.
            row = [
                execute_point(*last_row[max(point - 1, 0):point + 2])
                for point in range(width)
            ]
        last_row = row
    join(*last_row).compute()
# Time the whole run: graph construction plus execution.
start_time = time.perf_counter()
execute_graph(args.steps, args.width)
total_time = time.perf_counter() - start_time

print('Total Time: %f s' % total_time)
if args.steps > 0 and args.width > 0:
    # steps * width counts the execute_point tasks; the final join task
    # is not included in the average.
    num_tasks = args.steps * args.width
    print('Time per Task: %f ms' % (total_time / num_tasks * 1000))
else:
    # No point tasks were created, so a per-task average is undefined;
    # avoid dividing by zero (or by a meaningless negative product).
    print('Time per Task: n/a (no tasks were executed)')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.