@mrocklin
Created January 24, 2019 22:41
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Dask DataFrame with cuDF joins\n",
"\n",
"This notebook joins two Dask DataFrames backed by cuDF on an eight-GPU machine.\n",
"\n",
"It makes three points:\n",
"\n",
"1. Joins on GPU-backed dataframes work\n",
"2. They are slow because of communication costs\n",
"3. Code that works the same on Pandas and cuDF dataframes provides usability gains"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Use a DGX"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table style=\"border: 2px solid white;\">\n",
"<tr>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3>Client</h3>\n",
"<ul>\n",
" <li><b>Scheduler: </b>tcp://127.0.0.1:40976\n",
" <li><b>Dashboard: </b><a href='http://127.0.0.1:8787/status' target='_blank'>http://127.0.0.1:8787/status</a>\n",
"</ul>\n",
"</td>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3>Cluster</h3>\n",
"<ul>\n",
" <li><b>Workers: </b>8</li>\n",
" <li><b>Cores: </b>8</li>\n",
" <li><b>Memory: </b>540.96 GB</li>\n",
"</ul>\n",
"</td>\n",
"</tr>\n",
"</table>"
],
"text/plain": [
"<Client: scheduler='tcp://127.0.0.1:40976' processes=8 cores=8>"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from dask.distributed import Client, wait\n",
"from dask_cuda import LocalCUDACluster\n",
"\n",
"cluster = LocalCUDACluster()\n",
"client = Client(cluster)\n",
"client"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create random dataset"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div><strong>Dask DataFrame Structure:</strong></div>\n",
"<div>\n",
"<style>\n",
" .dataframe thead tr:only-child th {\n",
" text-align: right;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: left;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>x</th>\n",
" <th>id</th>\n",
" </tr>\n",
" <tr>\n",
" <th>npartitions=64</th>\n",
" <th></th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>float64</td>\n",
" <td>int64</td>\n",
" </tr>\n",
" <tr>\n",
" <th>15625000</th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>...</th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>984375000</th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>999999999</th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>\n",
"<div>Dask Name: concat-indexed, 64 tasks</div>"
],
"text/plain": [
"Dask DataFrame Structure:\n",
" x id\n",
"npartitions=64 \n",
"0 float64 int64\n",
"15625000 ... ...\n",
"... ... ...\n",
"984375000 ... ...\n",
"999999999 ... ...\n",
"Dask Name: concat-indexed, 64 tasks"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import dask.array as da\n",
"import dask.dataframe as dd\n",
"\n",
"n_rows = 1000000000 # one billion rows in the large (left) table\n",
"n_keys = 5000000 # ids are drawn uniformly from [0, n_keys)\n",
"\n",
"left = dd.concat([\n",
" da.random.random(n_rows).to_dask_dataframe(columns='x'),\n",
" da.random.randint(0, n_keys, size=n_rows).to_dask_dataframe(columns='id'),\n",
"], axis=1).persist()\n",
"left"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div><strong>Dask DataFrame Structure:</strong></div>\n",
"<div>\n",
"<style>\n",
" .dataframe thead tr:only-child th {\n",
" text-align: right;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: left;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>y</th>\n",
" <th>id</th>\n",
" </tr>\n",
" <tr>\n",
" <th>npartitions=1</th>\n",
" <th></th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>float64</td>\n",
" <td>int64</td>\n",
" </tr>\n",
" <tr>\n",
" <th>9999999</th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>\n",
"<div>Dask Name: concat-indexed, 1 tasks</div>"
],
"text/plain": [
"Dask DataFrame Structure:\n",
" y id\n",
"npartitions=1 \n",
"0 float64 int64\n",
"9999999 ... ...\n",
"Dask Name: concat-indexed, 1 tasks"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"n_rows = 10000000 # ten million rows in the small (right) table\n",
"\n",
"right = dd.concat([\n",
" da.random.random(n_rows).to_dask_dataframe(columns='y'),\n",
" da.random.randint(0, n_keys, size=n_rows).to_dask_dataframe(columns='id'),\n",
"], axis=1).persist()\n",
"right"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Convert data to GPU and persist in device memory"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"import dask\n",
"import cudf\n",
"\n",
"# convert each Pandas partition into a cuDF DataFrame\n",
"gleft = left.map_partitions(cudf.from_pandas)\n",
"gright = right.map_partitions(cudf.from_pandas)\n",
"\n",
"gleft, gright = dask.persist(gleft, gright) # persist data in device memory"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div><strong>Dask DataFrame Structure:</strong></div>\n",
"<div>\n",
"<style>\n",
" .dataframe thead tr:only-child th {\n",
" text-align: right;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: left;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>x</th>\n",
" <th>id</th>\n",
" </tr>\n",
" <tr>\n",
" <th>npartitions=64</th>\n",
" <th></th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>float64</td>\n",
" <td>int64</td>\n",
" </tr>\n",
" <tr>\n",
" <th>15625000</th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>...</th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>984375000</th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>999999999</th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>\n",
"<div>Dask Name: from_pandas, 64 tasks</div>"
],
"text/plain": [
"<dask_cudf.DataFrame | 64 tasks | 64 npartitions>"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"gleft"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 192 ms, sys: 28 ms, total: 220 ms\n",
"Wall time: 237 ms\n"
]
},
{
"data": {
"text/plain": [
"500004719.254711"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time gleft.x.sum().compute()"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'16.00 GB'"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from distributed.utils import format_bytes\n",
"# rough size estimate: 8 bytes per value (float64 / int64) x 2 columns\n",
"format_bytes(len(gleft) * 8 * 2) # TODO: cudf needs `.memory_usage()` method"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'160.00 MB'"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"format_bytes(len(gright) * 8 * 2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Join on the ID column"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div><strong>Dask DataFrame Structure:</strong></div>\n",
"<div>\n",
"<style>\n",
" .dataframe thead tr:only-child th {\n",
" text-align: right;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: left;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>id</th>\n",
" <th>x</th>\n",
" <th>y</th>\n",
" </tr>\n",
" <tr>\n",
" <th>npartitions=64</th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th></th>\n",
" <td>int64</td>\n",
" <td>float64</td>\n",
" <td>float64</td>\n",
" </tr>\n",
" <tr>\n",
" <th></th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>...</th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th></th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th></th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>\n",
"<div>Dask Name: join_result, 4546 tasks</div>"
],
"text/plain": [
"<dask_cudf.DataFrame | 4546 tasks | 64 npartitions>"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"out = gleft.merge(gright, on=['id']) # lazy: this builds the task graph but computes nothing yet\n",
"out"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 12.9 s, sys: 784 ms, total: 13.7 s\n",
"Wall time: 45.8 s\n"
]
}
],
"source": [
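"out = out.persist() # start computing the join in the background\n",
"# block until every partition of the result is finished\n",
"%time _ = wait(out)"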
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Inspect output"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style>\n",
" .dataframe thead tr:only-child th {\n",
" text-align: right;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: left;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>id</th>\n",
" <th>x</th>\n",
" <th>y</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>25178</td>\n",
" <td>0.226179</td>\n",
" <td>0.706934</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>3298474</td>\n",
" <td>0.142727</td>\n",
" <td>0.753467</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>1731187</td>\n",
" <td>0.114984</td>\n",
" <td>0.839126</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>1211270</td>\n",
" <td>0.678222</td>\n",
" <td>0.128209</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>2530052</td>\n",
" <td>0.876008</td>\n",
" <td>0.882135</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" id x y\n",
"0 25178 0.226179 0.706934\n",
"1 3298474 0.142727 0.753467\n",
"2 1731187 0.114984 0.839126\n",
"3 1211270 0.678222 0.128209\n",
"4 2530052 0.876008 0.882135"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"out.head().to_pandas()"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'51.25 GB'"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from distributed.utils import format_bytes\n",
"# rough size estimate: 8 bytes per value x 3 columns (id, x, y)\n",
"format_bytes(len(out) * 8 * 3) # TODO: cudf needs `.memory_usage()` method"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python [conda env:cudf_dev]",
"language": "python",
"name": "conda-env-cudf_dev-py"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}