Skip to content

Instantly share code, notes, and snippets.

Last active July 6, 2023 13:55
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save mrocklin/4b1b80d1ae07ec73f75b2a19c8e90e2e to your computer and use it in GitHub Desktop.
Save mrocklin/4b1b80d1ae07ec73f75b2a19c8e90e2e to your computer and use it in GitHub Desktop.
Dask Dataframe with cuDF on a simple NYC Taxi CSV computation
Display the source blob
Display the rendered blob
"cells": [
"cell_type": "markdown",
"metadata": {},
"source": [
"# Dask DataFrame and cuDF on NYC Taxi CSV data"
"cell_type": "markdown",
"metadata": {},
"source": [
"### Start Dask Cluster on an Eight-GPU DGX Machine"
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
"data": {
"text/html": [
"<table style=\"border: 2px solid white;\">\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
" <li><b>Scheduler: </b>tcp://\n",
" <li><b>Dashboard: </b><a href='' target='_blank'></a>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
" <li><b>Workers: </b>8</li>\n",
" <li><b>Cores: </b>8</li>\n",
" <li><b>Memory: </b>540.96 GB</li>\n",
"text/plain": [
"<Client: scheduler='tcp://' processes=8 cores=8>"
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
"source": [
"from dask_cuda import LocalCUDACluster\n",
"cluster = LocalCUDACluster()\n",
"from dask.distributed import Client\n",
"client = Client(cluster)\n",
"cell_type": "markdown",
"metadata": {},
"source": [
"### Previously we ran this to shard the files more finely for cudf.read_csv\n",
"import dask.dataframe as dd\n",
"pdf = dd.read_csv('data/nyc/yellow_tripdata_2017-*.csv',\n",
" parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])\n",
"pdf.repartition(npartitions=100).to_csv('data/nyc/many/*.csv', index=False)\n",
"cell_type": "markdown",
"metadata": {},
"source": [
"### Read CSV files into Dask-GPU-DataFrame"
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
"data": {
"text/html": [
" .dataframe thead tr:only-child th {\n",
" text-align: right;\n",
" }\n",
" .dataframe thead th {\n",
" text-align: left;\n",
" }\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>VendorID</th>\n",
" <th>tpep_pickup_datetime</th>\n",
" <th>tpep_dropoff_datetime</th>\n",
" <th>passenger_count</th>\n",
" <th>trip_distance</th>\n",
" <th>RatecodeID</th>\n",
" <th>store_and_fwd_flag</th>\n",
" <th>PULocationID</th>\n",
" <th>DOLocationID</th>\n",
" <th>payment_type</th>\n",
" <th>fare_amount</th>\n",
" <th>extra</th>\n",
" <th>mta_tax</th>\n",
" <th>tip_amount</th>\n",
" <th>tolls_amount</th>\n",
" <th>improvement_surcharge</th>\n",
" <th>total_amount</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>1</td>\n",
" <td>1969-12-31 23:59:59</td>\n",
" <td>1969-12-31 23:59:59</td>\n",
" <td>1</td>\n",
" <td>3.30</td>\n",
" <td>1</td>\n",
" <td>-2049400382</td>\n",
" <td>263</td>\n",
" <td>161</td>\n",
" <td>1</td>\n",
" <td>12.5</td>\n",
" <td>0.0</td>\n",
" <td>0.5</td>\n",
" <td>2.00</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>15.30</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>1</td>\n",
" <td>1969-12-31 23:59:59</td>\n",
" <td>1969-12-31 23:59:59</td>\n",
" <td>1</td>\n",
" <td>0.90</td>\n",
" <td>1</td>\n",
" <td>-2049400382</td>\n",
" <td>186</td>\n",
" <td>234</td>\n",
" <td>1</td>\n",
" <td>5.0</td>\n",
" <td>0.0</td>\n",
" <td>0.5</td>\n",
" <td>1.45</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>7.25</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>1</td>\n",
" <td>1969-12-31 23:59:59</td>\n",
" <td>1969-12-31 23:59:59</td>\n",
" <td>1</td>\n",
" <td>1.10</td>\n",
" <td>1</td>\n",
" <td>-2049400382</td>\n",
" <td>164</td>\n",
" <td>161</td>\n",
" <td>1</td>\n",
" <td>5.5</td>\n",
" <td>0.0</td>\n",
" <td>0.5</td>\n",
" <td>1.00</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>7.30</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>1</td>\n",
" <td>1969-12-31 23:59:59</td>\n",
" <td>1969-12-31 23:59:59</td>\n",
" <td>1</td>\n",
" <td>1.10</td>\n",
" <td>1</td>\n",
" <td>-2049400382</td>\n",
" <td>236</td>\n",
" <td>75</td>\n",
" <td>1</td>\n",
" <td>6.0</td>\n",
" <td>0.0</td>\n",
" <td>0.5</td>\n",
" <td>1.70</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>8.50</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>2</td>\n",
" <td>1969-12-31 23:59:59</td>\n",
" <td>1969-12-31 23:59:59</td>\n",
" <td>1</td>\n",
" <td>0.02</td>\n",
" <td>2</td>\n",
" <td>-2049400382</td>\n",
" <td>249</td>\n",
" <td>234</td>\n",
" <td>2</td>\n",
" <td>52.0</td>\n",
" <td>0.0</td>\n",
" <td>0.5</td>\n",
" <td>0.00</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>52.80</td>\n",
" </tr>\n",
" </tbody>\n",
"text/plain": [
" VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count \\\n",
"0 1 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n",
"1 1 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n",
"2 1 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n",
"3 1 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n",
"4 2 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n",
" trip_distance RatecodeID store_and_fwd_flag PULocationID DOLocationID \\\n",
"0 3.30 1 -2049400382 263 161 \n",
"1 0.90 1 -2049400382 186 234 \n",
"2 1.10 1 -2049400382 164 161 \n",
"3 1.10 1 -2049400382 236 75 \n",
"4 0.02 2 -2049400382 249 234 \n",
" payment_type fare_amount extra mta_tax tip_amount tolls_amount \\\n",
"0 1 12.5 0.0 0.5 2.00 0.0 \n",
"1 1 5.0 0.0 0.5 1.45 0.0 \n",
"2 1 5.5 0.0 0.5 1.00 0.0 \n",
"3 1 6.0 0.0 0.5 1.70 0.0 \n",
"4 2 52.0 0.0 0.5 0.00 0.0 \n",
" improvement_surcharge total_amount \n",
"0 0.3 15.30 \n",
"1 0.3 7.25 \n",
"2 0.3 7.30 \n",
"3 0.3 8.50 \n",
"4 0.3 52.80 "
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
"source": [
"import dask_cudf\n",
"gdf = dask_cudf.read_csv('data/nyc/many/*.csv')\n",
"cell_type": "markdown",
"metadata": {},
"source": [
"### Time a full-pass computation\n",
"Most of the time here is spent reading data from disk and parsing it."
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 1.16 s, sys: 100 ms, total: 1.26 s\n",
"Wall time: 4.68 s\n"
"data": {
"text/plain": [
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
"source": [
"%time gdf.passenger_count.sum().compute()"
"cell_type": "markdown",
"metadata": {},
"source": [
"### Single GPU"
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 7.5 s, sys: 4.66 s, total: 12.2 s\n",
"Wall time: 10.9 s\n"
"data": {
"text/plain": [
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
"source": [
"%time gdf.passenger_count.sum().compute(scheduler='single-threaded')"
"cell_type": "markdown",
"metadata": {},
"source": [
"### Single CPU"
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
"data": {
"text/plain": [
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
"source": [
"import dask.dataframe as dd\n",
"df = dd.read_csv('data/nyc/many/*.csv')\n",
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 34min 9s, sys: 21.5 s, total: 34min 30s\n",
"Wall time: 3min 14s\n"
"data": {
"text/plain": [
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
"source": [
"%time df.passenger_count.sum().compute(scheduler='single-threaded')"
"cell_type": "markdown",
"metadata": {},
"source": [
"### Eight CPUs, one per process"
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 10.8 s, sys: 988 ms, total: 11.8 s\n",
"Wall time: 57.5 s\n"
"data": {
"text/plain": [
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
"source": [
"%time df.passenger_count.sum().compute()"
"cell_type": "markdown",
"metadata": {},
"source": [
"### Eighty CPUs with a balance of threads and processes"
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
"name": "stderr",
"output_type": "stream",
"text": [
"distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp:// remote=tcp://>\n"
"source": [
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
"data": {
"text/html": [
"<table style=\"border: 2px solid white;\">\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
" <li><b>Scheduler: </b>tcp://\n",
" <li><b>Dashboard: </b><a href='' target='_blank'></a>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
" <li><b>Workers: </b>10</li>\n",
" <li><b>Cores: </b>80</li>\n",
" <li><b>Memory: </b>540.96 GB</li>\n",
"text/plain": [
"<Client: scheduler='tcp://' processes=10 cores=80>"
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
"source": [
"client = Client(n_workers=10, threads_per_worker=8)\n",
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 7.55 s, sys: 692 ms, total: 8.24 s\n",
"Wall time: 34.9 s\n"
"data": {
"text/plain": [
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
"source": [
"%time df.passenger_count.sum().compute()"
"metadata": {
"kernelspec": {
"display_name": "Python [conda env:cudf_dev]",
"language": "python",
"name": "conda-env-cudf_dev-py"
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.5"
"nbformat": 4,
"nbformat_minor": 2