Last active
July 6, 2023 13:55
-
-
Save mrocklin/4b1b80d1ae07ec73f75b2a19c8e90e2e to your computer and use it in GitHub Desktop.
Dask Dataframe with cuDF on a simple NYC Taxi CSV computation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"# Dask DataFrame and cuDF on NYC Taxi CSV data" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### Start Dask Cluster on an Eight-GPU DGX Machine" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/html": [ | |
"<table style=\"border: 2px solid white;\">\n", | |
"<tr>\n", | |
"<td style=\"vertical-align: top; border: 0px solid white\">\n", | |
"<h3>Client</h3>\n", | |
"<ul>\n", | |
" <li><b>Scheduler: </b>tcp://127.0.0.1:33058\n", | |
" <li><b>Dashboard: </b><a href='http://127.0.0.1:8787/status' target='_blank'>http://127.0.0.1:8787/status</a>\n", | |
"</ul>\n", | |
"</td>\n", | |
"<td style=\"vertical-align: top; border: 0px solid white\">\n", | |
"<h3>Cluster</h3>\n", | |
"<ul>\n", | |
" <li><b>Workers: </b>8</li>\n", | |
" <li><b>Cores: </b>8</li>\n", | |
" <li><b>Memory: </b>540.96 GB</li>\n", | |
"</ul>\n", | |
"</td>\n", | |
"</tr>\n", | |
"</table>" | |
], | |
"text/plain": [ | |
"<Client: scheduler='tcp://127.0.0.1:33058' processes=8 cores=8>" | |
] | |
}, | |
"execution_count": 1, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"from dask_cuda import LocalCUDACluster\n", | |
"cluster = LocalCUDACluster()\n", | |
"\n", | |
"from dask.distributed import Client\n", | |
"client = Client(cluster)\n", | |
"client" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### Previously we ran this to shard the files more finely for cudf.read_csv\n", | |
"\n", | |
"```python\n", | |
"import dask.dataframe as dd\n", | |
"pdf = dd.read_csv('data/nyc/yellow_tripdata_2017-*.csv',\n", | |
" parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])\n", | |
"\n", | |
"pdf.repartition(npartitions=100).to_csv('data/nyc/many/*.csv', index=False)\n", | |
"```" | |
] | |
}, | |
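{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### Read CSV files into a Dask cuDF (GPU) DataFrame\n", | |
"\n", | |
"Note: in the preview below the two datetime columns show `1969-12-31 23:59:59` and `store_and_fwd_flag` shows a large negative integer, so these columns do not appear to have been parsed into meaningful values here. The timings that follow only use `passenger_count`." | |
] | |
}, | |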
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/html": [ | |
"<div>\n", | |
"<style>\n", | |
" .dataframe thead tr:only-child th {\n", | |
" text-align: right;\n", | |
" }\n", | |
"\n", | |
" .dataframe thead th {\n", | |
" text-align: left;\n", | |
" }\n", | |
"\n", | |
" .dataframe tbody tr th {\n", | |
" vertical-align: top;\n", | |
" }\n", | |
"</style>\n", | |
"<table border=\"1\" class=\"dataframe\">\n", | |
" <thead>\n", | |
" <tr style=\"text-align: right;\">\n", | |
" <th></th>\n", | |
" <th>VendorID</th>\n", | |
" <th>tpep_pickup_datetime</th>\n", | |
" <th>tpep_dropoff_datetime</th>\n", | |
" <th>passenger_count</th>\n", | |
" <th>trip_distance</th>\n", | |
" <th>RatecodeID</th>\n", | |
" <th>store_and_fwd_flag</th>\n", | |
" <th>PULocationID</th>\n", | |
" <th>DOLocationID</th>\n", | |
" <th>payment_type</th>\n", | |
" <th>fare_amount</th>\n", | |
" <th>extra</th>\n", | |
" <th>mta_tax</th>\n", | |
" <th>tip_amount</th>\n", | |
" <th>tolls_amount</th>\n", | |
" <th>improvement_surcharge</th>\n", | |
" <th>total_amount</th>\n", | |
" </tr>\n", | |
" </thead>\n", | |
" <tbody>\n", | |
" <tr>\n", | |
" <th>0</th>\n", | |
" <td>1</td>\n", | |
" <td>1969-12-31 23:59:59</td>\n", | |
" <td>1969-12-31 23:59:59</td>\n", | |
" <td>1</td>\n", | |
" <td>3.30</td>\n", | |
" <td>1</td>\n", | |
" <td>-2049400382</td>\n", | |
" <td>263</td>\n", | |
" <td>161</td>\n", | |
" <td>1</td>\n", | |
" <td>12.5</td>\n", | |
" <td>0.0</td>\n", | |
" <td>0.5</td>\n", | |
" <td>2.00</td>\n", | |
" <td>0.0</td>\n", | |
" <td>0.3</td>\n", | |
" <td>15.30</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>1</th>\n", | |
" <td>1</td>\n", | |
" <td>1969-12-31 23:59:59</td>\n", | |
" <td>1969-12-31 23:59:59</td>\n", | |
" <td>1</td>\n", | |
" <td>0.90</td>\n", | |
" <td>1</td>\n", | |
" <td>-2049400382</td>\n", | |
" <td>186</td>\n", | |
" <td>234</td>\n", | |
" <td>1</td>\n", | |
" <td>5.0</td>\n", | |
" <td>0.0</td>\n", | |
" <td>0.5</td>\n", | |
" <td>1.45</td>\n", | |
" <td>0.0</td>\n", | |
" <td>0.3</td>\n", | |
" <td>7.25</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>2</th>\n", | |
" <td>1</td>\n", | |
" <td>1969-12-31 23:59:59</td>\n", | |
" <td>1969-12-31 23:59:59</td>\n", | |
" <td>1</td>\n", | |
" <td>1.10</td>\n", | |
" <td>1</td>\n", | |
" <td>-2049400382</td>\n", | |
" <td>164</td>\n", | |
" <td>161</td>\n", | |
" <td>1</td>\n", | |
" <td>5.5</td>\n", | |
" <td>0.0</td>\n", | |
" <td>0.5</td>\n", | |
" <td>1.00</td>\n", | |
" <td>0.0</td>\n", | |
" <td>0.3</td>\n", | |
" <td>7.30</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>3</th>\n", | |
" <td>1</td>\n", | |
" <td>1969-12-31 23:59:59</td>\n", | |
" <td>1969-12-31 23:59:59</td>\n", | |
" <td>1</td>\n", | |
" <td>1.10</td>\n", | |
" <td>1</td>\n", | |
" <td>-2049400382</td>\n", | |
" <td>236</td>\n", | |
" <td>75</td>\n", | |
" <td>1</td>\n", | |
" <td>6.0</td>\n", | |
" <td>0.0</td>\n", | |
" <td>0.5</td>\n", | |
" <td>1.70</td>\n", | |
" <td>0.0</td>\n", | |
" <td>0.3</td>\n", | |
" <td>8.50</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>4</th>\n", | |
" <td>2</td>\n", | |
" <td>1969-12-31 23:59:59</td>\n", | |
" <td>1969-12-31 23:59:59</td>\n", | |
" <td>1</td>\n", | |
" <td>0.02</td>\n", | |
" <td>2</td>\n", | |
" <td>-2049400382</td>\n", | |
" <td>249</td>\n", | |
" <td>234</td>\n", | |
" <td>2</td>\n", | |
" <td>52.0</td>\n", | |
" <td>0.0</td>\n", | |
" <td>0.5</td>\n", | |
" <td>0.00</td>\n", | |
" <td>0.0</td>\n", | |
" <td>0.3</td>\n", | |
" <td>52.80</td>\n", | |
" </tr>\n", | |
" </tbody>\n", | |
"</table>\n", | |
"</div>" | |
], | |
"text/plain": [ | |
" VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count \\\n", | |
"0 1 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n", | |
"1 1 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n", | |
"2 1 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n", | |
"3 1 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n", | |
"4 2 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n", | |
"\n", | |
" trip_distance RatecodeID store_and_fwd_flag PULocationID DOLocationID \\\n", | |
"0 3.30 1 -2049400382 263 161 \n", | |
"1 0.90 1 -2049400382 186 234 \n", | |
"2 1.10 1 -2049400382 164 161 \n", | |
"3 1.10 1 -2049400382 236 75 \n", | |
"4 0.02 2 -2049400382 249 234 \n", | |
"\n", | |
" payment_type fare_amount extra mta_tax tip_amount tolls_amount \\\n", | |
"0 1 12.5 0.0 0.5 2.00 0.0 \n", | |
"1 1 5.0 0.0 0.5 1.45 0.0 \n", | |
"2 1 5.5 0.0 0.5 1.00 0.0 \n", | |
"3 1 6.0 0.0 0.5 1.70 0.0 \n", | |
"4 2 52.0 0.0 0.5 0.00 0.0 \n", | |
"\n", | |
" improvement_surcharge total_amount \n", | |
"0 0.3 15.30 \n", | |
"1 0.3 7.25 \n", | |
"2 0.3 7.30 \n", | |
"3 0.3 8.50 \n", | |
"4 0.3 52.80 " | |
] | |
}, | |
"execution_count": 2, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"import dask_cudf\n", | |
"\n", | |
"gdf = dask_cudf.read_csv('data/nyc/many/*.csv')\n", | |
"gdf.head().to_pandas()" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### Time a full-pass computation\n", | |
"\n", | |
"Most of the time here is spent reading data from disk and parsing it." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"CPU times: user 1.16 s, sys: 100 ms, total: 1.26 s\n", | |
"Wall time: 4.68 s\n" | |
] | |
}, | |
{ | |
"data": { | |
"text/plain": [ | |
"184464740" | |
] | |
}, | |
"execution_count": 4, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"%time gdf.passenger_count.sum().compute()" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### Single GPU" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 6, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"CPU times: user 7.5 s, sys: 4.66 s, total: 12.2 s\n", | |
"Wall time: 10.9 s\n" | |
] | |
}, | |
{ | |
"data": { | |
"text/plain": [ | |
"184464740" | |
] | |
}, | |
"execution_count": 6, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"%time gdf.passenger_count.sum().compute(scheduler='single-threaded')" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### Single CPU" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 7, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"pandas.core.frame.DataFrame" | |
] | |
}, | |
"execution_count": 7, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"import dask.dataframe as dd\n", | |
"\n", | |
"df = dd.read_csv('data/nyc/many/*.csv')\n", | |
"type(df.head())" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 8, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"CPU times: user 34min 9s, sys: 21.5 s, total: 34min 30s\n", | |
"Wall time: 3min 14s\n" | |
] | |
}, | |
{ | |
"data": { | |
"text/plain": [ | |
"184464740" | |
] | |
}, | |
"execution_count": 8, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"%time df.passenger_count.sum().compute(scheduler='single-threaded')" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### Eight CPUs, one per process" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 9, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"CPU times: user 10.8 s, sys: 988 ms, total: 11.8 s\n", | |
"Wall time: 57.5 s\n" | |
] | |
}, | |
{ | |
"data": { | |
"text/plain": [ | |
"184464740" | |
] | |
}, | |
"execution_count": 9, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"%time df.passenger_count.sum().compute()" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### Eighty CPUs with a balance of threads and processes" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 10, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:49958 remote=tcp://127.0.0.1:33058>\n" | |
] | |
} | |
], | |
"source": [ | |
"client.close()\n", | |
"cluster.close()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 11, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/html": [ | |
"<table style=\"border: 2px solid white;\">\n", | |
"<tr>\n", | |
"<td style=\"vertical-align: top; border: 0px solid white\">\n", | |
"<h3>Client</h3>\n", | |
"<ul>\n", | |
" <li><b>Scheduler: </b>tcp://127.0.0.1:45873\n", | |
" <li><b>Dashboard: </b><a href='http://127.0.0.1:8787/status' target='_blank'>http://127.0.0.1:8787/status</a>\n", | |
"</ul>\n", | |
"</td>\n", | |
"<td style=\"vertical-align: top; border: 0px solid white\">\n", | |
"<h3>Cluster</h3>\n", | |
"<ul>\n", | |
" <li><b>Workers: </b>10</li>\n", | |
" <li><b>Cores: </b>80</li>\n", | |
" <li><b>Memory: </b>540.96 GB</li>\n", | |
"</ul>\n", | |
"</td>\n", | |
"</tr>\n", | |
"</table>" | |
], | |
"text/plain": [ | |
"<Client: scheduler='tcp://127.0.0.1:45873' processes=10 cores=80>" | |
] | |
}, | |
"execution_count": 11, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"client = Client(n_workers=10, threads_per_worker=8)\n", | |
"client" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 12, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"CPU times: user 7.55 s, sys: 692 ms, total: 8.24 s\n", | |
"Wall time: 34.9 s\n" | |
] | |
}, | |
{ | |
"data": { | |
"text/plain": [ | |
"184464740" | |
] | |
}, | |
"execution_count": 12, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"%time df.passenger_count.sum().compute()" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python [conda env:cudf_dev]", | |
"language": "python", | |
"name": "conda-env-cudf_dev-py" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.5.5" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 2 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Which dataset exactly did you use?
Where can it be downloaded from?
https://www.kaggle.com/competitions/nyc-taxi-trip-duration/data
https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
https://catalog.data.gov/dataset/2020-yellow-taxi-trip-data-january-june/resource/c3ec101d-e6c7-4084-85f3-3930defd8140