Skip to content

Instantly share code, notes, and snippets.

@devforfu
Last active October 23, 2021 14:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save devforfu/93b082b3f67818602329509c797807da to your computer and use it in GitHub Desktop.
Save devforfu/93b082b3f67818602329509c797807da to your computer and use it in GitHub Desktop.
Shortened Dask example
# Runs a toy image generation task on Dask.
#
# Note that the code was shortened for better readability.
# Please refer the following repository to see the full example:
# https://github.com/devforfu/dask_benchmark
@cli
def main(task_params: TaskParams) -> None:
jobs = [
Job(
identifier=f"job_{i}",
n_images=task_params.random_task_size(),
image_size=512,
output_dir="/tmp/images",
)
for i in range(task_params.n_tasks)
]
for scheduler in DASK_SCHEDULERS:
print(f"{scheduler.ljust(30, '.')}", end="")
with Timer() as timer:
run_task_on_sequence(
jobs,
generate_image,
scheduler,
task_params.num_workers
)
print(str(timer))
def run_task_on_sequence(
seq: List,
task: Callable,
scheduler: str,
num_workers: int
) -> List:
with dask.distributed.Client(n_workers=num_workers) as client:
client.run(init_logger)
bag = db.from_sequence(seq, npartitions=num_workers)
result = bag.map(task).compute(scheduler=scheduler)
return result
def generate_image(job: Job) -> None:
logger = get_logger(filename=f"/tmp/{job.identifier}.log", level=LOGGING_LEVEL)
images = random_images_array(
n_images=job.n_images,
width=job.image_size,
height=job.image_size,
colored=True
)
logger.debug(f"generated {job.n_images} random images")
output_dir = os.path.join(job.output_dir, random_string(10))
logger.debug(f"output dir: {output_dir}")
os.makedirs(output_dir, exist_ok=True)
for i, image in enumerate(images):
filename = os.path.join(output_dir, f"{i}.png")
PIL.Image.fromarray(image).save(filename)
logger.debug(f"saved: {filename}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment