Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Dask Scaling Study\n",
"\n",
"We benchmark a suite of dask computations of three types:\n",
"\n",
"1. General task scheduling\n",
"2. Multi-dimensional arrays\n",
"3. Dataframes\n",
"\n",
"In each case we consider computations of varying complexity including computations that are embarrassingly parallel, communication heavy, involve many small communications, or structured in non-trivial ways.\n",
"\n",
"Additionally we consider tasks that are very fast (microseconds) so as to stress the scheduler early as well as tasks that take a more modest amount of time, usually 100ms each.\n",
"\n",
"We perform the same computations in increasing sizes that scale linearly with the cluster size. The cluster is composed of 2-core containers on Google compute engine ranging from 1 container to 256 containers (512 cores total)."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Benchmarking infrastructure"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import math, time\n",
"from dask.distributed import Client, wait"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from distributed.client import default_client\n",
"import pandas as pd\n",
"\n",
"def run(func, client=None):\n",
" client = client or default_client()\n",
" client.restart()\n",
" n = sum(client.ncores().values())\n",
" coroutine = func(n)\n",
"\n",
" name, unit, numerator = next(coroutine)\n",
" out = []\n",
" while True:\n",
" # time.sleep(1)\n",
" start = time.time()\n",
" try:\n",
" next_name, next_unit, next_numerator = next(coroutine)\n",
" except StopIteration:\n",
" break\n",
" finally:\n",
" end = time.time()\n",
" record = {'name': name, \n",
" 'duration': end - start, \n",
" 'unit': unit + '/s', \n",
" 'rate': numerator / (end - start), \n",
" 'n': n,\n",
" 'collection': func.__name__}\n",
" out.append(record)\n",
" name = next_name\n",
" unit = next_unit\n",
" numerator = next_numerator\n",
" return pd.DataFrame(out)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Benchmarks\n",
"\n",
"These functions include a variety of computations for tasks, arrays, and dataframes."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import operator\n",
"import time\n",
"\n",
"def slowinc(x, delay=0.1):\n",
" time.sleep(delay)\n",
" return x + 1\n",
"\n",
"def slowadd(x, y, delay=0.1):\n",
" time.sleep(delay)\n",
" return x + y\n",
"\n",
"def slowsum(L, delay=0.1):\n",
" time.sleep(delay)\n",
" return sum(L)\n",
"\n",
"def inc(x):\n",
" return x + 1\n",
"\n",
"\n",
"def tasks(n):\n",
" yield 'task map fast tasks', 'tasks', n * 200\n",
" \n",
" futures = client.map(inc, range(n * 200))\n",
" wait(futures)\n",
" \n",
" yield 'task map 100ms tasks', 'tasks', n * 100\n",
"\n",
" futures = client.map(slowinc, range(100 * n))\n",
" wait(futures)\n",
" \n",
" yield 'task map 1s tasks', 'tasks', n * 4\n",
"\n",
" futures = client.map(slowinc, range(4 * n), delay=1)\n",
" wait(futures)\n",
"\n",
" yield 'tree reduction fast tasks', 'tasks', 2**7 * n\n",
" \n",
" from dask import delayed\n",
"\n",
" L = range(2**7 * n)\n",
" while len(L) > 1:\n",
" L = list(map(delayed(operator.add), L[0::2], L[1::2]))\n",
"\n",
" L[0].compute()\n",
" \n",
" yield 'tree reduction 100ms tasks', 'tasks', 2**6 * n * 2\n",
" \n",
" from dask import delayed\n",
"\n",
" L = range(2**6 * n)\n",
" while len(L) > 1:\n",
" L = list(map(delayed(slowadd), L[0::2], L[1::2]))\n",
"\n",
" L[0].compute()\n",
" \n",
" yield 'sequential', 'tasks', 100\n",
"\n",
" x = 1\n",
"\n",
" for i in range(100):\n",
" x = delayed(inc)(x)\n",
" \n",
" x.compute()\n",
" \n",
" yield 'dynamic tree reduction fast tasks', 'tasks', 100 * n\n",
" \n",
" from dask.distributed import as_completed\n",
" futures = client.map(inc, range(n * 100))\n",
" \n",
" pool = as_completed(futures)\n",
" batches = pool.batches()\n",
" \n",
" while True:\n",
" try:\n",
" batch = next(batches)\n",
" if len(batch) == 1:\n",
" batch += next(batches)\n",
" except StopIteration:\n",
" break\n",
" future = client.submit(sum, batch)\n",
" pool.add(future)\n",
" \n",
" yield 'dynamic tree reduction 100ms tasks', 'tasks', 100 * n\n",
" \n",
" from dask.distributed import as_completed\n",
" futures = client.map(slowinc, range(n * 20))\n",
" \n",
" pool = as_completed(futures)\n",
" batches = pool.batches()\n",
" \n",
" while True:\n",
" try:\n",
" batch = next(batches)\n",
" if len(batch) == 1:\n",
" batch += next(batches)\n",
" except StopIteration:\n",
" break\n",
" future = client.submit(slowsum, batch)\n",
" pool.add(future)\n",
"\n",
" \n",
" yield 'nearest neighbor fast tasks', 'tasks', 100 * n * 2\n",
" \n",
" L = range(100 * n)\n",
" L = client.map(operator.add, L[:-1], L[1:])\n",
" L = client.map(operator.add, L[:-1], L[1:])\n",
" wait(L)\n",
" \n",
" yield 'nearest neighbor 100ms tasks', 'tasks', 20 * n * 2\n",
" \n",
" L = range(20 * n)\n",
" L = client.map(slowadd, L[:-1], L[1:])\n",
" L = client.map(slowadd, L[:-1], L[1:])\n",
" wait(L)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def arrays(n):\n",
" import dask.array as da\n",
" N = int(5000 * math.sqrt(n))\n",
" x = da.random.randint(0, 10000, size=(N, N), chunks=(2000, 2000))\n",
" \n",
" yield 'create random', 'MB', x.nbytes / 1e6\n",
" \n",
" x = x.persist()\n",
" wait(x)\n",
" \n",
" yield 'blockwise 100ms tasks', 'MB', x.nbytes / 1e6\n",
" \n",
" y = x.map_blocks(slowinc, dtype=x.dtype).persist()\n",
" wait(y)\n",
" \n",
" yield 'random access', 'bytes', 8\n",
" \n",
" x[1234, 4567].compute()\n",
" \n",
" yield 'reduction', 'MB', x.nbytes / 1e6\n",
" \n",
" x.std().compute()\n",
" \n",
" yield 'reduction along axis', 'MB', x.nbytes / 1e6\n",
" \n",
" x.std(axis=0).compute()\n",
" \n",
" yield 'elementwise computation', 'MB', x.nbytes / 1e6\n",
" \n",
" y = da.sin(x) ** 2 + da.cos(x) ** 2\n",
" y = y.persist()\n",
" wait(y) \n",
" \n",
" yield 'rechunk small', 'MB', x.nbytes / 1e6\n",
" \n",
" y = x.rechunk((20000, 200)).persist()\n",
" wait(y)\n",
" \n",
" yield 'rechunk large', 'MB', x.nbytes / 1e6\n",
" \n",
" y = y.rechunk((200, 20000)).persist()\n",
" wait(y)\n",
" \n",
" yield 'transpose addition', 'MB', x.nbytes / 1e6\n",
" y = x + x.T\n",
" y = y.persist()\n",
" wait(y)\n",
" \n",
" yield 'nearest neighbor fast tasks', 'MB', x.nbytes / 1e6\n",
" \n",
" y = x.map_overlap(inc, depth=1).persist()\n",
" wait(y) \n",
" \n",
" yield 'nearest neighbor 100ms tasks', 'MB', x.nbytes / 1e6\n",
" \n",
" y = x.map_overlap(slowinc, depth=1, delay=0.1).persist()\n",
" wait(y) "
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def dataframes(n):\n",
" import dask.array as da\n",
" import dask.dataframe as dd\n",
" N = 2000000 * n\n",
" \n",
" x = da.random.randint(0, 10000, size=(N, 10), chunks=(1000000, 10))\n",
"\n",
" \n",
" yield 'create random', 'MB', x.nbytes / 1e6\n",
" \n",
" df = dd.from_dask_array(x).persist()\n",
" wait(df)\n",
" \n",
" yield 'blockwise 100ms tasks', 'MB', x.nbytes / 1e6\n",
" \n",
" wait(df.map_partitions(slowinc, meta=df).persist())\n",
" \n",
" yield 'arithmetic', 'MB', x.nbytes / 1e6\n",
" \n",
" y = (df[0] + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10).persist()\n",
" wait(y)\n",
" \n",
" yield 'random access', 'bytes', 8\n",
" \n",
" df.loc[123456].compute()\n",
" \n",
" yield 'dataframe reduction', 'MB', x.nbytes / 1e6\n",
" \n",
" df.std().compute()\n",
" \n",
" yield 'series reduction', 'MB', x.nbytes / 1e6 / 10\n",
" \n",
" df[3].std().compute()\n",
" \n",
" yield 'groupby reduction', 'MB', x.nbytes / 1e6\n",
" \n",
" df.groupby(0)[1].mean().compute()\n",
" \n",
" yield 'groupby apply (full shuffle)', 'MB', x.nbytes / 1e6\n",
" \n",
" df.groupby(0).apply(len).compute()\n",
" \n",
" yield 'set index (full shuffle)', 'MB', x.nbytes / 1e6\n",
" \n",
" wait(df.set_index(1).persist())\n",
" \n",
" yield 'rolling aggregations', 'MB', x.nbytes / 1e6\n",
" \n",
" wait(df.rolling(5).mean().persist())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Connect to cluster\n",
"\n",
"We connect to a scheduler on GCE. We start with a single 2-core worker."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table style=\"border: 2px solid white;\">\n",
"<tr>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3>Client</h3>\n",
"<ul>\n",
" <li><b>Scheduler: </b>tcp://dask-scheduler:8786\n",
" <li><b>Dashboard: </b><a href='http://dask-scheduler:8787' target='_blank'>http://dask-scheduler:8787</a>\n",
"</ul>\n",
"</td>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3>Cluster</h3>\n",
"<ul>\n",
" <li><b>Workers: </b>1</li>\n",
" <li><b>Cores: </b>2</li>\n",
" <li><b>Memory: </b>6.77 GB</li>\n",
"</ul>\n",
"</td>\n",
"</tr>\n",
"</table>"
],
"text/plain": [
"<Client: scheduler='tcp://dask-scheduler:8786' processes=1 cores=2>"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from dask.distributed import Client\n",
"client = Client('dask-scheduler:8786')\n",
"client"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"L = []"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Scale cluster and run computations\n",
"\n",
"We use the [dask-kubernetes](https://github.com/martindurant/dask-kubernetes) command line to scale our cluster up and down manually and run the comptuations above multiple times for each scale. We save these results to Google Cloud Storage. The outputs are publicly available here:\n",
"\n",
"- Raw: https://storage.googleapis.com/dask-data/scaling-data-raw.csv\n",
"- Median: https://storage.googleapis.com/dask-data/scaling-data.csv"
]
},
{
"cell_type": "code",
"execution_count": 98,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table style=\"border: 2px solid white;\">\n",
"<tr>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3>Client</h3>\n",
"<ul>\n",
" <li><b>Scheduler: </b>tcp://dask-scheduler:8786\n",
" <li><b>Dashboard: </b><a href='http://dask-scheduler:8787' target='_blank'>http://dask-scheduler:8787</a>\n",
"</ul>\n",
"</td>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3>Cluster</h3>\n",
"<ul>\n",
" <li><b>Workers: </b>294</li>\n",
" <li><b>Cores: </b>588</li>\n",
" <li><b>Memory: </b>1991.50 GB</li>\n",
"</ul>\n",
"</td>\n",
"</tr>\n",
"</table>"
],
"text/plain": [
"<Client: scheduler='tcp://dask-scheduler:8786' processes=294 cores=588>"
]
},
"execution_count": 98,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"client"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"0 tasks\n",
"0 arrays\n",
"0 dataframes\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"/work/miniconda/lib/python3.6/site-packages/ipykernel_launcher.py:41: UserWarning: `meta` is not specified, inferred from partial data. Please provide `meta` if the result is unexpected.\n",
" Before: .apply(func)\n",
" After: .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result\n",
" or: .apply(func, meta=('x', 'f8')) for series result\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"1 tasks\n",
"1 arrays\n",
"1 dataframes\n",
"2 tasks\n",
"2 arrays\n",
"2 dataframes\n",
"CPU times: user 9min 1s, sys: 3.69 s, total: 9min 5s\n",
"Wall time: 25min 10s\n"
]
}
],
"source": [
"%%time\n",
"for i in range(3):\n",
" for func in [tasks, arrays, dataframes]:\n",
" print(i, func.__name__)\n",
" df = run(func, client=client)\n",
" L.append(df)"
]
},
{
"cell_type": "code",
"execution_count": 88,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"ddf = pd.concat(L)"
]
},
{
"cell_type": "code",
"execution_count": 89,
"metadata": {
"scrolled": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style>\n",
" .dataframe thead tr:only-child th {\n",
" text-align: right;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: left;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th>duration</th>\n",
" <th>rate</th>\n",
" </tr>\n",
" <tr>\n",
" <th>collection</th>\n",
" <th>name</th>\n",
" <th>n</th>\n",
" <th>unit</th>\n",
" <th></th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th rowspan=\"30\" valign=\"top\">arrays</th>\n",
" <th rowspan=\"9\" valign=\"top\">blockwise 100ms tasks</th>\n",
" <th>2</th>\n",
" <th>MB/s</th>\n",
" <td>0.930781</td>\n",
" <td>429.738441</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <th>MB/s</th>\n",
" <td>0.847067</td>\n",
" <td>944.435197</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <th>MB/s</th>\n",
" <td>1.053046</td>\n",
" <td>1519.373375</td>\n",
" </tr>\n",
" <tr>\n",
" <th>16</th>\n",
" <th>MB/s</th>\n",
" <td>0.967746</td>\n",
" <td>3306.651083</td>\n",
" </tr>\n",
" <tr>\n",
" <th>32</th>\n",
" <th>MB/s</th>\n",
" <td>1.113454</td>\n",
" <td>5747.768011</td>\n",
" </tr>\n",
" <tr>\n",
" <th>64</th>\n",
" <th>MB/s</th>\n",
" <td>0.967302</td>\n",
" <td>13232.684357</td>\n",
" </tr>\n",
" <tr>\n",
" <th>128</th>\n",
" <th>MB/s</th>\n",
" <td>1.289514</td>\n",
" <td>19852.058765</td>\n",
" </tr>\n",
" <tr>\n",
" <th>256</th>\n",
" <th>MB/s</th>\n",
" <td>1.730550</td>\n",
" <td>29585.964835</td>\n",
" </tr>\n",
" <tr>\n",
" <th>512</th>\n",
" <th>MB/s</th>\n",
" <td>6.030538</td>\n",
" <td>16980.218490</td>\n",
" </tr>\n",
" <tr>\n",
" <th rowspan=\"9\" valign=\"top\">create random</th>\n",
" <th>2</th>\n",
" <th>MB/s</th>\n",
" <td>0.584174</td>\n",
" <td>684.713900</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <th>MB/s</th>\n",
" <td>0.655029</td>\n",
" <td>1221.319419</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <th>MB/s</th>\n",
" <td>0.721304</td>\n",
" <td>2218.161714</td>\n",
" </tr>\n",
" <tr>\n",
" <th>16</th>\n",
" <th>MB/s</th>\n",
" <td>0.664351</td>\n",
" <td>4816.727556</td>\n",
" </tr>\n",
" <tr>\n",
" <th>32</th>\n",
" <th>MB/s</th>\n",
" <td>0.718007</td>\n",
" <td>8913.387113</td>\n",
" </tr>\n",
" <tr>\n",
" <th>64</th>\n",
" <th>MB/s</th>\n",
" <td>0.738002</td>\n",
" <td>17344.136192</td>\n",
" </tr>\n",
" <tr>\n",
" <th>128</th>\n",
" <th>MB/s</th>\n",
" <td>1.200436</td>\n",
" <td>21325.182436</td>\n",
" </tr>\n",
" <tr>\n",
" <th>256</th>\n",
" <th>MB/s</th>\n",
" <td>2.564872</td>\n",
" <td>19962.007739</td>\n",
" </tr>\n",
" <tr>\n",
" <th>512</th>\n",
" <th>MB/s</th>\n",
" <td>1.880598</td>\n",
" <td>54450.695152</td>\n",
" </tr>\n",
" <tr>\n",
" <th rowspan=\"9\" valign=\"top\">elementwise computation</th>\n",
" <th>2</th>\n",
" <th>MB/s</th>\n",
" <td>2.708794</td>\n",
" <td>147.664339</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <th>MB/s</th>\n",
" <td>2.908163</td>\n",
" <td>275.087715</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <th>MB/s</th>\n",
" <td>3.580060</td>\n",
" <td>446.911371</td>\n",
" </tr>\n",
" <tr>\n",
" <th>16</th>\n",
" <th>MB/s</th>\n",
" <td>3.964876</td>\n",
" <td>807.087044</td>\n",
" </tr>\n",
" <tr>\n",
" <th>32</th>\n",
" <th>MB/s</th>\n",
" <td>5.441955</td>\n",
" <td>1176.025319</td>\n",
" </tr>\n",
" <tr>\n",
" <th>64</th>\n",
" <th>MB/s</th>\n",
" <td>7.383476</td>\n",
" <td>1733.600921</td>\n",
" </tr>\n",
" <tr>\n",
" <th>128</th>\n",
" <th>MB/s</th>\n",
" <td>13.116995</td>\n",
" <td>1951.629145</td>\n",
" </tr>\n",
" <tr>\n",
" <th>256</th>\n",
" <th>MB/s</th>\n",
" <td>21.965428</td>\n",
" <td>2330.935700</td>\n",
" </tr>\n",
" <tr>\n",
" <th>512</th>\n",
" <th>MB/s</th>\n",
" <td>42.592467</td>\n",
" <td>2404.177413</td>\n",
" </tr>\n",
" <tr>\n",
" <th rowspan=\"3\" valign=\"top\">nearest neighbor 100ms tasks</th>\n",
" <th>2</th>\n",
" <th>MB/s</th>\n",
" <td>1.236899</td>\n",
" <td>323.383141</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <th>MB/s</th>\n",
" <td>1.160823</td>\n",
" <td>689.166299</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <th>MB/s</th>\n",
" <td>1.509198</td>\n",
" <td>1060.145264</td>\n",
" </tr>\n",
" <tr>\n",
" <th>...</th>\n",
" <th>...</th>\n",
" <th>...</th>\n",
" <th>...</th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th rowspan=\"30\" valign=\"top\">tasks</th>\n",
" <th rowspan=\"3\" valign=\"top\">task map 1s tasks</th>\n",
" <th>128</th>\n",
" <th>tasks/s</th>\n",
" <td>5.590187</td>\n",
" <td>91.589068</td>\n",
" </tr>\n",
" <tr>\n",
" <th>256</th>\n",
" <th>tasks/s</th>\n",
" <td>5.644582</td>\n",
" <td>181.412908</td>\n",
" </tr>\n",
" <tr>\n",
" <th>512</th>\n",
" <th>tasks/s</th>\n",
" <td>7.549867</td>\n",
" <td>271.263068</td>\n",
" </tr>\n",
" <tr>\n",
" <th rowspan=\"9\" valign=\"top\">task map fast tasks</th>\n",
" <th>2</th>\n",
" <th>tasks/s</th>\n",
" <td>0.221133</td>\n",
" <td>1808.870110</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <th>tasks/s</th>\n",
" <td>0.241904</td>\n",
" <td>3307.103271</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <th>tasks/s</th>\n",
" <td>0.486054</td>\n",
" <td>3291.817309</td>\n",
" </tr>\n",
" <tr>\n",
" <th>16</th>\n",
" <th>tasks/s</th>\n",
" <td>0.800579</td>\n",
" <td>3997.107929</td>\n",
" </tr>\n",
" <tr>\n",
" <th>32</th>\n",
" <th>tasks/s</th>\n",
" <td>1.796089</td>\n",
" <td>3563.297941</td>\n",
" </tr>\n",
" <tr>\n",
" <th>64</th>\n",
" <th>tasks/s</th>\n",
" <td>3.346050</td>\n",
" <td>3825.405776</td>\n",
" </tr>\n",
" <tr>\n",
" <th>128</th>\n",
" <th>tasks/s</th>\n",
" <td>6.896783</td>\n",
" <td>3711.875589</td>\n",
" </tr>\n",
" <tr>\n",
" <th>256</th>\n",
" <th>tasks/s</th>\n",
" <td>14.315529</td>\n",
" <td>3576.535636</td>\n",
" </tr>\n",
" <tr>\n",
" <th>512</th>\n",
" <th>tasks/s</th>\n",
" <td>29.077214</td>\n",
" <td>3521.657944</td>\n",
" </tr>\n",
" <tr>\n",
" <th rowspan=\"9\" valign=\"top\">tree reduction 100ms tasks</th>\n",
" <th>2</th>\n",
" <th>tasks/s</th>\n",
" <td>6.856049</td>\n",
" <td>37.339291</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <th>tasks/s</th>\n",
" <td>7.081078</td>\n",
" <td>72.305377</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <th>tasks/s</th>\n",
" <td>7.219373</td>\n",
" <td>141.840563</td>\n",
" </tr>\n",
" <tr>\n",
" <th>16</th>\n",
" <th>tasks/s</th>\n",
" <td>7.429270</td>\n",
" <td>275.666383</td>\n",
" </tr>\n",
" <tr>\n",
" <th>32</th>\n",
" <th>tasks/s</th>\n",
" <td>7.793831</td>\n",
" <td>525.543849</td>\n",
" </tr>\n",
" <tr>\n",
" <th>64</th>\n",
" <th>tasks/s</th>\n",
" <td>8.285013</td>\n",
" <td>988.773376</td>\n",
" </tr>\n",
" <tr>\n",
" <th>128</th>\n",
" <th>tasks/s</th>\n",
" <td>8.970450</td>\n",
" <td>1826.441178</td>\n",
" </tr>\n",
" <tr>\n",
" <th>256</th>\n",
" <th>tasks/s</th>\n",
" <td>12.109690</td>\n",
" <td>2705.932256</td>\n",
" </tr>\n",
" <tr>\n",
" <th>512</th>\n",
" <th>tasks/s</th>\n",
" <td>18.223943</td>\n",
" <td>3596.148117</td>\n",
" </tr>\n",
" <tr>\n",
" <th rowspan=\"9\" valign=\"top\">tree reduction fast tasks</th>\n",
" <th>2</th>\n",
" <th>tasks/s</th>\n",
" <td>0.202223</td>\n",
" <td>1265.927313</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <th>tasks/s</th>\n",
" <td>0.294252</td>\n",
" <td>1740.005646</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <th>tasks/s</th>\n",
" <td>0.505821</td>\n",
" <td>2024.432563</td>\n",
" </tr>\n",
" <tr>\n",
" <th>16</th>\n",
" <th>tasks/s</th>\n",
" <td>0.864661</td>\n",
" <td>2368.557067</td>\n",
" </tr>\n",
" <tr>\n",
" <th>32</th>\n",
" <th>tasks/s</th>\n",
" <td>1.395554</td>\n",
" <td>2935.035983</td>\n",
" </tr>\n",
" <tr>\n",
" <th>64</th>\n",
" <th>tasks/s</th>\n",
" <td>2.916794</td>\n",
" <td>2808.563041</td>\n",
" </tr>\n",
" <tr>\n",
" <th>128</th>\n",
" <th>tasks/s</th>\n",
" <td>5.950359</td>\n",
" <td>2753.447264</td>\n",
" </tr>\n",
" <tr>\n",
" <th>256</th>\n",
" <th>tasks/s</th>\n",
" <td>11.853272</td>\n",
" <td>2764.468701</td>\n",
" </tr>\n",
" <tr>\n",
" <th>512</th>\n",
" <th>tasks/s</th>\n",
" <td>25.379409</td>\n",
" <td>2582.250800</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"<p>279 rows × 2 columns</p>\n",
"</div>"
],
"text/plain": [
" duration rate\n",
"collection name n unit \n",
"arrays blockwise 100ms tasks 2 MB/s 0.930781 429.738441\n",
" 4 MB/s 0.847067 944.435197\n",
" 8 MB/s 1.053046 1519.373375\n",
" 16 MB/s 0.967746 3306.651083\n",
" 32 MB/s 1.113454 5747.768011\n",
" 64 MB/s 0.967302 13232.684357\n",
" 128 MB/s 1.289514 19852.058765\n",
" 256 MB/s 1.730550 29585.964835\n",
" 512 MB/s 6.030538 16980.218490\n",
" create random 2 MB/s 0.584174 684.713900\n",
" 4 MB/s 0.655029 1221.319419\n",
" 8 MB/s 0.721304 2218.161714\n",
" 16 MB/s 0.664351 4816.727556\n",
" 32 MB/s 0.718007 8913.387113\n",
" 64 MB/s 0.738002 17344.136192\n",
" 128 MB/s 1.200436 21325.182436\n",
" 256 MB/s 2.564872 19962.007739\n",
" 512 MB/s 1.880598 54450.695152\n",
" elementwise computation 2 MB/s 2.708794 147.664339\n",
" 4 MB/s 2.908163 275.087715\n",
" 8 MB/s 3.580060 446.911371\n",
" 16 MB/s 3.964876 807.087044\n",
" 32 MB/s 5.441955 1176.025319\n",
" 64 MB/s 7.383476 1733.600921\n",
" 128 MB/s 13.116995 1951.629145\n",
" 256 MB/s 21.965428 2330.935700\n",
" 512 MB/s 42.592467 2404.177413\n",
" nearest neighbor 100ms tasks 2 MB/s 1.236899 323.383141\n",
" 4 MB/s 1.160823 689.166299\n",
" 8 MB/s 1.509198 1060.145264\n",
"... ... ...\n",
"tasks task map 1s tasks 128 tasks/s 5.590187 91.589068\n",
" 256 tasks/s 5.644582 181.412908\n",
" 512 tasks/s 7.549867 271.263068\n",
" task map fast tasks 2 tasks/s 0.221133 1808.870110\n",
" 4 tasks/s 0.241904 3307.103271\n",
" 8 tasks/s 0.486054 3291.817309\n",
" 16 tasks/s 0.800579 3997.107929\n",
" 32 tasks/s 1.796089 3563.297941\n",
" 64 tasks/s 3.346050 3825.405776\n",
" 128 tasks/s 6.896783 3711.875589\n",
" 256 tasks/s 14.315529 3576.535636\n",
" 512 tasks/s 29.077214 3521.657944\n",
" tree reduction 100ms tasks 2 tasks/s 6.856049 37.339291\n",
" 4 tasks/s 7.081078 72.305377\n",
" 8 tasks/s 7.219373 141.840563\n",
" 16 tasks/s 7.429270 275.666383\n",
" 32 tasks/s 7.793831 525.543849\n",
" 64 tasks/s 8.285013 988.773376\n",
" 128 tasks/s 8.970450 1826.441178\n",
" 256 tasks/s 12.109690 2705.932256\n",
" 512 tasks/s 18.223943 3596.148117\n",
" tree reduction fast tasks 2 tasks/s 0.202223 1265.927313\n",
" 4 tasks/s 0.294252 1740.005646\n",
" 8 tasks/s 0.505821 2024.432563\n",
" 16 tasks/s 0.864661 2368.557067\n",
" 32 tasks/s 1.395554 2935.035983\n",
" 64 tasks/s 2.916794 2808.563041\n",
" 128 tasks/s 5.950359 2753.447264\n",
" 256 tasks/s 11.853272 2764.468701\n",
" 512 tasks/s 25.379409 2582.250800\n",
"\n",
"[279 rows x 2 columns]"
]
},
"execution_count": 89,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df = ddf.groupby(['collection', 'name', 'n', 'unit']).median()\n",
"df"
]
},
{
"cell_type": "code",
"execution_count": 90,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df.to_csv('scaling-data.csv')"
]
},
{
"cell_type": "code",
"execution_count": 91,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import gcsfs\n",
"gcs = gcsfs.GCSFileSystem(token='cloud')\n",
"gcs.put('scaling-data.csv', 'dask-data/scaling-data.csv')"
]
},
{
"cell_type": "code",
"execution_count": 92,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"ddf.to_csv('scaling-data-raw.csv')\n",
"gcs.put('scaling-data-raw.csv', 'dask-data/scaling-data-raw.csv')"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.1"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment