Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
dask delayed+arrays for multiprocessing in hdf5
{
"cells": [
{
"metadata": {
"toc": "true"
},
"cell_type": "markdown",
"source": "# Table of Contents\n <p>"
},
{
"metadata": {
"run_control": {
"read_only": false,
"frozen": false
},
"ExecuteTime": {
"start_time": "2017-04-16T19:35:50.152405",
"end_time": "2017-04-16T19:35:50.157816"
},
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "import h5py\nimport numpy as np\n\nimport multiprocessing\n\nimport dask\nimport dask.array as da",
"execution_count": 31,
"outputs": []
},
{
"metadata": {
"ExecuteTime": {
"start_time": "2017-04-16T19:35:51.010885",
"end_time": "2017-04-16T19:35:51.033935"
},
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "def delayed_hdf5(path, dset, lo, hi):\n    \"\"\"Read elements ``lo:hi`` of dataset ``dset`` from the HDF5 file ``path``.\n\n    The file is opened read-only for the duration of the call and the\n    requested slice is returned.\n    \"\"\"\n    with h5py.File(path, mode='r') as f:\n        return f[dset][lo:hi]\n\n\ndef daskify_hdf5_1d_array_delayed(\n        path,\n        dset,\n        chunksize=int(1e6),\n        ):\n    \"\"\"Wrap a 1-D HDF5 dataset as a lazy dask array of delayed block reads.\n\n    Only the dataset's length and dtype are read here; every block of at\n    most ``chunksize`` elements becomes its own ``delayed_hdf5`` task that\n    reopens the file when it is computed.\n\n    Parameters\n    ----------\n    path : str\n        Path of the HDF5 file.\n    dset : str\n        Name of the dataset inside the file.\n    chunksize : int, optional\n        Maximum number of elements per dask block (default 1e6).\n\n    Returns\n    -------\n    dask.array.Array\n        1-D array with the dataset's length and dtype.\n\n    Raises\n    ------\n    ValueError\n        If ``chunksize`` is smaller than 1.\n    \"\"\"\n    chunksize = int(chunksize)\n    if chunksize < 1:\n        raise ValueError('chunksize must be a positive integer')\n\n    with h5py.File(path, mode='r') as f:\n        size = int(f[dset].shape[0])\n        dtype = f[dset].dtype\n\n    # Block boundaries as plain Python ints: 0, chunksize, 2*chunksize, ..., size.\n    # An empty dataset still gets one zero-length block so that\n    # da.concatenate always receives at least one array.\n    starts = list(range(0, size, chunksize)) or [0]\n    chunk_edges = starts + [size]\n\n    load_block = dask.delayed(delayed_hdf5)\n    arrs = [\n        da.from_delayed(load_block(path, dset, lo, hi), (hi - lo,), dtype)\n        for lo, hi in zip(chunk_edges[:-1], chunk_edges[1:])\n    ]\n    return da.concatenate(arrs)",
"execution_count": 32,
"outputs": []
},
{
"metadata": {
"ExecuteTime": {
"start_time": "2017-04-16T19:35:59.434604",
"end_time": "2017-04-16T19:36:00.162302"
},
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "# -f: do not complain when the file is absent (e.g. first run on a fresh machine)\n!rm -f /tmp/test_dask.h5",
"execution_count": 33,
"outputs": []
},
{
"metadata": {
"ExecuteTime": {
"start_time": "2017-04-16T19:36:02.844056",
"end_time": "2017-04-16T19:36:33.775437"
},
"trusted": true,
"collapsed": false
},
"cell_type": "code",
"source": "N = int(2e8)\n\n# mode='w' creates the file and truncates any existing copy. An explicit\n# mode is needed because h5py >= 3.0 opens files read-only by default.\nwith h5py.File('/tmp/test_dask.h5', mode='w') as f:\n    # Three chunked 1-D datasets of N random integers in [0, 10000).\n    for dset_name in ('pos1', 'pos2', 'counts'):\n        f.create_dataset(dset_name, data=np.random.randint(0, 10000, N), chunks=True)",
"execution_count": 34,
"outputs": []
},
{
"metadata": {
"ExecuteTime": {
"start_time": "2017-04-16T19:36:33.777310",
"end_time": "2017-04-16T19:36:33.856994"
},
"trusted": true,
"collapsed": false
},
"cell_type": "code",
"source": "# Wrap each on-disk dataset as a lazy dask array.\npos1, pos2, counts = [\n    daskify_hdf5_1d_array_delayed('/tmp/test_dask.h5', dset_name)\n    for dset_name in ('pos1', 'pos2', 'counts')\n]",
"execution_count": 35,
"outputs": []
},
{
"metadata": {
"ExecuteTime": {
"start_time": "2017-04-16T19:36:33.858629",
"end_time": "2017-04-16T19:36:36.811267"
},
"trusted": true,
"collapsed": false
},
"cell_type": "code",
"source": "%%time\n# Maximum of pos1, computed with a 20-process pool handed to dask.\n# NOTE(review): newer dask releases spell this dask.config.set(pool=...) -\n# confirm against the installed version.\nwith multiprocessing.Pool(20) as p:\n    with dask.set_options(pool=p):\n        print(pos1.max().compute())",
"execution_count": 36,
"outputs": [
{
"output_type": "stream",
"text": "9999\nCPU times: user 164 ms, sys: 2.37 s, total: 2.54 s\nWall time: 2.91 s\n",
"name": "stdout"
}
]
},
{
"metadata": {
"ExecuteTime": {
"start_time": "2017-04-16T19:36:36.813517",
"end_time": "2017-04-16T19:36:40.073122"
},
"trusted": true,
"collapsed": false
},
"cell_type": "code",
"source": "%%time\n# Unweighted histogram of pos1 over the 10000 possible values.\nwith multiprocessing.Pool(20) as p:\n    with dask.set_options(pool=p):\n        print(da.bincount(pos1, minlength=10000).compute())",
"execution_count": 37,
"outputs": [
{
"output_type": "stream",
"text": "[20152 20116 20141 ..., 19753 20105 20095]\nCPU times: user 224 ms, sys: 2.64 s, total: 2.86 s\nWall time: 3.25 s\n",
"name": "stdout"
}
]
},
{
"metadata": {
"ExecuteTime": {
"start_time": "2017-04-16T19:36:40.074776",
"end_time": "2017-04-16T19:37:20.547048"
},
"trusted": true,
"collapsed": false
},
"cell_type": "code",
"source": "%%time\n# Histogram of pos1 weighted by counts.\nwith multiprocessing.Pool(20) as p:\n    with dask.set_options(pool=p):\n        print(da.bincount(pos1, counts, minlength=10000).compute())",
"execution_count": 38,
"outputs": [
{
"output_type": "stream",
"text": "[ 1.01605303e+08 1.00903367e+08 1.00353525e+08 ..., 9.82095860e+07\n 1.00779718e+08 1.00266196e+08]\nCPU times: user 7.46 s, sys: 36.8 s, total: 44.2 s\nWall time: 40.5 s\n",
"name": "stdout"
}
]
},
{
"metadata": {
"ExecuteTime": {
"start_time": "2017-04-16T19:37:20.549004",
"end_time": "2017-04-16T19:38:52.273105"
},
"trusted": true,
"collapsed": false
},
"cell_type": "code",
"source": "%%time\n# Sum of the counts-weighted histograms of pos1 and pos2.\nwith multiprocessing.Pool(20) as p:\n    with dask.set_options(pool=p):\n        print((da.bincount(pos1, counts, minlength=10000)\n               + da.bincount(pos2, counts, minlength=10000)).compute())",
"execution_count": 39,
"outputs": [
{
"output_type": "stream",
"text": "[ 2.01229651e+08 2.00895087e+08 2.00754355e+08 ..., 1.98273697e+08\n 2.01449863e+08 1.99864165e+08]\nCPU times: user 14.4 s, sys: 1min 24s, total: 1min 39s\nWall time: 1min 31s\n",
"name": "stdout"
}
]
}
],
"metadata": {
"kernelspec": {
"name": "python3",
"display_name": "Python 3",
"language": "python"
},
"toc": {
"threshold": 6,
"number_sections": true,
"toc_cell": true,
"toc_window_display": false,
"toc_section_display": "block",
"sideBar": false,
"navigate_menu": true
},
"nav_menu": {},
"language_info": {
"name": "python",
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"nbconvert_exporter": "python",
"version": "3.5.2",
"pygments_lexer": "ipython3",
"mimetype": "text/x-python",
"file_extension": ".py"
},
"gist": {
"id": "",
"data": {
"description": "dask delayed+arrays for multiprocessing in hdf5",
"public": true
}
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment