Created
April 16, 2017 23:39
-
-
Save golobor/6778ed5dc3d181c7b8a280cbe2717bb3 to your computer and use it in GitHub Desktop.
dask delayed+arrays for multiprocessing in hdf5
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"metadata": { | |
"toc": "true" | |
}, | |
"cell_type": "markdown", | |
"source": "# Table of Contents\n <p>" | |
}, | |
{ | |
"metadata": { | |
"run_control": { | |
"read_only": false, | |
"frozen": false | |
}, | |
"ExecuteTime": { | |
"start_time": "2017-04-16T19:35:50.152405", | |
"end_time": "2017-04-16T19:35:50.157816" | |
}, | |
"trusted": true, | |
"collapsed": true | |
}, | |
"cell_type": "code", | |
"source": "import h5py\nimport numpy as np\n\nimport multiprocessing\n\nimport dask\nimport dask.array as da", | |
"execution_count": 31, | |
"outputs": [] | |
}, | |
{ | |
"metadata": { | |
"ExecuteTime": { | |
"start_time": "2017-04-16T19:35:51.010885", | |
"end_time": "2017-04-16T19:35:51.033935" | |
}, | |
"trusted": true, | |
"collapsed": true | |
}, | |
"cell_type": "code", | |
"source": "def delayed_hdf5(path, dset, lo,hi):\n with h5py.File(path, mode='r',) as f:\n arr = f[dset][lo:hi]\n return arr\n\ndef daskify_hdf5_1d_array_delayed(\n path,\n dset,\n chunksize=int(1e6),\n ):\n \n with h5py.File(path, mode='r') as f:\n size = f[dset].shape[0]\n dtype = f[dset].dtype\n \n chunk_edges = np.arange(0, size, chunksize)\n \n if chunk_edges[-1] != size:\n chunk_edges = np.r_[chunk_edges, size]\n \n arrs = [da.from_delayed(\n dask.delayed(delayed_hdf5)(path, dset, lo, hi),\n (hi-lo,),\n dtype\n )\n for lo,hi in zip(chunk_edges[:-1],chunk_edges[1:])\n ]\n arr = da.concatenate(arrs)\n \n return arr", | |
"execution_count": 32, | |
"outputs": [] | |
}, | |
{ | |
"metadata": { | |
"ExecuteTime": { | |
"start_time": "2017-04-16T19:35:59.434604", | |
"end_time": "2017-04-16T19:36:00.162302" | |
}, | |
"trusted": true, | |
"collapsed": true | |
}, | |
"cell_type": "code", | |
"source": "!rm /tmp/test_dask.h5", | |
"execution_count": 33, | |
"outputs": [] | |
}, | |
{ | |
"metadata": { | |
"ExecuteTime": { | |
"start_time": "2017-04-16T19:36:02.844056", | |
"end_time": "2017-04-16T19:36:33.775437" | |
}, | |
"trusted": true, | |
"collapsed": false | |
}, | |
"cell_type": "code", | |
"source": "N = int(2e8)\n\nwith h5py.File('/tmp/test_dask.h5') as f:\n f.create_dataset('pos1',data=np.random.randint(0, 10000, N),chunks=True)\n f.create_dataset('pos2',data=np.random.randint(0, 10000, N),chunks=True)\n f.create_dataset('counts',data=np.random.randint(0, 10000, N),chunks=True)\n", | |
"execution_count": 34, | |
"outputs": [] | |
}, | |
{ | |
"metadata": { | |
"ExecuteTime": { | |
"start_time": "2017-04-16T19:36:33.777310", | |
"end_time": "2017-04-16T19:36:33.856994" | |
}, | |
"trusted": true, | |
"collapsed": false | |
}, | |
"cell_type": "code", | |
"source": "pos1 = daskify_hdf5_1d_array_delayed('/tmp/test_dask.h5', 'pos1')\npos2 = daskify_hdf5_1d_array_delayed('/tmp/test_dask.h5', 'pos2')\ncounts = daskify_hdf5_1d_array_delayed('/tmp/test_dask.h5', 'counts')", | |
"execution_count": 35, | |
"outputs": [] | |
}, | |
{ | |
"metadata": { | |
"ExecuteTime": { | |
"start_time": "2017-04-16T19:36:33.858629", | |
"end_time": "2017-04-16T19:36:36.811267" | |
}, | |
"trusted": true, | |
"collapsed": false | |
}, | |
"cell_type": "code", | |
"source": "%%time\nwith multiprocessing.Pool(20) as p:\n with dask.set_options(pool=p):\n print(pos1.max().compute())", | |
"execution_count": 36, | |
"outputs": [ | |
{ | |
"output_type": "stream", | |
"text": "9999\nCPU times: user 164 ms, sys: 2.37 s, total: 2.54 s\nWall time: 2.91 s\n", | |
"name": "stdout" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"ExecuteTime": { | |
"start_time": "2017-04-16T19:36:36.813517", | |
"end_time": "2017-04-16T19:36:40.073122" | |
}, | |
"trusted": true, | |
"collapsed": false | |
}, | |
"cell_type": "code", | |
"source": "%%time\nwith multiprocessing.Pool(20) as p:\n with dask.set_options(pool=p):\n print(da.bincount(pos1,minlength=10000).compute())", | |
"execution_count": 37, | |
"outputs": [ | |
{ | |
"output_type": "stream", | |
"text": "[20152 20116 20141 ..., 19753 20105 20095]\nCPU times: user 224 ms, sys: 2.64 s, total: 2.86 s\nWall time: 3.25 s\n", | |
"name": "stdout" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"ExecuteTime": { | |
"start_time": "2017-04-16T19:36:40.074776", | |
"end_time": "2017-04-16T19:37:20.547048" | |
}, | |
"trusted": true, | |
"collapsed": false | |
}, | |
"cell_type": "code", | |
"source": "%%time\nwith multiprocessing.Pool(20) as p:\n with dask.set_options(pool=p):\n print(da.bincount(pos1,counts,minlength=10000).compute())", | |
"execution_count": 38, | |
"outputs": [ | |
{ | |
"output_type": "stream", | |
"text": "[ 1.01605303e+08 1.00903367e+08 1.00353525e+08 ..., 9.82095860e+07\n 1.00779718e+08 1.00266196e+08]\nCPU times: user 7.46 s, sys: 36.8 s, total: 44.2 s\nWall time: 40.5 s\n", | |
"name": "stdout" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"ExecuteTime": { | |
"start_time": "2017-04-16T19:37:20.549004", | |
"end_time": "2017-04-16T19:38:52.273105" | |
}, | |
"trusted": true, | |
"collapsed": false | |
}, | |
"cell_type": "code", | |
"source": "%%time\nwith multiprocessing.Pool(20) as p:\n with dask.set_options(pool=p):\n print((da.bincount(pos1,counts,minlength=10000)\n +da.bincount(pos2,counts,minlength=10000)).compute())", | |
"execution_count": 39, | |
"outputs": [ | |
{ | |
"output_type": "stream", | |
"text": "[ 2.01229651e+08 2.00895087e+08 2.00754355e+08 ..., 1.98273697e+08\n 2.01449863e+08 1.99864165e+08]\nCPU times: user 14.4 s, sys: 1min 24s, total: 1min 39s\nWall time: 1min 31s\n", | |
"name": "stdout" | |
} | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"name": "python3", | |
"display_name": "Python 3", | |
"language": "python" | |
}, | |
"toc": { | |
"threshold": 6, | |
"number_sections": true, | |
"toc_cell": true, | |
"toc_window_display": false, | |
"toc_section_display": "block", | |
"sideBar": false, | |
"navigate_menu": true | |
}, | |
"nav_menu": {}, | |
"language_info": { | |
"name": "python", | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"nbconvert_exporter": "python", | |
"version": "3.5.2", | |
"pygments_lexer": "ipython3", | |
"mimetype": "text/x-python", | |
"file_extension": ".py" | |
}, | |
"gist": { | |
"id": "", | |
"data": { | |
"description": "dask delayed+arrays for multiprocessing in hdf5", | |
"public": true | |
} | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 2 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment