Skip to content

Instantly share code, notes, and snippets.

@aeantipov
Created October 21, 2018 17:15
Show Gist options
  • Save aeantipov/6d670e13cd503741e9ef5b0299719a8e to your computer and use it in GitHub Desktop.
Save aeantipov/6d670e13cd503741e9ef5b0299719a8e to your computer and use it in GitHub Desktop.
dask_composite_task test
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"def task(x):\n",
" \"\"\" Sample single-process task that takes between 2 and 5 seconds \"\"\"\n",
" import time\n",
" import random\n",
" dt = random.uniform(2,5)\n",
" time.sleep(dt)\n",
" return x+dt\n",
"\n",
"def composite_task(np=8):\n",
" \"\"\" Composite task that runs multiple single-process runs in parallel \"\"\"\n",
" from functools import partial\n",
" from joblib import Parallel, delayed, parallel_backend\n",
" with parallel_backend('loky', n_jobs=np):\n",
" out=Parallel()(delayed(task)(i) for i in list(range(0, np)))\n",
" return out"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"3.61 s ± 0 ns per loop (mean ± std. dev. of 1 run, 7 loops each)\n"
]
}
],
"source": [
"# Single-cpu task takes 3.5 seconds on average\n",
"%timeit -n7 -r1 task(0)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"5.03 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n"
]
}
],
"source": [
"# This is bounded by the time of the longest task\n",
"%timeit -n1 -r1 composite_task(8)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"from dask.distributed import Client, LocalCluster\n",
"cluster = LocalCluster(n_workers=1, threads_per_worker=8, diagnostics_port = 8788)\n",
"client = Client(cluster)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"25.5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n"
]
}
],
"source": [
"# This should take the same time as running a single composite task, but it doesn't - GIL is locked \n",
"%timeit -n1 -r1 client.submit(composite_task,8).result()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "bono",
"language": "python",
"name": "bono"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.0"
},
"toc": {
"base_numbering": 1,
"nav_menu": {},
"number_sections": true,
"sideBar": true,
"skip_h1_title": false,
"title_cell": "Table of Contents",
"title_sidebar": "Contents",
"toc_cell": false,
"toc_position": {},
"toc_section_display": true,
"toc_window_display": false
},
"varInspector": {
"cols": {
"lenName": 16,
"lenType": 16,
"lenVar": 40
},
"kernels_config": {
"python": {
"delete_cmd_postfix": "",
"delete_cmd_prefix": "del ",
"library": "var_list.py",
"varRefreshCmd": "print(var_dic_list())"
},
"r": {
"delete_cmd_postfix": ") ",
"delete_cmd_prefix": "rm(",
"library": "var_list.r",
"varRefreshCmd": "cat(var_dic_list()) "
}
},
"types_to_exclude": [
"module",
"function",
"builtin_function_or_method",
"instance",
"_Feature"
],
"window_display": false
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment