Skip to content

Instantly share code, notes, and snippets.

@rsignell-usgs
Created May 12, 2018 11:49
Show Gist options
  • Save rsignell-usgs/49aab25d07bbb5e8ade50fa31b443737 to your computer and use it in GitHub Desktop.
Save rsignell-usgs/49aab25d07bbb5e8ade50fa31b443737 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Testing Utide on Pangeo with Dask distributed\n",
"====\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"%matplotlib inline\n",
"\n",
"import xarray as xr\n",
"import matplotlib.pyplot as plt\n",
"import s3fs\n",
"\n",
"import utide"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"from dask.distributed import Client, progress, LocalCluster\n",
"from dask_kubernetes import KubeCluster"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For Dask, the workers need UTide too. So we created `utide-worker.yaml`, which is just the `default-worker.yaml` with utide from conda-forge added (the cell below loads the worker spec from `/home/jovyan/worker-template.yaml`; point it at your copy):\n",
"```\n",
"env:\n",
"  - name: GCSFUSE_BUCKET\n",
"    value: pangeo-data\n",
"  - name: EXTRA_CONDA_PACKAGES\n",
"    value: utide -c conda-forge\n",
"```\n",
"\n",
"Since we are customizing the workers, they take about a minute to spin up...."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"scrolled": true
},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "bdb8e5e0cbbd4d838e599b1102d6ade3",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"VBox(children=(HTML(value='<h2>KubeCluster</h2>'), HBox(children=(HTML(value='\\n<div>\\n <style scoped>\\n .…"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"# Worker pod spec: it must install utide (EXTRA_CONDA_PACKAGES) so solve() can run on the workers\n",
"WORKER_SPEC = '/home/jovyan/worker-template.yaml'\n",
"N_WORKERS = 10\n",
"\n",
"cluster = KubeCluster.from_yaml(WORKER_SPEC)\n",
"cluster.scale(N_WORKERS)\n",
"cluster"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table style=\"border: 2px solid white;\">\n",
"<tr>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3>Client</h3>\n",
"<ul>\n",
" <li><b>Scheduler: </b>tcp://192.168.110.178:36242\n",
" <li><b>Dashboard: </b><a href='/user/rsignell-usgs/proxy/8787/status' target='_blank'>/user/rsignell-usgs/proxy/8787/status</a>\n",
"</ul>\n",
"</td>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3>Cluster</h3>\n",
"<ul>\n",
" <li><b>Workers: </b>10</li>\n",
" <li><b>Cores: </b>10</li>\n",
" <li><b>Memory: </b>37.50 GB</li>\n",
"</ul>\n",
"</td>\n",
"</tr>\n",
"</table>"
],
"text/plain": [
"<Client: scheduler='tcp://192.168.110.178:36242' processes=10 cores=10>"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Connect this notebook session to the KubeCluster's scheduler\n",
"client = Client(address=cluster)\n",
"client"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Open an Xarray Dataset"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"# GCS\n",
"#fs = gcsfs.GCSFileSystem(project='pangeo-181919', access='read_only')\n",
"#fmap = gcsfs.mapping.GCSMap('pangeo-data/rsignell/ocean_his_tide_zeta', gcs=fs)"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"# AWS S3: anonymous access to the zarr store holding the tidal zeta output\n",
"fs = s3fs.S3FileSystem(anon=True)\n",
"fmap = s3fs.S3Map(root='rsignell/tides', s3=fs)\n",
"# alternative store: s3fs.S3Map(root='rsignell/nwm/tiny3a', s3=fs)"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"# decode_times=False leaves ocean_time as raw numbers rather than datetimes\n",
"ds = xr.open_zarr(store=fmap, decode_times=False)"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<xarray.Dataset>\n",
"Dimensions: (eta_rho: 324, ocean_time: 1441, xi_rho: 1542)\n",
"Coordinates:\n",
" lat_rho (eta_rho, xi_rho) float64 dask.array<shape=(324, 1542), chunksize=(81, 771)>\n",
" lon_rho (eta_rho, xi_rho) float64 dask.array<shape=(324, 1542), chunksize=(81, 771)>\n",
" * ocean_time (ocean_time) float64 0.0 1.8e+03 3.6e+03 5.4e+03 7.2e+03 ...\n",
"Dimensions without coordinates: eta_rho, xi_rho\n",
"Data variables:\n",
" zeta (ocean_time, eta_rho, xi_rho) float32 dask.array<shape=(1441, 324, 1542), chunksize=(91, 41, 193)>\n",
"Attributes:\n",
" CPP_options: GSB, ADD_FSOBC, ADD_M2OBC, ANA_BSFLUX, ANA_BTFLUX, ANA...\n",
" Conventions: CF-1.4\n",
" NCO: 4.7.3\n",
" NLM_LBC: \\nEDGE: WEST SOUTH EAST NORTH \\nzeta: Cha C...\n",
" ana_file: ROMS/Functionals/ana_btflux.h, ROMS/Functionals/ana_fs...\n",
" code_dir: /cxfs/projects/usgs/hazards/cmgp/woodshole/aaretxabale...\n",
" compiler_command: /opt/intel/impi/5.0.1.035/intel64/bin/mpif90\n",
" compiler_flags: -heap-arrays -fp-model precise -ip -O3 -xW -free\n",
" compiler_system: ifort\n",
" cpu: x86_64\n",
" file: ocean_his_gsb_tides_55nb.nc\n",
" format: netCDF-3 64bit offset file\n",
" frc_file_01: ../forcings/tide_forc_GSB_55.nc\n",
" grd_file: ../grids/GSB_55nb.nc\n",
" header_dir: /cxfs/projects/usgs/hazards/cmgp/woodshole/aaretxabale...\n",
" header_file: gsb.h\n",
" his_file: ocean_his_gsb_tides_55nb.nc\n",
" history: Mon Mar 12 09:36:34 2018: ncks -O -v ocean_time,zeta,l...\n",
" os: Linux\n",
" rst_file: ocean_rst.nc\n",
" script_file: \n",
" svn_rev: \n",
" svn_url: https:://myroms.org/svn/src\n",
" tiling: 036x010\n",
" title: Great south Bay\n",
" type: ROMS/TOMS history file\n",
" var_info: varinfo.dat"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ds"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1441 324 1542\n"
]
}
],
"source": [
"# zeta dims are (ocean_time, eta_rho, xi_rho): dt = number of time steps, n x m = grid size\n",
"dt, n, m = ds['zeta'].shape\n",
"print(dt, n, m)"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [],
"source": [
"# ocean_time appears to be in seconds (steps of 1800); convert to days for utide\n",
"SECONDS_PER_DAY = 3600*24\n",
"t = ds['ocean_time'].values / SECONDS_PER_DAY"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [],
"source": [
"#client.get_versions(check=True)"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [],
"source": [
"import warnings\n",
"\n",
"import numpy as np\n",
"from utide import solve\n",
"\n",
"# One representative latitude for the whole (Great South Bay) grid, passed to utide's solve\n",
"lat = 40.7"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"solve: matrix prep ... solution ... diagnostics ... done.\n",
"CPU times: user 312 ms, sys: 188 ms, total: 500 ms\n",
"Wall time: 6.94 s\n"
]
}
],
"source": [
"%%time\n",
"# Sanity check on one grid point (eta_rho=10, xi_rho=10) before going parallel.\n",
"# NOTE(review): v is passed as zeros, so utide presumably runs its 2-D (u, v) analysis and\n",
"# the amplitude is read from 'Lsmaj'; v=None would be the scalar form -- confirm.\n",
"with warnings.catch_warnings():\n",
"    warnings.simplefilter(\"ignore\")\n",
"    acoef = solve(t, ds['zeta'][:, 10, 10].values, 0*t, lat,\n",
"                  trend=False, nodal=False, Rayleigh_min=0.95,\n",
"                  method='ols', conf_int='linear')\n",
"    val = acoef['Lsmaj']"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"array(['M2', 'N2', 'S2', 'K1', 'O1', 'ETA2', 'M6', 'M4', 'Q1', 'UPS1',\n",
" 'OO1', 'J1', 'NO1', '2Q1', 'MO3', 'M3', 'SK3', 'MK3', 'MN4', 'MSF',\n",
" 'MS4', 'S4', '2MK5', '2SK5', '3MK7', '2MN6', '2MS6', '2SM6', 'M8'],\n",
" dtype=object)"
]
},
"execution_count": 21,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"acoef['name']"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [],
"source": [
"import dask.array as da\n",
"from dask import delayed"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Make Utide `solve` a delayed function"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [],
"source": [
"def quiet_solve(*args, **kwargs):\n",
"    \"\"\"Call utide's solve with warnings silenced.\n",
"\n",
"    dask runs this on the workers, so the filter has to be installed here;\n",
"    a warnings.catch_warnings() block around notebook code that only builds\n",
"    the delayed calls never sees the warnings solve emits.\n",
"    \"\"\"\n",
"    with warnings.catch_warnings():\n",
"        warnings.simplefilter(\"ignore\")\n",
"        return solve(*args, **kwargs)\n",
"\n",
"\n",
"usolve = delayed(quiet_solve)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Load all the zeta data into memory to start, since it's only about 3 GB (the size is printed below)"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2.879740512\n",
"CPU times: user 4.12 s, sys: 7.87 s, total: 12 s\n",
"Wall time: 21.7 s\n"
]
}
],
"source": [
"%%time\n",
"# Pull zeta fully into client memory; per-point series are sliced from it below\n",
"z = ds['zeta'].compute()\n",
"print(z.nbytes / 1e9)  # GB"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Specify the subsampling interval for the tidal analysis: `nsub = 4` means analyze every 4th grid point in each horizontal direction."
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [],
"source": [
"nsub = 4"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create a list of delayed `solve` calls (one for each sampled grid point). Nothing runs yet; the work happens on the workers when the results are computed."
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 14 s, sys: 324 ms, total: 14.3 s\n",
"Wall time: 14.1 s\n"
]
}
],
"source": [
"%%time\n",
"# Build one delayed solve call per sampled grid point (row-major: j over eta_rho, i over\n",
"# xi_rho). Nothing runs yet: solve executes on the dask workers when results are computed,\n",
"# so a warnings filter around this loop would not silence it.\n",
"# Slice the in-memory numpy array instead of the xarray DataArray: ~31k xarray indexing\n",
"# calls are likely what made this cell slow, and solve only needs the plain series.\n",
"zeta_np = z.values\n",
"v_zero = t*0.0\n",
"coefs = [usolve(t, zeta_np[:, j*nsub, i*nsub], v_zero, lat,\n",
"                trend=False, nodal=False, Rayleigh_min=0.95, method='ols',\n",
"                conf_int='linear')\n",
"         for j in range(n // nsub) for i in range(m // nsub)]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Construct a list of Dask arrays"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [],
"source": [
"# utide orders its output constituents per point (the test point above gives M2, N2, S2,\n",
"# K1, O1, ... -- apparently by energy, not frequency), so column k of Lsmaj need not be the\n",
"# same constituent at every grid point. Reorder each result to the test point's order so\n",
"# every column means one constituent everywhere (column 0 = acoef['name'][0]).\n",
"# Assumes every point resolves the same constituent set (same t and Rayleigh_min); a point\n",
"# that does not raises KeyError instead of being silently mislabeled.\n",
"ref_names = list(acoef['name'])\n",
"\n",
"@delayed\n",
"def lsmaj_by_name(coef, names):\n",
"    \"\"\"Return coef['Lsmaj'] reordered so that entry k belongs to constituent names[k].\"\"\"\n",
"    index_of = {name: k for k, name in enumerate(coef['name'])}\n",
"    return coef['Lsmaj'][[index_of[name] for name in names]]\n",
"\n",
"arrays = [da.from_delayed(lsmaj_by_name(coef, ref_names), dtype=val.dtype, shape=val.shape)\n",
"          for coef in coefs]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Stack the arrays "
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [],
"source": [
"# One row per sampled grid point, one column per constituent\n",
"stack = da.stack(arrays)"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"dask.array<stack, shape=(31185, 29), dtype=float64, chunksize=(1, 29)>"
]
},
"execution_count": 29,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"stack"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 2min 38s, sys: 21.5 s, total: 3min\n",
"Wall time: 4min 50s\n"
]
}
],
"source": [
"%%time \n",
"amps = stack.compute()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# for debugging\n",
"#pods = cluster.pods()\n",
"#print(cluster.logs(pods[0]))"
]
},
{
"cell_type": "code",
"execution_count": 39,
"metadata": {},
"outputs": [],
"source": [
"# Reshape the flat list of sampled points (j outer, i inner) back onto the subsampled\n",
"# (eta_rho, xi_rho) grid. Column 0 is taken to be M2, the first constituent at the test\n",
"# point -- NOTE(review): only valid if every point's Lsmaj uses that same order.\n",
"m2amp = amps[:, 0].reshape((n // nsub, m // nsub))"
]
},
{
"cell_type": "code",
"execution_count": 40,
"metadata": {},
"outputs": [],
"source": [
"%matplotlib inline\n",
"import matplotlib.pyplot as plt"
]
},
{
"cell_type": "code",
"execution_count": 41,
"metadata": {},
"outputs": [
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 864x576 with 2 Axes>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"# Explicit fig/ax interface; label both axes and the colorbar so the figure stands alone\n",
"fig, ax = plt.subplots(figsize=(12, 8))\n",
"mesh = ax.pcolormesh(m2amp)\n",
"fig.colorbar(mesh, ax=ax, label='M2 amplitude (Lsmaj)')\n",
"ax.set(title='M2 Elevation Amplitude',\n",
"       xlabel=f'sampled xi_rho index (stride {nsub})',\n",
"       ylabel=f'sampled eta_rho index (stride {nsub})');"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python [conda root]",
"language": "python",
"name": "conda-root-py"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment