Test case for dask distributed scheduler
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Xarray / Dask Climatology Benchmark\n",
"\n",
"Notebook designed to debug the issue described in <https://github.com/dask/distributed/issues/2602>\n",
"\n",
"This has been tested with Dask 2020.12.0 and Dask 2021.07.1."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask\n",
"dask.__version__"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Dask Cluster Settings"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"nworkers = 30\n",
"worker_memory = 8\n",
"worker_cores = 1\n",
"use_MALLOC_TRIM_THRESHOLD = True"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask_gateway import Gateway\n",
"g = Gateway()\n",
"options = g.cluster_options()\n",
"# set the options programatically, or through their HTML repr\n",
"options.worker_memory = worker_memory\n",
"options.worker_cores = worker_cores\n",
"if use_MALLOC_TRIM_THRESHOLD:\n",
" options.environment = {\"MALLOC_TRIM_THRESHOLD_\": \"0\"}\n",
"\n",
"display(options)\n",
"cluster = g.new_cluster(options)\n",
"cluster"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cluster.scale(nworkers)\n",
"client = cluster.get_client()\n",
"client.wait_for_workers(nworkers)\n",
"client"
]
},
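{
"cell_type": "markdown",
"metadata": {},
"source": [
"If a Dask Gateway deployment is not available, the cell below sketches a rough local equivalent. It is only a sketch: it reuses the `nworkers`, `worker_cores`, and `worker_memory` settings above, sets `MALLOC_TRIM_THRESHOLD_` in the parent environment before the workers start, and should be run *instead of* the two Gateway cells above (flip `use_local_cluster` to `True`)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional alternative: run the benchmark on a LocalCluster instead of Dask Gateway.\n",
"use_local_cluster = False\n",
"\n",
"if use_local_cluster:\n",
"    import os\n",
"    from dask.distributed import Client, LocalCluster\n",
"\n",
"    if use_MALLOC_TRIM_THRESHOLD:\n",
"        # must be set before the worker processes are spawned\n",
"        os.environ[\"MALLOC_TRIM_THRESHOLD_\"] = \"0\"\n",
"\n",
"    cluster = LocalCluster(\n",
"        n_workers=nworkers,\n",
"        threads_per_worker=worker_cores,\n",
"        memory_limit=f\"{worker_memory}GB\",\n",
"    )\n",
"    client = Client(cluster)\n",
"    display(client)"
]
},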
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Synthetic Data Example 1\n",
"\n",
"Represetntative of computing standard deviation of climatological anomaly."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask.array as dsa\n",
"import numpy as np\n",
"import xarray as xr\n",
"\n",
"data = dsa.random.random((10000, 1000000), chunks=(1, 1000000))\n",
"da = xr.DataArray(data, dims=['time', 'x'],\n",
" coords={'day': ('time', np.arange(10000) % 100)})\n",
"clim = da.groupby('day').mean(dim='time')\n",
"anom = da.groupby('day') - clim\n",
"anom_std = anom.std(dim='time')"
]
},
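{
"cell_type": "markdown",
"metadata": {},
"source": [
"This computation is deliberately built from many tiny chunks, so the task graph is large. The cell below is just a quick sanity check of that scale; it only touches metadata, nothing is computed. The graph inspection relies on Dask's collection protocol (`__dask_graph__`)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"\n",
"# Metadata-only check: how many chunks go in, and how many tasks the\n",
"# scheduler has to track for the final result.\n",
"print(\"input chunks:  \", int(np.prod(da.data.numblocks)))\n",
"print(\"tasks in graph:\", len(anom_std.__dask_graph__()))"
]
},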
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# without use_MALLOC_TRIM_THRESHOLD, workers die\n",
"# with use_MALLOC_TRIM_THRESHOLD:\n",
"# Dask 2020.12.0: 2min 17s\n",
"# Dask 2021.07.1: 2min 21s\n",
"\n",
"%time anom_std.load()"
]
},
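{
"cell_type": "markdown",
"metadata": {},
"source": [
"If workers still accumulate unmanaged memory even with `MALLOC_TRIM_THRESHOLD_` set, the `malloc_trim` workaround from the Dask worker-memory documentation can be run on demand. This only applies to glibc-based (Linux) workers."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import ctypes\n",
"\n",
"def trim_memory() -> int:\n",
"    # ask glibc to release freed heap memory back to the operating system\n",
"    libc = ctypes.CDLL(\"libc.so.6\")\n",
"    return libc.malloc_trim(0)\n",
"\n",
"# run on every worker\n",
"client.run(trim_memory)"
]
},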
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Synthetic Data Example 2\n",
"\n",
"Representative of calculating forecast skill from an ensemble of weather predictions. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"size = (28, 237, 48, 21, 90, 144)\n",
"chunks = (1, 1, 48, 21, 90, 144)\n",
"arr = dsa.random.random(size, chunks=chunks)\n",
"arr\n",
"\n",
"items = dict(\n",
" ensemble = np.arange(size[0]),\n",
" init_date = pd.date_range(start='1960', periods=size[1]),\n",
" lat = np.arange(size[2]).astype(float),\n",
" lead_time = np.arange(size[3]),\n",
" level = np.arange(size[4]).astype(float),\n",
" lon = np.arange(size[5]).astype(float),\n",
")\n",
"dims, coords = zip(*list(items.items()))\n",
"\n",
"array = xr.DataArray(arr, coords=coords, dims=dims)\n",
"dset = xr.Dataset({'data': array})\n",
"display(dset)\n",
"\n",
"result = dset['data'].groupby(\"init_date.month\").mean(dim=[\"init_date\", \"ensemble\"])\n",
"result"
]
},
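{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick metadata-only check of the data volume involved: the synthetic float64 input works out to roughly 0.7 TB, while the monthly-mean result is far smaller."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Nothing is computed here; nbytes is derived from shape and dtype alone.\n",
"print(f\"input size:  {dset['data'].nbytes / 1e12:.2f} TB\")\n",
"print(f\"result size: {result.nbytes / 1e9:.2f} GB\")"
]
},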
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Dask 2020.12.0: 6min 49s\n",
"# Dask 2021.07.1: 4min 13s\n",
"\n",
"%time result.compute();"
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"## Real Data Example"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from intake import open_catalog\n",
"cat = open_catalog(\"https://raw.githubusercontent.com/pangeo-data/pangeo-datastore/master/intake-catalogs/ocean.yaml\")\n",
"ds = cat[\"sea_surface_height\"].to_dask()\n",
"ds"
]
},
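{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before building the computation, take a quick look at how the `sla` variable is chunked (assuming it is dask-backed, which it is when opened lazily through this catalog)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"\n",
"sla = ds.sla\n",
"print(\"shape:    \", sla.shape)\n",
"print(\"chunksize:\", sla.data.chunksize)\n",
"print(\"n chunks: \", int(np.prod(sla.data.numblocks)))\n",
"print(f\"size:      {sla.nbytes / 1e9:.1f} GB\")"
]
},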
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"sla = ds.sla\n",
"sla_gb = sla.groupby('time.dayofyear')\n",
"sla_clim = sla_gb.mean(dim='time')\n",
"sla_anom = sla_gb - sla_clim\n",
"sla_anom_std = sla_anom.std(dim='time')\n",
"sla_anom_std"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# With Dask 2020.12.0: 2min 58s\n",
"# With Dask 2021:07.1: 2min 6s\n",
"\n",
"%time sla_anom_std.load()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"sla_anom_std.plot(figsize=(20, 12))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client.close()\n",
"cluster.scale(0)\n",
"cluster.close()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cluster"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.10"
}
},
"nbformat": 4,
"nbformat_minor": 4
}