dask_cudf + UCX merge benchmark
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from dask_cuda import LocalCUDACluster\n",
"from dask_cuda import DGX\n",
"from dask_cuda.initialize import initialize\n",
"from dask.distributed import Client, wait\n",
"from dask.dataframe.shuffle import rearrange_by_column\n",
"from dask.dataframe.core import new_dd_object\n",
"from dask.base import tokenize\n",
"import dask_cudf\n",
"import cudf\n",
"import math\n",
"\n",
"# %load_ext snakeviz\n",
"\n",
"try:\n",
" import cupy as np\n",
"except ImportError:\n",
" import numpy as np"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Communication-protocol Options\n",
"use_ucx = True\n",
"enable_tcp_over_ucx = False\n",
"enable_infiniband = True\n",
"enable_nvlink = True\n",
"\n",
"# rmm Options\n",
"use_pool_allocator = True\n",
"\n",
"# Join/merge Options\n",
"chunk_size = 100000000\n",
"frac_match = 0.3\n",
"n_workers = 8"
]
},
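{
"cell_type": "markdown",
"metadata": {},
"source": [
"A rough size estimate for these settings (a sketch, ignoring cudf/rmm overhead): every column is `int64`, so the \"build\" table (3 columns) occupies about 2.4 GB per partition and the \"other\" table (2 columns) about 1.6 GB:\n",
"\n",
"```python\n",
"chunk_size = 100_000_000 # rows per partition\n",
"n_workers = 8\n",
"build_gb = chunk_size * 3 * 8 / 1e9 # ~2.4 GB per partition\n",
"other_gb = chunk_size * 2 * 8 / 1e9 # ~1.6 GB per partition\n",
"total_gb = (build_gb + other_gb) * n_workers # ~32 GB across the cluster\n",
"```"
]
},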
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"distributed.scheduler - INFO - Clear task state\n",
"distributed.scheduler - INFO - Scheduler at: ucx://10.33.225.164:46705\n",
"distributed.scheduler - INFO - dashboard at: 10.33.225.164:8787\n",
"distributed.nanny - INFO - Start Nanny at: 'ucx://10.33.225.164:49397'\n",
"distributed.nanny - INFO - Start Nanny at: 'ucx://10.33.225.164:57623'\n",
"distributed.nanny - INFO - Start Nanny at: 'ucx://10.33.225.164:45469'\n",
"distributed.nanny - INFO - Start Nanny at: 'ucx://10.33.225.164:44671'\n",
"distributed.nanny - INFO - Start Nanny at: 'ucx://10.33.225.164:56123'\n",
"distributed.nanny - INFO - Start Nanny at: 'ucx://10.33.225.164:37973'\n",
"distributed.nanny - INFO - Start Nanny at: 'ucx://10.33.225.164:44485'\n",
"distributed.nanny - INFO - Start Nanny at: 'ucx://10.33.225.164:58089'\n",
"distributed.scheduler - INFO - Register ucx://10.33.225.164:36867\n",
"distributed.scheduler - INFO - Starting worker compute stream, ucx://10.33.225.164:36867\n",
"distributed.core - INFO - Starting established connection\n",
"distributed.scheduler - INFO - Register ucx://10.33.225.164:45741\n",
"distributed.scheduler - INFO - Starting worker compute stream, ucx://10.33.225.164:45741\n",
"distributed.core - INFO - Starting established connection\n",
"distributed.scheduler - INFO - Register ucx://10.33.225.164:35151\n",
"distributed.scheduler - INFO - Starting worker compute stream, ucx://10.33.225.164:35151\n",
"distributed.core - INFO - Starting established connection\n",
"distributed.scheduler - INFO - Register ucx://10.33.225.164:56363\n",
"distributed.scheduler - INFO - Starting worker compute stream, ucx://10.33.225.164:56363\n",
"distributed.core - INFO - Starting established connection\n",
"distributed.scheduler - INFO - Register ucx://10.33.225.164:40253\n",
"distributed.scheduler - INFO - Starting worker compute stream, ucx://10.33.225.164:40253\n",
"distributed.core - INFO - Starting established connection\n",
"distributed.scheduler - INFO - Register ucx://10.33.225.164:55403\n",
"distributed.scheduler - INFO - Starting worker compute stream, ucx://10.33.225.164:55403\n",
"distributed.core - INFO - Starting established connection\n",
"distributed.scheduler - INFO - Register ucx://10.33.225.164:57919\n",
"distributed.scheduler - INFO - Starting worker compute stream, ucx://10.33.225.164:57919\n",
"distributed.core - INFO - Starting established connection\n",
"distributed.scheduler - INFO - Register ucx://10.33.225.164:49409\n",
"distributed.scheduler - INFO - Starting worker compute stream, ucx://10.33.225.164:49409\n",
"distributed.core - INFO - Starting established connection\n",
"distributed.scheduler - INFO - Receive client connection: Client-79a2a57e-1ad2-11ea-88c5-d8c49764f70a\n",
"distributed.core - INFO - Starting established connection\n"
]
},
{
"data": {
"text/html": [
"<table style=\"border: 2px solid white;\">\n",
"<tr>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3 style=\"text-align: left;\">Client</h3>\n",
"<ul style=\"text-align: left; list-style: none; margin: 0; padding: 0;\">\n",
" <li><b>Scheduler: </b>ucx://10.33.225.164:46705</li>\n",
" <li><b>Dashboard: </b><a href='http://10.33.225.164:8787/status' target='_blank'>http://10.33.225.164:8787/status</a>\n",
"</ul>\n",
"</td>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3 style=\"text-align: left;\">Cluster</h3>\n",
"<ul style=\"text-align: left; list-style:none; margin: 0; padding: 0;\">\n",
" <li><b>Workers: </b>8</li>\n",
" <li><b>Cores: </b>8</li>\n",
" <li><b>Memory: </b>1.08 TB</li>\n",
"</ul>\n",
"</td>\n",
"</tr>\n",
"</table>"
],
"text/plain": [
"<Client: 'ucx://10.33.225.164:46705' processes=8 threads=8, memory=1.08 TB>"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Generate `LocalCUDACluster` or `DGX` Cluster\n",
"if use_ucx:\n",
" if enable_tcp_over_ucx:\n",
" interface = \"enp1s0f0\"\n",
" else:\n",
" interface = \"ib0\"\n",
" cluster = DGX(\n",
" n_workers=n_workers,\n",
" interface=interface,\n",
" protocol=\"ucx\",\n",
" enable_tcp_over_ucx=enable_tcp_over_ucx,\n",
" enable_infiniband=enable_infiniband,\n",
" enable_nvlink=enable_nvlink,\n",
" )\n",
" initialize(\n",
" create_cuda_context=True,\n",
" enable_tcp_over_ucx=enable_tcp_over_ucx,\n",
" enable_infiniband=enable_infiniband,\n",
" enable_nvlink=enable_nvlink\n",
" )\n",
"else:\n",
" cluster = LocalCUDACluster(\n",
" n_workers=n_workers,\n",
" device_memory_limit=\"32GB\"\n",
" )\n",
"client = Client(cluster)\n",
"client"
]
},
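{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check (not part of the original benchmark), the scheduler address confirms which protocol the cluster is actually using:\n",
"\n",
"```python\n",
"# Expect a \"ucx://...\" address when use_ucx=True, \"tcp://...\" otherwise\n",
"print(cluster.scheduler_address)\n",
"```"
]
},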
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"if use_pool_allocator:\n",
" def set_rmm():\n",
" import cudf\n",
" cudf.set_allocator(\n",
" \"default\",\n",
" pool=True,\n",
" ) # use 1/2\n",
" client.run(set_rmm)\n",
"\n",
"def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match):\n",
" chunk_type = chunk_type or \"build\"\n",
" frac_match = frac_match or 1.0\n",
" if chunk_type == \"build\":\n",
" # Build dataframe\n",
" #\n",
" # \"key\" column is a unique sample within [0, local_size * num_chunks)\n",
" #\n",
" # \"payload\" column is a random permutation of the chunk_size\n",
" \n",
" start = local_size * i_chunk\n",
" stop = start + local_size\n",
" \n",
" parts_array = np.arange(num_chunks, dtype=\"int64\")\n",
" suffle_array = np.repeat(parts_array, math.ceil(local_size / num_chunks))\n",
" \n",
" df = cudf.DataFrame(\n",
" {\n",
" \"key\": np.arange(start, stop=stop, dtype=\"int64\"),\n",
" \"shuffle\": np.random.permutation(suffle_array)[:local_size],\n",
" \"payload\": np.random.permutation(np.arange(local_size, dtype=\"int64\")),\n",
" }\n",
" )\n",
" else:\n",
" # Other dataframe\n",
" #\n",
" # \"key\" column matches values from the build dataframe\n",
" # for a fraction (`frac_match`) of the entries. The matching\n",
" # entries are perfectly balanced across each partition of the\n",
" # \"base\" dataframe.\n",
" #\n",
" # \"payload\" column is a random permutation of the chunk_size\n",
" \n",
" # Step 1. Choose values that DO match\n",
" sub_local_size = local_size // num_chunks\n",
" sub_local_size_use = max(int(sub_local_size * frac_match), 1)\n",
" arrays = []\n",
" for i in range(num_chunks):\n",
" bgn = (local_size * i) + (sub_local_size * i_chunk)\n",
" end = bgn + sub_local_size\n",
" ar = np.arange(bgn, stop=end, dtype=\"int64\")\n",
" arrays.append(np.random.permutation(ar)[:sub_local_size_use])\n",
" key_array_match = np.concatenate(tuple(arrays), axis=0)\n",
" \n",
" # Step 2. Add values that DON'T match\n",
" missing_size = local_size - key_array_match.shape[0]\n",
" start = local_size * num_chunks + local_size * i_chunk\n",
" stop = start + missing_size\n",
" key_array_no_match = np.arange(start, stop=stop, dtype=\"int64\")\n",
" \n",
" # Step 3. Combine and create the fineal dataframe chunk (dask_cudf partition)\n",
" key_array_combine = np.concatenate((key_array_match, key_array_no_match), axis=0)\n",
" df = cudf.DataFrame(\n",
" {\n",
" \"key\": np.random.permutation(key_array_combine),\n",
" \"payload\": np.random.permutation(np.arange(local_size, dtype=\"int64\")),\n",
" }\n",
" )\n",
" return df\n",
"\n",
" \n",
"def get_random_ddf(chunk_size, num_chunks, frac_match, chunk_type):\n",
" \n",
" parts = [chunk_size for i in range(num_chunks)]\n",
" meta = generate_chunk(0, 4, 1, chunk_type, None)\n",
" divisions = [None] * (len(parts) + 1)\n",
" \n",
" name = \"generate-data-\" + tokenize(chunk_size, num_chunks, frac_match, chunk_type)\n",
" \n",
" graph = {\n",
" (name, i): (generate_chunk, i, part, len(parts), chunk_type, frac_match)\n",
" for i, part in enumerate(parts)\n",
" }\n",
" \n",
" ddf = new_dd_object(graph, name, meta, divisions)\n",
" \n",
" if chunk_type == \"build\":\n",
" divisions = [i for i in range(num_chunks)] + [num_chunks]\n",
" return ddf.set_index(\"shuffle\",divisions=tuple(divisions))\n",
"\n",
" return ddf"
]
},
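{
"cell_type": "markdown",
"metadata": {},
"source": [
"`get_random_ddf` assembles the collection from a raw task graph via `new_dd_object`. A roughly equivalent construction (a sketch, using the public `dask.dataframe.from_delayed` API instead of a hand-built graph) may be easier to read, at the cost of some extra graph overhead:\n",
"\n",
"```python\n",
"import dask.dataframe as dd\n",
"from dask import delayed\n",
"\n",
"parts = [\n",
" delayed(generate_chunk)(i, chunk_size, n_workers, \"other\", frac_match)\n",
" for i in range(n_workers)\n",
"]\n",
"ddf = dd.from_delayed(parts, meta=generate_chunk(0, 4, 1, \"other\", None))\n",
"```"
]
},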
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"# Generate random Dask dataframes\n",
"ddf_base = get_random_ddf(chunk_size, n_workers, frac_match, \"build\").persist()\n",
"ddf_other = get_random_ddf(chunk_size, n_workers, frac_match, \"other\").persist()"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"DoneAndNotDoneFutures(done={<Future: finished, type: cudf.DataFrame, key: ('generate-data-3393696d3757f92da87d0dddd0c6ae01', 1)>, <Future: finished, type: cudf.DataFrame, key: ('generate-data-3393696d3757f92da87d0dddd0c6ae01', 5)>, <Future: finished, type: cudf.DataFrame, key: ('generate-data-3393696d3757f92da87d0dddd0c6ae01', 6)>, <Future: finished, type: cudf.DataFrame, key: ('generate-data-3393696d3757f92da87d0dddd0c6ae01', 4)>, <Future: finished, type: cudf.DataFrame, key: ('generate-data-3393696d3757f92da87d0dddd0c6ae01', 3)>, <Future: finished, type: cudf.DataFrame, key: ('generate-data-3393696d3757f92da87d0dddd0c6ae01', 2)>, <Future: finished, type: cudf.DataFrame, key: ('generate-data-3393696d3757f92da87d0dddd0c6ae01', 0)>, <Future: finished, type: cudf.DataFrame, key: ('generate-data-3393696d3757f92da87d0dddd0c6ae01', 7)>}, not_done=set())"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Wait on initial dataframes\n",
"wait(ddf_base)\n",
"wait(ddf_other)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"# Define merge/join operation\n",
"ddf_join = ddf_base.merge(ddf_other, on=[\"key\"], how=\"inner\")\n"
]
},
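{
"cell_type": "markdown",
"metadata": {},
"source": [
"Because `key` is not the index of either collection, this inner merge requires a full hash shuffle of both sides at compute time. A hypothetical variant (a sketch, not benchmarked here) would pay the shuffle cost up front by indexing both sides on `key` and then merging on the aligned indexes:\n",
"\n",
"```python\n",
"# Sketch: shuffle once via set_index, then merge on aligned indexes\n",
"left = ddf_base.set_index(\"key\")\n",
"right = ddf_other.set_index(\"key\")\n",
"ddf_join_indexed = left.merge(\n",
" right, left_index=True, right_index=True, how=\"inner\"\n",
")\n",
"```"
]
},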
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"(None, None, None, None, None, None, None, None, None)"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ddf_base.divisions"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"5.33 s ± 2.14 s per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"# Get actual merge time\n",
"%timeit wait(ddf_join.persist())"
]
},
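{
"cell_type": "markdown",
"metadata": {},
"source": [
"`%timeit` reports the mean ± std. dev. over 7 runs. For a single wall-clock measurement without IPython magics, a minimal sketch:\n",
"\n",
"```python\n",
"from time import perf_counter\n",
"\n",
"t0 = perf_counter()\n",
"wait(ddf_join.persist())\n",
"print(f\"merge time: {perf_counter() - t0:.2f} s\")\n",
"```"
]
},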
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print([len(part) for part in ddf_join.partitions])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Result Summary\n",
"\n",
"\n",
"**No UCX [`persist()` Only]**: (machine: `dgx14`)\n",
"```python\n",
"# Communication-protocol Options\n",
"use_ucx = False\n",
"\n",
"# rmm Options\n",
"use_pool_allocator = True\n",
"\n",
"# Join/merge Options\n",
"chunk_size = 100000000\n",
"frac_match = 0.3\n",
"num_chunks = <n_workers>\n",
"```\n",
"\n",
"- 2 workers: `18.1 s ± 15.8 s per loop (mean ± std. dev. of 7 runs, 1 loop each)`\n",
"- 4 workers: `1min 5s ± 40.6 s per loop (mean ± std. dev. of 7 runs, 1 loop each)`\n",
"- 6 workers: `1min 13s ± 47.9 s per loop (mean ± std. dev. of 7 runs, 1 loop each)`\n",
"- 8 workers: `1min 25s ± 36.7 s per loop (mean ± std. dev. of 7 runs, 1 loop each)`\n",
"\n",
"\n",
"**UCX (Infiniband + NVLink) [`persist()` Only]**: (machine: `dgx14`, interface:`\"ib0\"`)\n",
"```python\n",
"# Communication-protocol Options\n",
"use_ucx = True\n",
"enable_tcp_over_ucx = False\n",
"enable_infiniband = True\n",
"enable_nvlink = True\n",
"\n",
"# rmm Options\n",
"use_pool_allocator = True\n",
"\n",
"# Join/merge Options\n",
"chunk_size = 100000000\n",
"frac_match = 0.3\n",
"num_chunks = <n_workers>\n",
"```\n",
"\n",
"- 2 workers: `1.55 s ± 1.76 s per loop (mean ± std. dev. of 7 runs, 1 loop each)`\n",
"- 4 workers: `1.41 s ± 2.16 s per loop (mean ± std. dev. of 7 runs, 1 loop each)`\n",
"- 6 workers: `4.96 s ± 3.29 s per loop (mean ± std. dev. of 7 runs, 1 loop each)`\n",
"- 8 workers: `8.72 s ± 1.76 s per loop (mean ± std. dev. of 7 runs, 1 loop each)`\n",
"\n",
"--\n",
"\n",
"**NOTE**: Results below use full `compute()` operation, which brings everything back to the local process. This causes OOM problems at smaller data sizes, and is not really what we want to benchmark. However, the full rmm pooling comparisons are interesting...\n",
"\n",
"--\n",
"\n",
"**[Full `compute()`] No UCX**: (machine: `dgx14`)\n",
"```python\n",
"# Communication-protocol Options\n",
"use_ucx = False\n",
"\n",
"# Join/merge Options\n",
"chunk_size = 50000000\n",
"frac_match = 0.3\n",
"num_chunks = <n_workers>\n",
"```\n",
"\n",
"- 2 workers: `22.2 s ± 1.22 s per loop (mean ± std. dev. of 7 runs, 1 loop each)`\n",
"- 4 workers: `57.8 s ± 4.24 s per loop (mean ± std. dev. of 7 runs, 1 loop each)`\n",
"- 6 workers: `1min 10s ± 2.88 s per loop (mean ± std. dev. of 7 runs, 1 loop each)`\n",
"- 8 workers: `1min 15s ± 9.7 s per loop (mean ± std. dev. of 7 runs, 1 loop each)`\n",
"\n",
"[*Pool `rmm` Allocator*]\n",
"- 2 workers: `21.3 s ± 2.1 s per loop (mean ± std. dev. of 7 runs, 1 loop each)`\n",
"- 4 workers: `56.9 s ± 5.34 s per loop (mean ± std. dev. of 7 runs, 1 loop each)`\n",
"- 6 workers: `1min 5s ± 6.58 s per loop (mean ± std. dev. of 7 runs, 1 loop each)`\n",
"- 8 workers: `1min 6s ± 9.56 s per loop (mean ± std. dev. of 7 runs, 1 loop each)`\n",
"\n",
"\n",
"\n",
"**[Full `compute()`] UCX (Infiniband + NVLink)**: (machine: `dgx14`, interface:`\"ib0\"`)\n",
"```python\n",
"# Communication-protocol Options\n",
"use_ucx = True\n",
"enable_tcp_over_ucx = False\n",
"enable_infiniband = True\n",
"enable_nvlink = True\n",
"\n",
"# Join/merge Options\n",
"chunk_size = 50000000\n",
"frac_match = 0.3\n",
"num_chunks = <n_workers>\n",
"```\n",
"\n",
"- 2 workers: `5.54 s ± 158 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)`\n",
"- 4 workers: `11.4 s ± 752 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)`\n",
"- 6 workers: `24 s ± 1.75 s per loop (mean ± std. dev. of 7 runs, 1 loop each)`\n",
"- 8 workers: `37.2 s ± 2.49 s per loop (mean ± std. dev. of 7 runs, 1 loop each)`\n",
"\n",
"[*Pool `rmm` Allocator*]\n",
"- 2 workers: `3.24 s ± 178 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)`\n",
"- 4 workers: `7.36 s ± 858 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)`\n",
"- 6 workers: `11.8 s ± 1.59 s per loop (mean ± std. dev. of 7 runs, 1 loop each)`\n",
"- 8 workers: `16.4 s ± 4.2 s per loop (mean ± std. dev. of 7 runs, 1 loop each)`\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
rjzamora commented Nov 27, 2019

Note that @madsbk has done a nice job integrating this benchmark into dask-cuda directly: rapidsai/dask-cuda#183 :)
