Created
January 24, 2019 22:41
-
-
Save mrocklin/6e2c33c33b32bc324e3965212f202f66 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"# Dask DataFrame with cuDF joins\n", | |
"\n", | |
"This notebook joins two Dask DataFrames whose partitions are cuDF (GPU) DataFrames, on an eight-GPU machine.\n", | |
"\n", | |
"It makes three points:\n", | |
"\n", | |
"1. Distributed joins work.\n", | |
"2. They are currently slow, due to communication between workers.\n", | |
"3. Backend-agnostic workflows, where the same Dask DataFrame code runs on either Pandas or cuDF partitions, provide usability gains." | |
] | |
}, | |
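{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Start a cluster on a DGX\n", | |
"\n", | |
"`LocalCUDACluster` starts a local Dask cluster with one worker per GPU; the summary below shows eight workers." | |
] | |
}, | |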
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/html": [ | |
"<table style=\"border: 2px solid white;\">\n", | |
"<tr>\n", | |
"<td style=\"vertical-align: top; border: 0px solid white\">\n", | |
"<h3>Client</h3>\n", | |
"<ul>\n", | |
" <li><b>Scheduler: </b>tcp://127.0.0.1:40976\n", | |
" <li><b>Dashboard: </b><a href='http://127.0.0.1:8787/status' target='_blank'>http://127.0.0.1:8787/status</a>\n", | |
"</ul>\n", | |
"</td>\n", | |
"<td style=\"vertical-align: top; border: 0px solid white\">\n", | |
"<h3>Cluster</h3>\n", | |
"<ul>\n", | |
" <li><b>Workers: </b>8</li>\n", | |
" <li><b>Cores: </b>8</li>\n", | |
" <li><b>Memory: </b>540.96 GB</li>\n", | |
"</ul>\n", | |
"</td>\n", | |
"</tr>\n", | |
"</table>" | |
], | |
"text/plain": [ | |
"<Client: scheduler='tcp://127.0.0.1:40976' processes=8 cores=8>" | |
] | |
}, | |
"execution_count": 1, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"from dask.distributed import Client, wait\n", | |
"from dask_cuda import LocalCUDACluster\n", | |
"\n", | |
"cluster = LocalCUDACluster()\n", | |
"client = Client(cluster)\n", | |
"client" | |
] | |
}, | |
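{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Create random datasets\n", | |
"\n", | |
"Build two random tables that share an integer `id` key drawn from `range(n_keys)`, with `n_keys = 5,000,000`:\n", | |
"\n", | |
"- `left`: 1,000,000,000 rows with a float column `x`\n", | |
"- `right`: 10,000,000 rows with a float column `y`\n", | |
"\n", | |
"Both start as Pandas-backed Dask DataFrames persisted in worker (host) memory. No random seed is set, so the values differ between runs." | |
] | |
}, | |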
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/html": [ | |
"<div><strong>Dask DataFrame Structure:</strong></div>\n", | |
"<div>\n", | |
"<style>\n", | |
" .dataframe thead tr:only-child th {\n", | |
" text-align: right;\n", | |
" }\n", | |
"\n", | |
" .dataframe thead th {\n", | |
" text-align: left;\n", | |
" }\n", | |
"\n", | |
" .dataframe tbody tr th {\n", | |
" vertical-align: top;\n", | |
" }\n", | |
"</style>\n", | |
"<table border=\"1\" class=\"dataframe\">\n", | |
" <thead>\n", | |
" <tr style=\"text-align: right;\">\n", | |
" <th></th>\n", | |
" <th>x</th>\n", | |
" <th>id</th>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>npartitions=64</th>\n", | |
" <th></th>\n", | |
" <th></th>\n", | |
" </tr>\n", | |
" </thead>\n", | |
" <tbody>\n", | |
" <tr>\n", | |
" <th>0</th>\n", | |
" <td>float64</td>\n", | |
" <td>int64</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>15625000</th>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>...</th>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>984375000</th>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>999999999</th>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" </tr>\n", | |
" </tbody>\n", | |
"</table>\n", | |
"</div>\n", | |
"<div>Dask Name: concat-indexed, 64 tasks</div>" | |
], | |
"text/plain": [ | |
"Dask DataFrame Structure:\n", | |
" x id\n", | |
"npartitions=64 \n", | |
"0 float64 int64\n", | |
"15625000 ... ...\n", | |
"... ... ...\n", | |
"984375000 ... ...\n", | |
"999999999 ... ...\n", | |
"Dask Name: concat-indexed, 64 tasks" | |
] | |
}, | |
"execution_count": 2, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"import dask.array as da\n", | |
"import dask.dataframe as dd\n", | |
"\n", | |
"n_rows = 1000000000\n", | |
"n_keys = 5000000\n", | |
"\n", | |
"left = dd.concat([\n", | |
" da.random.random(n_rows).to_dask_dataframe(columns='x'),\n", | |
" da.random.randint(0, n_keys, size=n_rows).to_dask_dataframe(columns='id'),\n", | |
"], axis=1).persist()\n", | |
"left" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/html": [ | |
"<div><strong>Dask DataFrame Structure:</strong></div>\n", | |
"<div>\n", | |
"<style>\n", | |
" .dataframe thead tr:only-child th {\n", | |
" text-align: right;\n", | |
" }\n", | |
"\n", | |
" .dataframe thead th {\n", | |
" text-align: left;\n", | |
" }\n", | |
"\n", | |
" .dataframe tbody tr th {\n", | |
" vertical-align: top;\n", | |
" }\n", | |
"</style>\n", | |
"<table border=\"1\" class=\"dataframe\">\n", | |
" <thead>\n", | |
" <tr style=\"text-align: right;\">\n", | |
" <th></th>\n", | |
" <th>y</th>\n", | |
" <th>id</th>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>npartitions=1</th>\n", | |
" <th></th>\n", | |
" <th></th>\n", | |
" </tr>\n", | |
" </thead>\n", | |
" <tbody>\n", | |
" <tr>\n", | |
" <th>0</th>\n", | |
" <td>float64</td>\n", | |
" <td>int64</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>9999999</th>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" </tr>\n", | |
" </tbody>\n", | |
"</table>\n", | |
"</div>\n", | |
"<div>Dask Name: concat-indexed, 1 tasks</div>" | |
], | |
"text/plain": [ | |
"Dask DataFrame Structure:\n", | |
" y id\n", | |
"npartitions=1 \n", | |
"0 float64 int64\n", | |
"9999999 ... ...\n", | |
"Dask Name: concat-indexed, 1 tasks" | |
] | |
}, | |
"execution_count": 3, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"n_rows = 10000000\n", | |
"\n", | |
"right = dd.concat([\n", | |
" da.random.random(n_rows).to_dask_dataframe(columns='y'),\n", | |
" da.random.randint(0, n_keys, size=n_rows).to_dask_dataframe(columns='id'),\n", | |
"], axis=1).persist()\n", | |
"right" | |
] | |
}, | |
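{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Convert data to GPU and persist in device memory\n", | |
"\n", | |
"`cudf.from_pandas` is applied to each partition, so `gleft` and `gright` keep the partitioning of `left` and `right` but hold cuDF DataFrames." | |
] | |
}, | |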
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import dask\n", | |
"import cudf\n", | |
"\n", | |
"gleft = left.map_partitions(cudf.from_pandas)\n", | |
"gright = right.map_partitions(cudf.from_pandas)\n", | |
"\n", | |
"gleft, gright = dask.persist(gleft, gright) # persist data in device memory" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/html": [ | |
"<div><strong>Dask DataFrame Structure:</strong></div>\n", | |
"<div>\n", | |
"<style>\n", | |
" .dataframe thead tr:only-child th {\n", | |
" text-align: right;\n", | |
" }\n", | |
"\n", | |
" .dataframe thead th {\n", | |
" text-align: left;\n", | |
" }\n", | |
"\n", | |
" .dataframe tbody tr th {\n", | |
" vertical-align: top;\n", | |
" }\n", | |
"</style>\n", | |
"<table border=\"1\" class=\"dataframe\">\n", | |
" <thead>\n", | |
" <tr style=\"text-align: right;\">\n", | |
" <th></th>\n", | |
" <th>x</th>\n", | |
" <th>id</th>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>npartitions=64</th>\n", | |
" <th></th>\n", | |
" <th></th>\n", | |
" </tr>\n", | |
" </thead>\n", | |
" <tbody>\n", | |
" <tr>\n", | |
" <th>0</th>\n", | |
" <td>float64</td>\n", | |
" <td>int64</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>15625000</th>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>...</th>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>984375000</th>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>999999999</th>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" </tr>\n", | |
" </tbody>\n", | |
"</table>\n", | |
"</div>\n", | |
"<div>Dask Name: from_pandas, 64 tasks</div>" | |
], | |
"text/plain": [ | |
"<dask_cudf.DataFrame | 64 tasks | 64 npartitions>" | |
] | |
}, | |
"execution_count": 5, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"gleft" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 6, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"CPU times: user 192 ms, sys: 28 ms, total: 220 ms\n", | |
"Wall time: 237 ms\n" | |
] | |
}, | |
{ | |
"data": { | |
"text/plain": [ | |
"500004719.254711" | |
] | |
}, | |
"execution_count": 6, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"%time gleft.x.sum().compute()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 7, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"'16.00 GB'" | |
] | |
}, | |
"execution_count": 7, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"from distributed.utils import format_bytes\n", | |
"format_bytes(len(gleft) * 8 * 2) # TODO: cudf needs `.memory_usage()` method" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 8, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"'160.00 MB'" | |
] | |
}, | |
"execution_count": 8, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"format_bytes(len(gright) * 8 * 2)" | |
] | |
}, | |
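{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Join on the `id` column\n", | |
"\n", | |
"`merge` only builds a task graph (note the thousands of tasks below). `persist` then runs the join on the workers and keeps the resulting cuDF partitions in GPU memory." | |
] | |
}, | |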
{ | |
"cell_type": "code", | |
"execution_count": 9, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/html": [ | |
"<div><strong>Dask DataFrame Structure:</strong></div>\n", | |
"<div>\n", | |
"<style>\n", | |
" .dataframe thead tr:only-child th {\n", | |
" text-align: right;\n", | |
" }\n", | |
"\n", | |
" .dataframe thead th {\n", | |
" text-align: left;\n", | |
" }\n", | |
"\n", | |
" .dataframe tbody tr th {\n", | |
" vertical-align: top;\n", | |
" }\n", | |
"</style>\n", | |
"<table border=\"1\" class=\"dataframe\">\n", | |
" <thead>\n", | |
" <tr style=\"text-align: right;\">\n", | |
" <th></th>\n", | |
" <th>id</th>\n", | |
" <th>x</th>\n", | |
" <th>y</th>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>npartitions=64</th>\n", | |
" <th></th>\n", | |
" <th></th>\n", | |
" <th></th>\n", | |
" </tr>\n", | |
" </thead>\n", | |
" <tbody>\n", | |
" <tr>\n", | |
" <th></th>\n", | |
" <td>int64</td>\n", | |
" <td>float64</td>\n", | |
" <td>float64</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th></th>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>...</th>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th></th>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th></th>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" </tr>\n", | |
" </tbody>\n", | |
"</table>\n", | |
"</div>\n", | |
"<div>Dask Name: join_result, 4546 tasks</div>" | |
], | |
"text/plain": [ | |
"<dask_cudf.DataFrame | 4546 tasks | 64 npartitions>" | |
] | |
}, | |
"execution_count": 9, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"out = gleft.merge(gright, on=['id']) # this is lazy\n", | |
"out" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 10, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"CPU times: user 12.9 s, sys: 784 ms, total: 13.7 s\n", | |
"Wall time: 45.8 s\n" | |
] | |
} | |
], | |
"source": [ | |
"out = out.persist()\n", | |
"%time _ = wait(out)" | |
] | |
}, | |
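{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Inspect output\n", | |
"\n", | |
"The size estimate below assumes 8 bytes per value, which matches the `int64`/`float64` columns. It is needed because cuDF does not yet provide a `memory_usage()` method." | |
] | |
}, | |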
{ | |
"cell_type": "code", | |
"execution_count": 11, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/html": [ | |
"<div>\n", | |
"<style>\n", | |
" .dataframe thead tr:only-child th {\n", | |
" text-align: right;\n", | |
" }\n", | |
"\n", | |
" .dataframe thead th {\n", | |
" text-align: left;\n", | |
" }\n", | |
"\n", | |
" .dataframe tbody tr th {\n", | |
" vertical-align: top;\n", | |
" }\n", | |
"</style>\n", | |
"<table border=\"1\" class=\"dataframe\">\n", | |
" <thead>\n", | |
" <tr style=\"text-align: right;\">\n", | |
" <th></th>\n", | |
" <th>id</th>\n", | |
" <th>x</th>\n", | |
" <th>y</th>\n", | |
" </tr>\n", | |
" </thead>\n", | |
" <tbody>\n", | |
" <tr>\n", | |
" <th>0</th>\n", | |
" <td>25178</td>\n", | |
" <td>0.226179</td>\n", | |
" <td>0.706934</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>1</th>\n", | |
" <td>3298474</td>\n", | |
" <td>0.142727</td>\n", | |
" <td>0.753467</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>2</th>\n", | |
" <td>1731187</td>\n", | |
" <td>0.114984</td>\n", | |
" <td>0.839126</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>3</th>\n", | |
" <td>1211270</td>\n", | |
" <td>0.678222</td>\n", | |
" <td>0.128209</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>4</th>\n", | |
" <td>2530052</td>\n", | |
" <td>0.876008</td>\n", | |
" <td>0.882135</td>\n", | |
" </tr>\n", | |
" </tbody>\n", | |
"</table>\n", | |
"</div>" | |
], | |
"text/plain": [ | |
" id x y\n", | |
"0 25178 0.226179 0.706934\n", | |
"1 3298474 0.142727 0.753467\n", | |
"2 1731187 0.114984 0.839126\n", | |
"3 1211270 0.678222 0.128209\n", | |
"4 2530052 0.876008 0.882135" | |
] | |
}, | |
"execution_count": 11, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"out.head().to_pandas()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 12, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"'51.25 GB'" | |
] | |
}, | |
"execution_count": 12, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"from distributed.utils import format_bytes\n", | |
"format_bytes(len(out) * 8 * 3) # TODO: cudf needs `.memory_usage()` method" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python [conda env:cudf_dev]", | |
"language": "python", | |
"name": "conda-env-cudf_dev-py" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.5.5" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 2 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment