@willirath
Created May 3, 2018 09:48
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Parallelizing Parcels with Dask\n",
"\n",
"This notebook shows how to distribute Parcels runs (using the Brownian motion example) across a dask cluster."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Prerequisites\n",
"\n",
"Install Parcels and dask:\n",
"\n",
"```bash\n",
"conda create -n py3_parcels_dask -c conda-forge python=3 parcels dask distributed bokeh\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## First, reproduce the Brownian motion example\n",
"\n",
"(Note that the random seed is passed as a keyword argument, so each realization can use its own seed.)"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from parcels import FieldSet, Field, ParticleSet, ScipyParticle, JITParticle, BrownianMotion2D\n",
"import numpy as np\n",
"from datetime import timedelta as delta\n",
"from parcels import random"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"ptype = {'scipy': ScipyParticle, 'jit': JITParticle}"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"def zeros_fieldset(xdim=2, ydim=2):\n",
"    \"\"\"Generates a zero velocity field\"\"\"\n",
"    lon = np.linspace(-20, 20, xdim, dtype=np.float32)\n",
"    lat = np.linspace(-20, 20, ydim, dtype=np.float32)\n",
"\n",
"    dimensions = {'lon': lon, 'lat': lat}\n",
"    data = {'U': np.zeros((ydim, xdim), dtype=np.float32),\n",
"            'V': np.zeros((ydim, xdim), dtype=np.float32)}\n",
"    return FieldSet.from_data(data, dimensions, mesh='spherical')"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"def run_brownian_example(mode='scipy', npart=3000, random_seed=12345, runtime=delta(days=1)):\n",
"    fieldset = zeros_fieldset()\n",
"\n",
"    # Set diffusion constants.\n",
"    kh_zonal = 100\n",
"    kh_meridional = 100\n",
"\n",
"    # Create field of Kh_zonal and Kh_meridional, using same grid as U\n",
"    grid = fieldset.U.grid\n",
"    fieldset.add_field(Field('Kh_zonal', kh_zonal * np.ones((2, 2)),\n",
"                             grid=grid))\n",
"    fieldset.add_field(Field('Kh_meridional', kh_meridional * np.ones((2, 2)),\n",
"                             grid=grid))\n",
"\n",
"    # Set random seed\n",
"    random.seed(random_seed)\n",
"\n",
"    pset = ParticleSet(fieldset=fieldset, pclass=ptype[mode],\n",
"                       lon=np.zeros(npart), lat=np.zeros(npart))\n",
"    pset.execute(pset.Kernel(BrownianMotion2D),\n",
"                 runtime=runtime, dt=delta(hours=1))\n",
"\n",
"    return pset"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Run the default setting once."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"WARNING: Casting field data to np.float32\n",
"INFO: Compiled random ==> /tmp/parcels-1000/random.so\n",
"INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/8bab2818c41e5b898bc8af669c861c89.so\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 748 ms, sys: 28 ms, total: 776 ms\n",
"Wall time: 1.88 s\n"
]
}
],
"source": [
"%%time\n",
"pset = run_brownian_example(mode='jit')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Next step (no dask yet): Create a list of realizations"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create a list of seeds and map `run_brownian_example` over it."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[0 1 2 3]\n"
]
}
],
"source": [
"seeds = np.arange(4)\n",
"print(seeds)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/74d9a05b0d6153dad987f4ba8db53401.so\n",
"INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/4efabbfec368a792a3d82d635b2bca44.so\n",
"INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/330323f6a53c823ba595ac51dfb0cc89.so\n",
"INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/c85aaaae4e4face49067612083956ad8.so\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 2.25 s, sys: 40 ms, total: 2.29 s\n",
"Wall time: 5.85 s\n"
]
}
],
"source": [
"%%time\n",
"list_of_psets = list(map(lambda s: run_brownian_example(mode='jit', random_seed=s),\n",
"                         seeds))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Now, execute Parcels for a list of realizations using a local dask cluster"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"import dask.bag as db\n",
"from dask.distributed import Client"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"client = Client(n_workers=2, threads_per_worker=2, processes=False)"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table style=\"border: 2px solid white;\">\n",
"<tr>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3>Client</h3>\n",
"<ul>\n",
" <li><b>Scheduler: </b>inproc://134.245.217.57/19523/1\n",
" <li><b>Dashboard: </b><a href='http://localhost:8787/status' target='_blank'>http://localhost:8787/status</a>\n",
"</ul>\n",
"</td>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3>Cluster</h3>\n",
"<ul>\n",
" <li><b>Workers: </b>2</li>\n",
" <li><b>Cores: </b>4</li>\n",
" <li><b>Memory: </b>21.06 GB</li>\n",
"</ul>\n",
"</td>\n",
"</tr>\n",
"</table>"
],
"text/plain": [
"<Client: scheduler='inproc://134.245.217.57/19523/1' processes=2 cores=4>"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"client"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"_Make sure to **click on the link above!**_"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"seeds_bag = db.from_sequence(seeds, partition_size=1)\n",
"psets_jit_bag = db.map(lambda s: run_brownian_example(mode='jit', random_seed=s),\n",
"                       seeds_bag)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/dee94dfa7edd708cf3eee30f83f87732.so\n",
"INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/ebe079b32cad3e55cfbbf5a2a80c33f2.so\n",
"INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/db1158fbc8444ce392fb212e0bbe7ffe.so\n",
"INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/598b1ddd22d171d9594f803f5e0b48dc.so\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 2.93 s, sys: 124 ms, total: 3.05 s\n",
"Wall time: 3.69 s\n"
]
}
],
"source": [
"%%time\n",
"psets_jit = psets_jit_bag.compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## A longer integration"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [],
"source": [
"psets_jit_long_bag = db.map(lambda s: run_brownian_example(mode='jit', random_seed=s,\n",
"                                                           runtime=delta(days=10),\n",
"                                                           npart=int(1e5)),\n",
"                            seeds_bag)"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/6225d33ce72a0ac407d98209270dff3f.so\n",
"INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/3c3b97f9a2d4e6cfdda9911f46fab3c9.so\n",
"INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/df8d25e1c966f0fdc79fb89cc8e0e9f8.so\n",
"INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/352ebd0c80d39bda328b771ca01ba454.so\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 2min 44s, sys: 1min 6s, total: 3min 51s\n",
"Wall time: 2min 11s\n"
]
}
],
"source": [
"%%time\n",
"psets_jit_long = psets_jit_long_bag.compute();"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/c18eedd1070e8f2be92728c96858b77b.so\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 20.3 s, sys: 272 ms, total: 20.6 s\n",
"Wall time: 20.4 s\n"
]
}
],
"source": [
"%%time\n",
"pset = run_brownian_example(mode='jit', random_seed=0,\n",
"                            runtime=delta(days=10), npart=int(1e5));"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/52d6a1186ee413ffc8aa48dda45d74fe.so\n",
"INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/2605570d8ae634bc878cedec40dd07b3.so\n",
"INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/780253500d442926130d806a0d0e766c.so\n",
"INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/671a3cb47ad9a8e167875b5c56736508.so\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 1min 20s, sys: 2.17 s, total: 1min 22s\n",
"Wall time: 1min 21s\n"
]
}
],
"source": [
"%%time\n",
"list_of_psets_long = list(map(lambda s: run_brownian_example(mode='jit', random_seed=s,\n",
"                                                             runtime=delta(days=10),\n",
"                                                             npart=int(1e5)),\n",
"                              seeds))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Discussion\n",
"\n",
"- Note that we set `processes=False` when creating the dask cluster, so the workers run as threads within a single process. Running the workers as separate processes fails because the ctypes objects used in Parcels cannot be pickled. (Test this by re-running with `processes=True`.)\n",
"\n",
"- For the Brownian motion example, we cannot circumvent this pickling issue by using Scipy particles, because `parcels.random` appears to rely on compiled code as well.\n",
"\n",
"- Note that the approach outlined here could also be achieved with Python's own `multiprocessing` module. But since inter-process communication there relies on pickling as well, we would again have to make sure that the C types can be pickled. On the upside, getting Parcels to work with dask would very likely also pave the way for many other (and still to come?) distributed computing tools that rely on serialization for inter-worker communication."
]
},
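{
"cell_type": "markdown",
"metadata": {},
"source": [
"The pickling limitation can be reproduced without Parcels. The following is a minimal sketch using only the standard library. It assumes (this is not verified against the Parcels source) that the objects in question hold C pointers, which `pickle` refuses to serialize:\n",
"\n",
"```python\n",
"import ctypes\n",
"import pickle\n",
"\n",
"# A plain ctypes scalar survives a pickle round trip.\n",
"value = ctypes.c_int(42)\n",
"restored = pickle.loads(pickle.dumps(value))\n",
"\n",
"# A ctypes object that contains a pointer does not:\n",
"# pickle raises ValueError for it.\n",
"pointer = ctypes.pointer(value)\n",
"try:\n",
"    pickle.dumps(pointer)\n",
"    pointer_pickled = True\n",
"except ValueError as err:\n",
"    pointer_pickled = False\n",
"    print('pickling failed:', err)\n",
"```\n",
"\n",
"Process-based workers would have to ship such objects through `pickle`, whereas threads share them in memory, which is consistent with `processes=False` working above."
]
},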
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Next steps\n",
"\n",
"- _**How does this scale?**_ We would probably like to test much longer run times on a machine larger than a desktop computer.\n",
"\n",
"- _**How to split tasks across dask resources?**_ The obvious way is to split a particle set across different workers and re-merge the parts after execution."
]
},
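{
"cell_type": "markdown",
"metadata": {},
"source": [
"The split-and-merge idea can be sketched without Parcels or dask. In this hypothetical stand-in, `random_walk` plays the role of `pset.execute`, `split` and the chunk count are illustrative names, and a thread pool replaces the dask workers (matching `processes=False`). None of this is Parcels API:\n",
"\n",
"```python\n",
"import random\n",
"from concurrent.futures import ThreadPoolExecutor\n",
"\n",
"\n",
"def split(items, n_chunks):\n",
"    # Cut a list into n_chunks contiguous, nearly equal parts.\n",
"    size, rest = divmod(len(items), n_chunks)\n",
"    chunks, start = [], 0\n",
"    for i in range(n_chunks):\n",
"        stop = start + size + (1 if i < rest else 0)\n",
"        chunks.append(items[start:stop])\n",
"        start = stop\n",
"    return chunks\n",
"\n",
"\n",
"def random_walk(positions, seed, n_steps=24):\n",
"    # Hypothetical stand-in for a particle run: a 1D Gaussian random walk.\n",
"    # Each chunk gets its own generator, so no global random state is shared.\n",
"    rng = random.Random(seed)\n",
"    return [x + sum(rng.gauss(0, 1) for _ in range(n_steps)) for x in positions]\n",
"\n",
"\n",
"positions = [0.0] * 10\n",
"chunks = split(positions, 4)\n",
"\n",
"# Run every chunk on a worker thread, one seed per chunk.\n",
"with ThreadPoolExecutor(max_workers=2) as pool:\n",
"    results = list(pool.map(random_walk, chunks, range(len(chunks))))\n",
"\n",
"# Re-merge the chunks into one flat list, in the original order.\n",
"merged = [x for chunk in results for x in chunk]\n",
"```\n",
"\n",
"Giving every chunk its own seed keeps the merged result independent of the order in which the workers finish."
]
},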
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Boilerplate for the sake of (some) reproducibility"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"# packages in environment at /home/wrath/miniconda3/envs/py3_parcels_dask:\n",
"#\n",
"appdirs 1.4.3 py_0 conda-forge\n",
"attrs 17.4.0 py_0 conda-forge\n",
"backcall 0.1.0 py_0 conda-forge\n",
"binutils_impl_linux-64 2.28.1 had2808c_3 \n",
"binutils_linux-64 7.2.0 26 \n",
"bleach 2.1.3 py_0 conda-forge\n",
"bokeh 0.12.15 py36_0 conda-forge\n",
"bottleneck 1.2.1 py36_1 conda-forge\n",
"bzip2 1.0.6 1 conda-forge\n",
"ca-certificates 2018.4.16 0 conda-forge\n",
"cachetools 2.0.1 py_0 conda-forge\n",
"certifi 2018.4.16 py36_0 conda-forge\n",
"cgen 2017.1 py36_0 conda-forge\n",
"click 6.7 py_1 conda-forge\n",
"cloudpickle 0.5.2 py_0 conda-forge\n",
"curl 7.59.0 1 conda-forge\n",
"cycler 0.10.0 py36_0 conda-forge\n",
"cytoolz 0.9.0.1 py36_0 conda-forge\n",
"dask 0.17.3 py_0 conda-forge\n",
"dask-core 0.17.3 py_0 conda-forge\n",
"dbus 1.11.0 0 conda-forge\n",
"decorator 4.3.0 py_0 conda-forge\n",
"distributed 1.21.7 py36_0 conda-forge\n",
"entrypoints 0.2.3 py36_1 conda-forge\n",
"expat 2.2.5 0 conda-forge\n",
"ffmpeg 3.2.4 3 conda-forge\n",
"fontconfig 2.12.6 0 conda-forge\n",
"freetype 2.8.1 0 conda-forge\n",
"gcc_impl_linux-64 7.2.0 habb00fd_3 \n",
"gcc_linux-64 7.2.0 26 \n",
"gettext 0.19.8.1 0 conda-forge\n",
"glib 2.55.0 0 conda-forge\n",
"gmp 6.1.2 0 conda-forge\n",
"gst-plugins-base 1.8.0 0 conda-forge\n",
"gstreamer 1.8.0 1 conda-forge\n",
"h5netcdf 0.5.1 py_0 conda-forge\n",
"h5py 2.7.1 py36_2 conda-forge\n",
"hdf4 4.2.13 0 conda-forge\n",
"hdf5 1.10.1 2 conda-forge\n",
"heapdict 1.0.0 py36_0 conda-forge\n",
"html5lib 1.0.1 py_0 conda-forge\n",
"icu 58.2 0 conda-forge\n",
"intel-openmp 2018.0.0 8 \n",
"ipykernel 4.8.2 py36_0 conda-forge\n",
"ipython 6.3.1 py36_0 conda-forge\n",
"ipython_genutils 0.2.0 py36_0 conda-forge\n",
"jedi 0.12.0 py36_0 conda-forge\n",
"jinja2 2.10 py36_0 conda-forge\n",
"jpeg 9b 2 conda-forge\n",
"jsonschema 2.6.0 py36_1 conda-forge\n",
"jupyter_client 5.2.3 py36_0 conda-forge\n",
"jupyter_core 4.4.0 py_0 conda-forge\n",
"jupyterlab 0.32.1 py36_0 conda-forge\n",
"jupyterlab_launcher 0.10.5 py36_0 conda-forge\n",
"krb5 1.14.6 0 conda-forge\n",
"libffi 3.2.1 3 conda-forge\n",
"libgcc 7.2.0 h69d50b8_2 \n",
"libgcc-ng 7.2.0 hdf63c60_3 \n",
"libgfortran 3.0.0 1 \n",
"libgfortran-ng 7.2.0 hdf63c60_3 \n",
"libiconv 1.15 0 conda-forge\n",
"libnetcdf 4.6.1 2 conda-forge\n",
"libpng 1.6.34 0 conda-forge\n",
"libsodium 1.0.16 0 conda-forge\n",
"libssh2 1.8.0 2 conda-forge\n",
"libstdcxx-ng 7.2.0 hdf63c60_3 \n",
"libxcb 1.13 0 conda-forge\n",
"libxml2 2.9.8 0 conda-forge\n",
"locket 0.2.0 py36_1 conda-forge\n",
"markupsafe 1.0 py36_0 conda-forge\n",
"matplotlib 2.0.2 py36h2acb4ad_1 \n",
"mistune 0.8.3 py36_1 conda-forge\n",
"mkl 2018.0.2 1 \n",
"mkl_fft 1.0.2 py36_0 conda-forge\n",
"mkl_random 1.0.1 py36_0 conda-forge\n",
"more-itertools 4.1.0 py_0 conda-forge\n",
"msgpack-python 0.5.6 py36_0 conda-forge\n",
"nbconvert 5.3.1 py_1 conda-forge\n",
"nbformat 4.4.0 py36_0 conda-forge\n",
"ncurses 5.9 10 conda-forge\n",
"netcdf4 1.3.1 py36_3 conda-forge\n",
"notebook 5.4.1 py36_0 conda-forge\n",
"numpy 1.14.2 py36hdbf6ddf_1 \n",
"openssl 1.0.2o 0 conda-forge\n",
"packaging 17.1 py_0 conda-forge\n",
"pandas 0.22.0 py36_1 conda-forge\n",
"pandoc 2.2 0 conda-forge\n",
"pandocfilters 1.4.2 py36_0 conda-forge\n",
"parcels 1.0.2 py36_0 conda-forge\n",
"parso 0.2.0 py_0 conda-forge\n",
"partd 0.3.8 py36_0 conda-forge\n",
"pcre 8.41 1 conda-forge\n",
"pexpect 4.5.0 py36_0 conda-forge\n",
"pickleshare 0.7.4 py36_0 conda-forge\n",
"pip 9.0.3 py36_0 conda-forge\n",
"pluggy 0.6.0 py_0 conda-forge\n",
"progressbar2 3.37.0 py36_0 conda-forge\n",
"prompt_toolkit 1.0.15 py36_0 conda-forge\n",
"psutil 5.4.5 py36_0 conda-forge\n",
"ptyprocess 0.5.2 py36_0 conda-forge\n",
"py 1.5.3 py_0 conda-forge\n",
"pygments 2.2.0 py36_0 conda-forge\n",
"pymbolic 2017.1 py36_0 conda-forge\n",
"pyparsing 2.2.0 py36_0 conda-forge\n",
"pyqt 5.6.0 py36_5 conda-forge\n",
"pytest 3.5.1 py36_0 conda-forge\n",
"pytest-runner 4.2 py_0 conda-forge\n",
"python 3.6.5 1 conda-forge\n",
"python-dateutil 2.7.2 py_0 conda-forge\n",
"python-utils 2.3.0 py36_0 conda-forge\n",
"pytools 2018.4 py_0 conda-forge\n",
"pytz 2018.4 py_0 conda-forge\n",
"pyyaml 3.12 py36_1 conda-forge\n",
"pyzmq 17.0.0 py36_4 conda-forge\n",
"qt 5.6.2 7 conda-forge\n",
"readline 7.0 0 conda-forge\n",
"scipy 1.0.1 py36hfc37229_0 \n",
"send2trash 1.5.0 py_0 conda-forge\n",
"setuptools 39.1.0 py36_0 conda-forge\n",
"simplegeneric 0.8.1 py36_0 conda-forge\n",
"sip 4.18 py36_1 conda-forge\n",
"six 1.11.0 py36_1 conda-forge\n",
"sortedcontainers 1.5.10 py36_0 conda-forge\n",
"sqlite 3.20.1 2 conda-forge\n",
"tblib 1.3.2 py36_0 conda-forge\n",
"terminado 0.8.1 py36_0 conda-forge\n",
"testpath 0.3.1 py36_0 conda-forge\n",
"tk 8.6.7 0 conda-forge\n",
"toolz 0.9.0 py_0 conda-forge\n",
"tornado 5.0.2 py36_0 conda-forge\n",
"traitlets 4.3.2 py36_0 conda-forge\n",
"wcwidth 0.1.7 py36_0 conda-forge\n",
"webencodings 0.5 py36_0 conda-forge\n",
"wheel 0.31.0 py36_0 conda-forge\n",
"x264 20131218 0 conda-forge\n",
"xarray 0.10.3 py36_0 conda-forge\n",
"xorg-libxau 1.0.8 3 conda-forge\n",
"xorg-libxdmcp 1.1.2 3 conda-forge\n",
"xz 5.2.3 0 conda-forge\n",
"yaml 0.1.7 0 conda-forge\n",
"zeromq 4.2.5 1 conda-forge\n",
"zict 0.1.3 py_0 conda-forge\n",
"zlib 1.2.11 0 conda-forge\n"
]
}
],
"source": [
"!conda list"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}