@mrocklin
Created July 15, 2021 21:58
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create Local Dask Cluster"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask\n",
"from dask.distributed import Client\n",
"\n",
"with dask.config.set({\"distributed.worker.daemon\": False}):\n",
" client = Client()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Run disruptive code\n",
"\n",
"This code includes a function that is disruptive by randomly closing the process. \n",
"\n",
"This disrupts Dask by killing the Dask Worker that is in charge of running this function."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import time, random, sys\n",
"\n",
"def load(x):\n",
" time.sleep(random.random())\n",
" return x\n",
"\n",
"def preprocess(x):\n",
" time.sleep(random.random())\n",
" return x\n",
"\n",
"def scary_function(x):\n",
" if random.random() < 0.4: # randomly kill this process\n",
" sys.exit(1)\n",
" else:\n",
" time.sleep(random.random() * 4)\n",
" return x\n",
"\n",
"def save(x):\n",
" time.sleep(random.random())\n",
" return x\n",
"\n",
"data = client.map(load, range(20))\n",
"data = client.map(preprocess, data)\n",
"\n",
"# with dask.annotate(executor=\"processes\", retries=10):\n",
"data = client.map(scary_function, data)\n",
" \n",
"data = client.map(save, data)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Add ProcessPoolExecutor with WorkerPlugins\n",
"\n",
"We can isolate this particular code by running it in a separate ProcessPoolExecutor. This will allow our full workload to complete in a more friendly way."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask.distributed import WorkerPlugin\n",
"\n",
"WorkerPlugin?"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# from concurrent.futures import ProcessPoolExecutor\n",
"from loky import ProcessPoolExecutor\n",
"\n",
"class AddProcessPool(WorkerPlugin):\n",
" def setup(self, worker):\n",
" worker.executors[\"processes\"] = ProcessPoolExecutor(max_workers=worker.nthreads)\n",
" \n",
"client.register_worker_plugin(AddProcessPool)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.5"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
@pavithraes

This notebook corresponds to the video: Advanced Dask Customization | Customizing Dask Workers | Matt Rocklin.

Note that while executing the cells, you need to create and register the WorkerPlugin before uncommenting the `with dask.annotate(executor="processes", retries=10):` line. :)
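
For anyone following along, once the AddProcessPool plugin has been registered, that part of the pipeline cell would look roughly like this (the same code as in the notebook, just uncommented and re-indented):

```python
# route the disruptive step to the process pool, with retries for killed tasks
with dask.annotate(executor="processes", retries=10):
    data = client.map(scary_function, data)
```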

@mrocklin (Author) commented Jan 6, 2023 via email
