{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Dask array\n",
"\n",
"When trying to optimize algorithms with dask, it is tempting to use `dask.array`. Unfortunately, the pickling protocol seems to be a bottlekneck in the performance of `dask.array`. This short notebook provides heuristic evidence as to why that might be the case.\n",
"\n",
"It is believed that a simple solution, much like the solution found for pytorch in this [blog post](https://matthewrocklin.com/blog/work/2018/07/23/protocols-pickle) by @mrocklin might be appropriate for numpy arrays\n",
"\n",
"\n",
"Here is an issue that was raised about this: https://github.com/numpy/numpy/issues/7544\n",
"\n",
"All code is run on an i7-7700HQ, with 4 cores + hyperthreading and 16GB of ram.\n",
"\n",
"By Mark Harfouche, August 2018"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"3.6.6 | packaged by conda-forge | (default, Jul 26 2018, 09:53:17) \n",
"[GCC 4.8.2 20140120 (Red Hat 4.8.2-15)]\n",
"0.19.0\n",
"1.15.0\n"
]
}
],
"source": [
"import dask.array as da\n",
"import numpy as np\n",
"import pickle\n",
"import dask\n",
"import sys\n",
"print(sys.version)\n",
"print(dask.__version__)\n",
"print(np.__version__)"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"shape = (50, 1024, 1024)\n",
"chunks = (shape[0], shape[1]//2, shape[2]//2)\n",
"dtype = 'float32'\n",
"\n",
"\n"
]
},
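{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a sanity check (not part of the original timings): each chunk holds `50 * 512 * 512` float32 values, i.e. 50 MiB, and the full array is 200 MiB, so any scheduler that moves chunks through pickle copies a substantial amount of data."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Rough sizes of the full array and of one chunk (float32 = 4 bytes).\n",
"full_mib = np.prod(shape) * np.dtype(dtype).itemsize / 2**20\n",
"chunk_mib = np.prod(chunks) * np.dtype(dtype).itemsize / 2**20\n",
"print(full_mib, 'MiB total;', chunk_mib, 'MiB per chunk')"
]
},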
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 63 µs, sys: 7 µs, total: 70 µs\n",
"Wall time: 115 µs\n"
]
}
],
"source": [
"%%time \n",
"_ = np.zeros(shape=shape, dtype=dtype)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 33.6 ms, sys: 89.8 ms, total: 123 ms\n",
"Wall time: 123 ms\n",
"CPU times: user 33.5 ms, sys: 95.3 ms, total: 129 ms\n",
"Wall time: 127 ms\n",
"CPU times: user 157 ms, sys: 359 ms, total: 516 ms\n",
"Wall time: 703 ms\n"
]
}
],
"source": [
"%time _ = da.zeros(shape=shape, chunks=chunks, dtype=dtype).compute(scheduler='synchronous')\n",
"%time _ = da.zeros(shape=shape, chunks=chunks, dtype=dtype).compute(scheduler='threads')\n",
"%time _ = da.zeros(shape=shape, chunks=chunks, dtype=dtype).compute(scheduler='processes')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"What I think is happening is that pickling is REALLY slow."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 41 µs, sys: 0 ns, total: 41 µs\n",
"Wall time: 43.6 µs\n"
]
}
],
"source": [
"%%time \n",
"pickel_protocol = -1\n",
"a00 = np.zeros(shape=chunks, dtype=dtype)\n",
"a01 = np.zeros(shape=chunks, dtype=dtype)\n",
"a10 = np.zeros(shape=chunks, dtype=dtype)\n",
"a11 = np.zeros(shape=chunks, dtype=dtype)\n"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 88.1 ms, sys: 132 ms, total: 220 ms\n",
"Wall time: 222 ms\n"
]
}
],
"source": [
"%%time\n",
"a00_pickle = pickle.dumps(a00, protocol=pickel_protocol)\n",
"a01_pickle = pickle.dumps(a01, protocol=pickel_protocol)\n",
"a10_pickle = pickle.dumps(a10, protocol=pickel_protocol)\n",
"a11_pickle = pickle.dumps(a11, protocol=pickel_protocol)"
]
},
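{
"cell_type": "markdown",
"metadata": {},
"source": [
"A rough baseline (not in the original measurements): `tobytes` makes one explicit copy of a chunk's buffer, and `np.frombuffer` reconstructs an array from it without a second copy. Comparing this against the pickle timings above suggests that `pickle.dumps` is paying for at least one full copy per chunk."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hypothetical baseline: one explicit copy out, a zero-copy view back in.\n",
"buf = a00.tobytes()  # copies the 50 MiB chunk once\n",
"a00_view = np.frombuffer(buf, dtype=dtype).reshape(chunks)\n",
"assert a00_view.shape == a00.shape\n",
"assert (a00_view == a00).all()"
]
},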
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 39.6 ms, sys: 51.8 ms, total: 91.4 ms\n",
"Wall time: 90.5 ms\n"
]
}
],
"source": [
"%%time\n",
"a00_rt = pickle.loads(a00_pickle)\n",
"a01_rt = pickle.loads(a01_pickle)\n",
"a10_rt = pickle.loads(a10_pickle)\n",
"a11_rt = pickle.loads(a11_pickle)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 26.7 ms, sys: 55.2 ms, total: 81.8 ms\n",
"Wall time: 81.2 ms\n"
]
}
],
"source": [
"%%time\n",
"c = np.empty(shape=shape, dtype=dtype)\n",
"c[:, :chunks[1], :chunks[2]] = a00_rt\n",
"c[:, :chunks[1], chunks[2]:] = a01_rt\n",
"c[:, chunks[1]:, :chunks[2]] = a10_rt\n",
"c[:, chunks[1]:, chunks[2]:] = a11_rt\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Putting it all together"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 89.8 ms, sys: 225 ms, total: 315 ms\n",
"Wall time: 315 ms\n"
]
}
],
"source": [
"%%time\n",
"a00 = np.zeros(shape=chunks, dtype=dtype)\n",
"a01 = np.zeros(shape=chunks, dtype=dtype)\n",
"a10 = np.zeros(shape=chunks, dtype=dtype)\n",
"a11 = np.zeros(shape=chunks, dtype=dtype)\n",
"a00_pickle = pickle.dumps(a00)\n",
"a01_pickle = pickle.dumps(a01)\n",
"a10_pickle = pickle.dumps(a10)\n",
"a11_pickle = pickle.dumps(a11)\n",
"a00_rt = pickle.loads(a00_pickle)\n",
"a01_rt = pickle.loads(a01_pickle)\n",
"a10_rt = pickle.loads(a10_pickle)\n",
"a11_rt = pickle.loads(a11_pickle)\n"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 28.3 ms, sys: 64.6 ms, total: 92.9 ms\n",
"Wall time: 91.4 ms\n"
]
}
],
"source": [
"%%time\n",
"c = np.empty(shape=shape, dtype=dtype)\n",
"c[:, :chunks[1], :chunks[2]] = a00_rt\n",
"c[:, :chunks[1], chunks[2]:] = a01_rt\n",
"c[:, chunks[1]:, :chunks[2]] = a10_rt\n",
"c[:, chunks[1]:, chunks[2]:] = a11_rt"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This is very suspicious. 400ms / 4 is almost 110 ms, what I think is happening is that dask identifies that `zeros` is a pure function and probably optimizes the four repeated calls to `pickle.dumps` and `pickle.loads` away in the case where we are using `threading` and `sequential` schedulers. Though, maybe I'm wrong."
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 24.1 ms, sys: 64.4 ms, total: 88.5 ms\n",
"Wall time: 88 ms\n"
]
}
],
"source": [
"%%time\n",
"a00 = np.zeros(shape=chunks, dtype=dtype)\n",
"a00_pickle = pickle.dumps(a00, protocol=pickel_protocol)\n",
"a00_rt = pickle.loads(a00_pickle)\n"
]
},
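{
"cell_type": "markdown",
"metadata": {},
"source": [
"Not part of the original timings: on Python 3.8+ (this notebook ran on 3.6) with a recent NumPy, pickle protocol 5 supports out-of-band buffers, so a chunk's data can be handed over without being copied into the pickle stream. This is essentially the approach from the PyTorch blog post linked above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch, assuming Python 3.8+ and a NumPy that supports protocol 5.\n",
"buffers = []\n",
"header = pickle.dumps(a00, protocol=5, buffer_callback=buffers.append)\n",
"a00_oob = pickle.loads(header, buffers=buffers)\n",
"# The pickle header is tiny; the 50 MiB of data travel out-of-band.\n",
"print(len(header), 'header bytes;', len(buffers), 'out-of-band buffer(s)')"
]
},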
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 32.4 ms, sys: 63.8 ms, total: 96.2 ms\n",
"Wall time: 95.5 ms\n"
]
}
],
"source": [
"%%time\n",
"c = np.empty(shape=shape, dtype=dtype)\n",
"c[:, :chunks[1], :chunks[2]] = a00_rt\n",
"c[:, :chunks[1], chunks[2]:] = a00_rt\n",
"c[:, chunks[1]:, :chunks[2]] = a00_rt\n",
"c[:, chunks[1]:, chunks[2]:] = a00_rt\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
}