@mrocklin
Last active July 6, 2023 13:55
Dask DataFrame with cuDF on a simple NYC Taxi CSV computation
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Dask DataFrame and cuDF on NYC Taxi CSV data"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Start Dask Cluster on an Eight-GPU DGX Machine"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table style=\"border: 2px solid white;\">\n",
"<tr>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3>Client</h3>\n",
"<ul>\n",
" <li><b>Scheduler: </b>tcp://127.0.0.1:33058\n",
" <li><b>Dashboard: </b><a href='http://127.0.0.1:8787/status' target='_blank'>http://127.0.0.1:8787/status</a>\n",
"</ul>\n",
"</td>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3>Cluster</h3>\n",
"<ul>\n",
" <li><b>Workers: </b>8</li>\n",
" <li><b>Cores: </b>8</li>\n",
" <li><b>Memory: </b>540.96 GB</li>\n",
"</ul>\n",
"</td>\n",
"</tr>\n",
"</table>"
],
"text/plain": [
"<Client: scheduler='tcp://127.0.0.1:33058' processes=8 cores=8>"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from dask_cuda import LocalCUDACluster\n",
"cluster = LocalCUDACluster()\n",
"\n",
"from dask.distributed import Client\n",
"client = Client(cluster)\n",
"client"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Previously, we ran this to shard the files more finely for `cudf.read_csv`\n",
"\n",
"```python\n",
"import dask.dataframe as dd\n",
"pdf = dd.read_csv('data/nyc/yellow_tripdata_2017-*.csv',\n",
" parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])\n",
"\n",
"pdf.repartition(npartitions=100).to_csv('data/nyc/many/*.csv', index=False)\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Read CSV files into a Dask cuDF DataFrame"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style>\n",
" .dataframe thead tr:only-child th {\n",
" text-align: right;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: left;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>VendorID</th>\n",
" <th>tpep_pickup_datetime</th>\n",
" <th>tpep_dropoff_datetime</th>\n",
" <th>passenger_count</th>\n",
" <th>trip_distance</th>\n",
" <th>RatecodeID</th>\n",
" <th>store_and_fwd_flag</th>\n",
" <th>PULocationID</th>\n",
" <th>DOLocationID</th>\n",
" <th>payment_type</th>\n",
" <th>fare_amount</th>\n",
" <th>extra</th>\n",
" <th>mta_tax</th>\n",
" <th>tip_amount</th>\n",
" <th>tolls_amount</th>\n",
" <th>improvement_surcharge</th>\n",
" <th>total_amount</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>1</td>\n",
" <td>1969-12-31 23:59:59</td>\n",
" <td>1969-12-31 23:59:59</td>\n",
" <td>1</td>\n",
" <td>3.30</td>\n",
" <td>1</td>\n",
" <td>-2049400382</td>\n",
" <td>263</td>\n",
" <td>161</td>\n",
" <td>1</td>\n",
" <td>12.5</td>\n",
" <td>0.0</td>\n",
" <td>0.5</td>\n",
" <td>2.00</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>15.30</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>1</td>\n",
" <td>1969-12-31 23:59:59</td>\n",
" <td>1969-12-31 23:59:59</td>\n",
" <td>1</td>\n",
" <td>0.90</td>\n",
" <td>1</td>\n",
" <td>-2049400382</td>\n",
" <td>186</td>\n",
" <td>234</td>\n",
" <td>1</td>\n",
" <td>5.0</td>\n",
" <td>0.0</td>\n",
" <td>0.5</td>\n",
" <td>1.45</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>7.25</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>1</td>\n",
" <td>1969-12-31 23:59:59</td>\n",
" <td>1969-12-31 23:59:59</td>\n",
" <td>1</td>\n",
" <td>1.10</td>\n",
" <td>1</td>\n",
" <td>-2049400382</td>\n",
" <td>164</td>\n",
" <td>161</td>\n",
" <td>1</td>\n",
" <td>5.5</td>\n",
" <td>0.0</td>\n",
" <td>0.5</td>\n",
" <td>1.00</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>7.30</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>1</td>\n",
" <td>1969-12-31 23:59:59</td>\n",
" <td>1969-12-31 23:59:59</td>\n",
" <td>1</td>\n",
" <td>1.10</td>\n",
" <td>1</td>\n",
" <td>-2049400382</td>\n",
" <td>236</td>\n",
" <td>75</td>\n",
" <td>1</td>\n",
" <td>6.0</td>\n",
" <td>0.0</td>\n",
" <td>0.5</td>\n",
" <td>1.70</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>8.50</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>2</td>\n",
" <td>1969-12-31 23:59:59</td>\n",
" <td>1969-12-31 23:59:59</td>\n",
" <td>1</td>\n",
" <td>0.02</td>\n",
" <td>2</td>\n",
" <td>-2049400382</td>\n",
" <td>249</td>\n",
" <td>234</td>\n",
" <td>2</td>\n",
" <td>52.0</td>\n",
" <td>0.0</td>\n",
" <td>0.5</td>\n",
" <td>0.00</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>52.80</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count \\\n",
"0 1 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n",
"1 1 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n",
"2 1 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n",
"3 1 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n",
"4 2 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n",
"\n",
" trip_distance RatecodeID store_and_fwd_flag PULocationID DOLocationID \\\n",
"0 3.30 1 -2049400382 263 161 \n",
"1 0.90 1 -2049400382 186 234 \n",
"2 1.10 1 -2049400382 164 161 \n",
"3 1.10 1 -2049400382 236 75 \n",
"4 0.02 2 -2049400382 249 234 \n",
"\n",
" payment_type fare_amount extra mta_tax tip_amount tolls_amount \\\n",
"0 1 12.5 0.0 0.5 2.00 0.0 \n",
"1 1 5.0 0.0 0.5 1.45 0.0 \n",
"2 1 5.5 0.0 0.5 1.00 0.0 \n",
"3 1 6.0 0.0 0.5 1.70 0.0 \n",
"4 2 52.0 0.0 0.5 0.00 0.0 \n",
"\n",
" improvement_surcharge total_amount \n",
"0 0.3 15.30 \n",
"1 0.3 7.25 \n",
"2 0.3 7.30 \n",
"3 0.3 8.50 \n",
"4 0.3 52.80 "
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import dask_cudf\n",
"\n",
"gdf = dask_cudf.read_csv('data/nyc/many/*.csv')\n",
"gdf.head().to_pandas()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Time a full-pass computation\n",
"\n",
"Most of the time here is spent reading data from disk and parsing it."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 1.16 s, sys: 100 ms, total: 1.26 s\n",
"Wall time: 4.68 s\n"
]
},
{
"data": {
"text/plain": [
"184464740"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time gdf.passenger_count.sum().compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Single GPU"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 7.5 s, sys: 4.66 s, total: 12.2 s\n",
"Wall time: 10.9 s\n"
]
},
{
"data": {
"text/plain": [
"184464740"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time gdf.passenger_count.sum().compute(scheduler='single-threaded')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Single CPU"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"pandas.core.frame.DataFrame"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import dask.dataframe as dd\n",
"\n",
"df = dd.read_csv('data/nyc/many/*.csv')\n",
"type(df.head())"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 34min 9s, sys: 21.5 s, total: 34min 30s\n",
"Wall time: 3min 14s\n"
]
},
{
"data": {
"text/plain": [
"184464740"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time df.passenger_count.sum().compute(scheduler='single-threaded')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Eight CPUs, one per process"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 10.8 s, sys: 988 ms, total: 11.8 s\n",
"Wall time: 57.5 s\n"
]
},
{
"data": {
"text/plain": [
"184464740"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time df.passenger_count.sum().compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Eighty CPU cores: ten processes with eight threads each"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:49958 remote=tcp://127.0.0.1:33058>\n"
]
}
],
"source": [
"client.close()\n",
"cluster.close()"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table style=\"border: 2px solid white;\">\n",
"<tr>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3>Client</h3>\n",
"<ul>\n",
" <li><b>Scheduler: </b>tcp://127.0.0.1:45873\n",
" <li><b>Dashboard: </b><a href='http://127.0.0.1:8787/status' target='_blank'>http://127.0.0.1:8787/status</a>\n",
"</ul>\n",
"</td>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3>Cluster</h3>\n",
"<ul>\n",
" <li><b>Workers: </b>10</li>\n",
" <li><b>Cores: </b>80</li>\n",
" <li><b>Memory: </b>540.96 GB</li>\n",
"</ul>\n",
"</td>\n",
"</tr>\n",
"</table>"
],
"text/plain": [
"<Client: scheduler='tcp://127.0.0.1:45873' processes=10 cores=80>"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"client = Client(n_workers=10, threads_per_worker=8)\n",
"client"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 7.55 s, sys: 692 ms, total: 8.24 s\n",
"Wall time: 34.9 s\n"
]
},
{
"data": {
"text/plain": [
"184464740"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time df.passenger_count.sum().compute()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python [conda env:cudf_dev]",
"language": "python",
"name": "conda-env-cudf_dev-py"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
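
The wall-clock times reported by the `%time` cells in the notebook above can be put side by side. The following is a minimal sketch and is not part of the original notebook: the dictionary labels paraphrase the notebook's section headings, `relative_to` is a hypothetical helper name introduced here, and the numbers are copied from the recorded outputs. Each configuration was timed once, so the ratios are only rough.

```python
# Wall-clock times in seconds, copied from the "Wall time" lines in the notebook.
# The labels are paraphrases of the section headings, not identifiers from the notebook.
wall_times = {
    "Eight GPUs": 4.68,
    "Single GPU": 10.9,
    "Single CPU": 3 * 60 + 14,  # recorded as "3min 14s"
    "Eight CPUs": 57.5,
    "Eighty CPU cores": 34.9,
}


def relative_to(times, baseline_label):
    """Divide every wall time by the wall time of the chosen baseline run."""
    baseline = times[baseline_label]
    return {label: seconds / baseline for label, seconds in times.items()}


ratios = relative_to(wall_times, "Eight GPUs")

# Print the runs from fastest to slowest with their ratio to the eight-GPU run.
for label, ratio in sorted(ratios.items(), key=lambda item: item[1]):
    print("{:18s} {:7.2f} s  {:6.1f}x".format(label, wall_times[label], ratio))
```

Every run computes the same `passenger_count` sum, so the ratios compare the end-to-end cost of reading, parsing, and reducing the same CSV files under each configuration.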