mrocklin/0c01c5427358c167a89d5bd34582cd30
Created July 15, 2021 21:58
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create Local Dask Cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import dask\n",
    "from dask.distributed import Client\n",
    "\n",
    "with dask.config.set({\"distributed.worker.daemon\": False}):\n",
    "    client = Client()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Run disruptive code\n",
    "\n",
    "This code includes a disruptive function that randomly exits its process.\n",
    "\n",
    "This disrupts Dask by killing the Dask Worker that is in charge of running this function."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time, random, sys\n",
    "\n",
    "def load(x):\n",
    "    time.sleep(random.random())\n",
    "    return x\n",
    "\n",
    "def preprocess(x):\n",
    "    time.sleep(random.random())\n",
    "    return x\n",
    "\n",
    "def scary_function(x):\n",
    "    if random.random() < 0.4:  # randomly kill this process\n",
    "        sys.exit(1)\n",
    "    else:\n",
    "        time.sleep(random.random() * 4)\n",
    "        return x\n",
    "\n",
    "def save(x):\n",
    "    time.sleep(random.random())\n",
    "    return x\n",
    "\n",
    "data = client.map(load, range(20))\n",
    "data = client.map(preprocess, data)\n",
    "\n",
    "# After registering the plugin below, uncomment the next line\n",
    "# and indent the client.map call under it\n",
    "# with dask.annotate(executor=\"processes\", retries=10):\n",
    "data = client.map(scary_function, data)\n",
    "\n",
    "data = client.map(save, data)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Add ProcessPoolExecutor with WorkerPlugins\n",
    "\n",
    "We can isolate this particular code by running it in a separate ProcessPoolExecutor. This allows our full workload to complete in a more friendly way."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask.distributed import WorkerPlugin\n",
    "\n",
    "WorkerPlugin?"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# from concurrent.futures import ProcessPoolExecutor\n",
    "from loky import ProcessPoolExecutor\n",
    "\n",
    "class AddProcessPool(WorkerPlugin):\n",
    "    def setup(self, worker):\n",
    "        # Expose a process pool under the name used by dask.annotate(executor=...)\n",
    "        worker.executors[\"processes\"] = ProcessPoolExecutor(max_workers=worker.nthreads)\n",
    "\n",
    "# Register an instance; recent Dask versions no longer accept the class itself\n",
    "client.register_worker_plugin(AddProcessPool())"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
It looks like since this example was created we stopped allowing types in
the register_worker_plugin method. You'll have to instantiate the
AddProcessPool class before sending it to the workers. Something
like client.register_worker_plugin(AddProcessPool())
On Fri, Jan 6, 2023 at 1:08 PM Renan Francisco Santos Souza wrote:
Hi, I tried to run this example as is, but I get an error here:
TypeError: AddProcessPool.setup() missing 1 required positional argument: 'self'
I'm trying to access the worker instance inside the WorkerPlugin, like in the ErrorLogger example <https://distributed.dask.org/en/stable/plugins.html>:
ts = self.worker.tasks[key]
and I'm struggling with it.
Any advice?
I'm on Dask '2022.12.0'
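The TypeError in this thread can be reproduced without a cluster, which may make the class-versus-instance distinction easier to see. The sketch below is plain Python and does not import Dask: StubWorker and KeepWorker are hypothetical names, and the keyword call plugin.setup(worker=...) is an assumption about how a worker invokes the hook. It also shows one way to keep a reference to the worker, in the spirit of the self.worker.tasks[key] access mentioned in the question.

```python
class StubWorker:
    """Hypothetical stand-in for a Dask worker; not part of Dask."""

    def __init__(self):
        self.executors = {}
        self.tasks = {}

    def add_plugin(self, plugin):
        # Assumption: the worker calls the hook with a keyword argument.
        plugin.setup(worker=self)


class KeepWorker:
    """Plugin-shaped class that remembers the worker it was attached to."""

    def setup(self, worker):
        # Store the worker so later hooks can read self.worker.tasks
        self.worker = worker

    def task_count(self):
        return len(self.worker.tasks)


worker = StubWorker()

# Passing the class: setup is an unbound function, so `self` is never supplied.
try:
    worker.add_plugin(KeepWorker)
    error = None
except TypeError as exc:
    error = str(exc)

# Passing an instance: `self` is bound, and the worker reference is kept.
plugin = KeepWorker()
worker.add_plugin(plugin)
worker.tasks["x"] = object()
```

Here error holds a message about the missing 'self' argument, while plugin.worker is the same object as worker, so plugin.task_count() can read its tasks.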
This notebook corresponds to the video: Advanced Dask Customization | Customizing Dask Workers | Matt Rocklin.
Note that while executing the cells, you need to create and register the WorkerPlugin before uncommenting the line
with dask.annotate(executor="processes", retries=10):
:)