@rjzamora
Last active July 30, 2019 21:01
Comparing the behavior of `cudf` and `pandas` `rearrange_by_divisions` in Dask
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### **Step 1**: Compare `searchsorted` behavior (`cudf` vs `pandas`)"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/home/nfs/rzamora/miniconda3/envs/cudf_dev_sort/lib/python3.7/site-packages/numba/cuda/envvars.py:16: NumbaDeprecationWarning: \n",
"Environment variables with the 'NUMBAPRO' prefix are deprecated, found use of NUMBAPRO_NVVM=/usr/local/cuda-9.2/nvvm/lib64/libnvvm.so.\n",
"\n",
"For more information visit http://numba.pydata.org/numba-doc/latest/reference/deprecation.html#deprecation-of-numbapro-environment-variables\n",
" warnings.warn(errors.NumbaDeprecationWarning(msg))\n",
"/home/nfs/rzamora/miniconda3/envs/cudf_dev_sort/lib/python3.7/site-packages/numba/cuda/envvars.py:16: NumbaDeprecationWarning: \n",
"Environment variables with the 'NUMBAPRO' prefix are deprecated, found use of NUMBAPRO_LIBDEVICE=/usr/local/cuda-9.2/nvvm/libdevice.\n",
"\n",
"For more information visit http://numba.pydata.org/numba-doc/latest/reference/deprecation.html#deprecation-of-numbapro-environment-variables\n",
" warnings.warn(errors.NumbaDeprecationWarning(msg))\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"<class 'cudf.dataframe.series.Series'>\n",
"<class 'numpy.ndarray'>\n",
"True\n",
"0 0\n",
"1 2\n",
"2 0\n",
"3 2\n",
"4 0\n",
"5 2\n",
"dtype: int32\n"
]
}
],
"source": [
"import cudf, pandas as pd\n",
"import warnings\n",
"import os\n",
"%load_ext snakeviz\n",
"warnings.filterwarnings('ignore')\n",
"\n",
"# NOTE: CUDA_VISIBLE_DEVICES only takes effect if set before the CUDA context is initialized\n",
"os.environ[\"CUDA_VISIBLE_DEVICES\"] = \"4,5,6,7\"\n",
"\n",
"# CuDF\n",
"values = cudf.Series([0, 4, 0, 4, 0, 4])\n",
"divisions = [0, 2, 4]\n",
"partitions = cudf.Series(divisions).searchsorted(values, side=\"right\") - 1\n",
"print(type(partitions))\n",
"\n",
"# Pandas\n",
"values_pd = values.to_pandas()\n",
"partitions_pd = pd.Series(divisions).searchsorted(values_pd, side=\"right\") - 1\n",
"print(type(partitions_pd))\n",
"\n",
"# Show that the CuDF result's index length matches its column length\n",
"print(len(partitions.index) == len(partitions._column))\n",
"\n",
"# Printing the result now works (an earlier index bug made this raise):\n",
"print(partitions)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The output of the above cell tells us a few things:\n",
"\n",
"- The `cudf` version of `searchsorted` returns a `cudf.dataframe.series.Series` object\n",
"- The `pandas` version of `searchsorted` returns a `numpy.ndarray` object\n",
"- The `cudf.Series` output has an appropriate index (previously a bug)\n",
"\n",
"#### **Step 2**: GPU `rearrange_by_divisions` Benchmark"
]
},
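{
"cell_type": "markdown",
"metadata": {},
"source": [
"The partition mapping above can be reproduced with plain `numpy` (a minimal sketch of the same `searchsorted` logic, independent of `cudf`): each value is assigned to the index of the division interval it falls in, so `0` maps to partition `0` and `4` maps to partition `2`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"\n",
"# side='right' returns the insertion point *after* any equal elements,\n",
"# so subtracting 1 gives the index of the containing division interval\n",
"divisions = np.array([0, 2, 4])\n",
"values = np.array([0, 4, 0, 4, 0, 4])\n",
"partitions = np.searchsorted(divisions, values, side='right') - 1\n",
"print(partitions)  # -> [0 2 0 2 0 2]\n"
]
},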
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import cudf, dask.dataframe as dd, pandas as pd\n",
"\n",
"# Make a dataframe; we'd like to partition the data on the y column\n",
"size = int(1e6) # Dataframe will have this many rows\n",
"\n",
"gdf = cudf.DataFrame({'x': list(range(size)), 'y': [0, 4] * (size // 2)})\n",
"df = pd.DataFrame({'x': list(range(size)), 'y': [0, 4] * (size // 2)})\n",
"\n",
"# Split it up into a few partitions with Dask\n",
"gddf = dd.from_pandas(gdf, npartitions=3)\n",
"ddf = dd.from_pandas(df, npartitions=3)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### **2(a)**: CuDF dataframe with GPU-specific Dask modifications\n",
"\n",
"- Adding `values_cupy` property to `cudf.Series`, which returns the values of the column as a zero-copy cupy array. \n",
"- Adding `rearrange_by_divisions_cupy` and `set_partitions_pre_cupy_series` functions to Dask. These functions ultimately use the `values_cupy` property in `cudf` to avoid moving data to host."
]
},
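{
"cell_type": "markdown",
"metadata": {},
"source": [
"The zero-copy hand-off that `values_cupy` relies on can be sketched via the `__cuda_array_interface__` protocol (a hypothetical illustration, not the actual patch): `cupy.asarray` accepts any object exposing that interface, so wrapping a device array this way produces a `cupy.ndarray` view of the same device memory, assuming a `cupy` build with `__cuda_array_interface__` support."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import cupy\n",
"\n",
"# Hypothetical sketch: to_gpu_array() returns a numba device array, and\n",
"# cupy.asarray consumes its __cuda_array_interface__, so no host copy occurs\n",
"device_values = cupy.asarray(gdf['y'].to_gpu_array())\n",
"print(type(device_values))\n"
]
},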
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" \n",
"*** Profile stats marshalled to file '/tmp/tmpnt6xbl69'. \n",
"Embedding SnakeViz in this document...\n"
]
},
{
"data": {
"text/html": [
"\n",
"<iframe id='snakeviz-97b692ba-b30c-11e9-9667-d8c49764f624' frameborder=0 seamless width='100%' height='1000'></iframe>\n",
"<script>document.getElementById(\"snakeviz-97b692ba-b30c-11e9-9667-d8c49764f624\").setAttribute(\"src\", \"http://\" + document.location.hostname + \":8080/snakeviz/%2Ftmp%2Ftmpnt6xbl69\")</script>\n"
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"## CuDF dataframe with CUPY `values`...\n",
"\n",
"# Try to create a new dataframe with two partitions sorted on the y column, split by y 0->2 and 2->4\n",
"out_cupy = dd.shuffle.rearrange_by_divisions_cupy(\n",
" gddf,\n",
" column='y',\n",
" divisions=cudf.Series([0, 2, 4]),\n",
" meta=cudf.Series([0]),\n",
" shuffle='tasks'\n",
")\n",
"\n",
"# compute in a single thread so it's easy to use %pdb and %debug\n",
"%snakeviz result_cupy = out_cupy.compute(scheduler='single-threaded')\n",
"\n",
"result_cupy = out_cupy.compute(scheduler='single-threaded')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### **2(b)**: Pandas dataframe with Dask *as is*"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" \n",
"*** Profile stats marshalled to file '/tmp/tmp1nvbtqmo'. \n",
"Embedding SnakeViz in this document...\n"
]
},
{
"data": {
"text/html": [
"\n",
"<iframe id='snakeviz-9a068a8e-b30c-11e9-9667-d8c49764f624' frameborder=0 seamless width='100%' height='1000'></iframe>\n",
"<script>document.getElementById(\"snakeviz-9a068a8e-b30c-11e9-9667-d8c49764f624\").setAttribute(\"src\", \"http://\" + document.location.hostname + \":8082/snakeviz/%2Ftmp%2Ftmp1nvbtqmo\")</script>\n"
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"## PANDAS dataframe with NUMPY `values`...\n",
"\n",
"# Try to create a new dataframe with two partitions sorted on the y column, split by y 0->2 and 2->4\n",
"out_pd = dd.shuffle.rearrange_by_divisions(ddf, column='y', divisions=[0, 2, 4], shuffle='tasks')\n",
"\n",
"# compute in a single thread so it's easy to use %pdb and %debug\n",
"%snakeviz result_pd = out_pd.compute(scheduler='single-threaded')\n",
"\n",
"result_pd = out_pd.compute(scheduler='single-threaded')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### **2(c)**: CuDF dataframe **without** GPU-specific Dask modifications"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" \n",
"*** Profile stats marshalled to file '/tmp/tmp0se67ae6'. \n",
"Embedding SnakeViz in this document...\n"
]
},
{
"data": {
"text/html": [
"\n",
"<iframe id='snakeviz-9c4b492e-b30c-11e9-9667-d8c49764f624' frameborder=0 seamless width='100%' height='1000'></iframe>\n",
"<script>document.getElementById(\"snakeviz-9c4b492e-b30c-11e9-9667-d8c49764f624\").setAttribute(\"src\", \"http://\" + document.location.hostname + \":8083/snakeviz/%2Ftmp%2Ftmp0se67ae6\")</script>\n"
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"## CuDF dataframe with NUMPY `values`...\n",
"\n",
"# Try to create a new dataframe with two partitions sorted on the y column, split by y 0->2 and 2->4\n",
"out_np = dd.shuffle.rearrange_by_divisions(gddf, column='y', divisions=[0, 2, 4], shuffle='tasks')\n",
"\n",
"# compute in a single thread so it's easy to use %pdb and %debug\n",
"%snakeviz result_np = out_np.compute(scheduler='single-threaded')\n",
"\n",
"result_np = out_np.compute(scheduler='single-threaded')"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from dask.dataframe.utils import assert_eq\n",
"\n",
"assert_eq(result_pd, result_cupy)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"assert_eq(result_pd, result_np)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}