Skip to content

Instantly share code, notes, and snippets.

@ayushdg
Last active January 13, 2021 20:33
Show Gist options
  • Save ayushdg/8aa1fbbbfa1661cdd302febcda44e0f3 to your computer and use it in GitHub Desktop.
Save ayushdg/8aa1fbbbfa1661cdd302febcda44e0f3 to your computer and use it in GitHub Desktop.
Predicate-Pushdown
Display the source blob
Display the rendered blob
Raw
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Parquet predicate pushdown with cuDF"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Install gcsfs\n",
"!conda install -c conda-forge gcsfs -y"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Pull the NYC taxi Parquet dataset from Google Cloud Storage\n",
"import gcsfs\n",
"\n",
"fs = gcsfs.GCSFileSystem()\n",
"print(\"Downloading Data....\")\n",
"fs.get(\"gcs://anaconda-public-data/nyc-taxi/nyc.parquet\", \"./nytaxi.parquet\", recursive=True)\n",
"print(\"Done!\")"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>tpep_pickup_datetime</th>\n",
" <th>VendorID</th>\n",
" <th>tpep_dropoff_datetime</th>\n",
" <th>passenger_count</th>\n",
" <th>trip_distance</th>\n",
" <th>pickup_longitude</th>\n",
" <th>pickup_latitude</th>\n",
" <th>RateCodeID</th>\n",
" <th>store_and_fwd_flag</th>\n",
" <th>dropoff_longitude</th>\n",
" <th>dropoff_latitude</th>\n",
" <th>payment_type</th>\n",
" <th>fare_amount</th>\n",
" <th>extra</th>\n",
" <th>mta_tax</th>\n",
" <th>tip_amount</th>\n",
" <th>tolls_amount</th>\n",
" <th>improvement_surcharge</th>\n",
" <th>total_amount</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>2015-12-29 05:05:44</td>\n",
" <td>1</td>\n",
" <td>2015-12-29 05:16:11</td>\n",
" <td>1</td>\n",
" <td>4.60</td>\n",
" <td>-73.947357</td>\n",
" <td>40.783512</td>\n",
" <td>1</td>\n",
" <td>N</td>\n",
" <td>-73.989761</td>\n",
" <td>40.725586</td>\n",
" <td>2</td>\n",
" <td>14.5</td>\n",
" <td>0.5</td>\n",
" <td>0.5</td>\n",
" <td>0.0</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>15.8</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>2015-12-29 05:05:48</td>\n",
" <td>2</td>\n",
" <td>2015-12-29 05:16:48</td>\n",
" <td>2</td>\n",
" <td>3.76</td>\n",
" <td>-73.988403</td>\n",
" <td>40.718513</td>\n",
" <td>1</td>\n",
" <td>N</td>\n",
" <td>-73.927780</td>\n",
" <td>40.701859</td>\n",
" <td>2</td>\n",
" <td>13.0</td>\n",
" <td>0.5</td>\n",
" <td>0.5</td>\n",
" <td>0.0</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>14.3</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>2015-12-29 05:05:48</td>\n",
" <td>2</td>\n",
" <td>2015-12-29 05:22:42</td>\n",
" <td>1</td>\n",
" <td>7.49</td>\n",
" <td>-73.996559</td>\n",
" <td>40.725410</td>\n",
" <td>1</td>\n",
" <td>N</td>\n",
" <td>-74.003937</td>\n",
" <td>40.644928</td>\n",
" <td>2</td>\n",
" <td>23.0</td>\n",
" <td>0.5</td>\n",
" <td>0.5</td>\n",
" <td>0.0</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>24.3</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>2015-12-29 05:05:51</td>\n",
" <td>1</td>\n",
" <td>2015-12-29 05:08:03</td>\n",
" <td>1</td>\n",
" <td>0.30</td>\n",
" <td>-73.975304</td>\n",
" <td>40.789639</td>\n",
" <td>1</td>\n",
" <td>N</td>\n",
" <td>-73.972672</td>\n",
" <td>40.793110</td>\n",
" <td>2</td>\n",
" <td>3.5</td>\n",
" <td>0.5</td>\n",
" <td>0.5</td>\n",
" <td>0.0</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>4.8</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>2015-12-29 05:05:57</td>\n",
" <td>1</td>\n",
" <td>2015-12-29 05:35:21</td>\n",
" <td>1</td>\n",
" <td>12.20</td>\n",
" <td>-74.006973</td>\n",
" <td>40.745712</td>\n",
" <td>1</td>\n",
" <td>N</td>\n",
" <td>-74.000404</td>\n",
" <td>40.623508</td>\n",
" <td>2</td>\n",
" <td>36.5</td>\n",
" <td>0.5</td>\n",
" <td>0.5</td>\n",
" <td>0.0</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>37.8</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" tpep_pickup_datetime VendorID tpep_dropoff_datetime passenger_count \\\n",
"0 2015-12-29 05:05:44 1 2015-12-29 05:16:11 1 \n",
"1 2015-12-29 05:05:48 2 2015-12-29 05:16:48 2 \n",
"2 2015-12-29 05:05:48 2 2015-12-29 05:22:42 1 \n",
"3 2015-12-29 05:05:51 1 2015-12-29 05:08:03 1 \n",
"4 2015-12-29 05:05:57 1 2015-12-29 05:35:21 1 \n",
"\n",
" trip_distance pickup_longitude pickup_latitude RateCodeID \\\n",
"0 4.60 -73.947357 40.783512 1 \n",
"1 3.76 -73.988403 40.718513 1 \n",
"2 7.49 -73.996559 40.725410 1 \n",
"3 0.30 -73.975304 40.789639 1 \n",
"4 12.20 -74.006973 40.745712 1 \n",
"\n",
" store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type \\\n",
"0 N -73.989761 40.725586 2 \n",
"1 N -73.927780 40.701859 2 \n",
"2 N -74.003937 40.644928 2 \n",
"3 N -73.972672 40.793110 2 \n",
"4 N -74.000404 40.623508 2 \n",
"\n",
" fare_amount extra mta_tax tip_amount tolls_amount \\\n",
"0 14.5 0.5 0.5 0.0 0.0 \n",
"1 13.0 0.5 0.5 0.0 0.0 \n",
"2 23.0 0.5 0.5 0.0 0.0 \n",
"3 3.5 0.5 0.5 0.0 0.0 \n",
"4 36.5 0.5 0.5 0.0 0.0 \n",
"\n",
" improvement_surcharge total_amount \n",
"0 0.3 15.8 \n",
"1 0.3 14.3 \n",
"2 0.3 24.3 \n",
"3 0.3 4.8 \n",
"4 0.3 37.8 "
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"### Let's read one file and look at the data\n",
"\n",
"import cudf\n",
"\n",
"df = cudf.read_parquet(\"nytaxi.parquet/part.159.parquet\")\n",
"df.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
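"Let's say we want to get the VendorID and passenger_count for all trips started in January. Note that the cells below filter on `tpep_pickup_datetime < 2015-01-31`, so trips starting on January 31 itself are excluded; use `2015-02-01` as the cutoff to cover the full month."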
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"from glob import glob\n",
"import numpy as np\n",
"import pandas as pd\n",
"file_list = sorted(glob(\"nytaxi.parquet/*parquet\"))"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Rows read: 146112989\n",
"Rows after filtering: 12228919\n",
"CPU times: user 982 ms, sys: 498 ms, total: 1.48 s\n",
"Wall time: 1.5 s\n"
]
}
],
"source": [
"%%time\n",
"\n",
"# Without predicate pushdown\n",
"df = cudf.read_parquet(file_list, columns=[\"tpep_pickup_datetime\", \"VendorID\", \"passenger_count\"])\n",
"print(f\"Rows read: {len(df)}\")\n",
"df = df[df['tpep_pickup_datetime'] < np.datetime64(\"2015-01-31\")]\n",
"print(f\"Rows after filtering: {len(df)}\")"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Rows read: 12749062\n",
"Rows after filtering: 12228919\n",
"CPU times: user 318 ms, sys: 110 ms, total: 428 ms\n",
"Wall time: 427 ms\n"
]
}
],
"source": [
"%%time\n",
"\n",
"# With predicate pushdown\n",
"df = cudf.read_parquet(file_list, columns=[\"tpep_pickup_datetime\", \"VendorID\", \"passenger_count\"], filters=[(\"tpep_pickup_datetime\", \"<\", \"2015-01-31\")])\n",
"print(f\"Rows read: {len(df)}\")\n",
"df = df[df['tpep_pickup_datetime'] < np.datetime64(\"2015-01-31\")]\n",
"print(f\"Rows after filtering: {len(df)}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Scaling predicate pushdown with Dask"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"# Scaling with dask\n",
"from dask_cuda import LocalCUDACluster\n",
"from distributed import Client, wait\n",
"import dask_cudf\n",
"\n",
"cluster = LocalCUDACluster()\n",
"client = Client(cluster)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Rows read: 146112989\n",
"Rows after filtering: 12228919\n",
"CPU times: user 1.41 s, sys: 91.7 ms, total: 1.5 s\n",
"Wall time: 10.6 s\n"
]
}
],
"source": [
"%%time\n",
"\n",
"# Without predicate pushdown\n",
"ddf = dask_cudf.read_parquet(\"nytaxi.parquet\", split_row_groups=False)\n",
"ddf = ddf.persist()\n",
"wait(ddf)\n",
"print(f\"Rows read: {len(ddf)}\")\n",
"\n",
"ddf = ddf[ddf.index < np.datetime64(\"2015-01-31\")]\n",
"ddf = ddf.persist()\n",
"wait(ddf)\n",
"print(f\"Rows after filtering: {len(ddf)}\")"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Rows read: 12749062\n",
"Rows after filtering: 12228919\n",
"CPU times: user 186 ms, sys: 8.77 ms, total: 194 ms\n",
"Wall time: 1.13 s\n"
]
}
],
"source": [
"%%time\n",
"\n",
"# With predicate pushdown\n",
"\n",
"ddf = dask_cudf.read_parquet(\"nytaxi.parquet\", filters=[(\"tpep_pickup_datetime\", \"<\", pd.Timestamp(\"2015-01-31 00:00:00\", tz=\"EST\"))])\n",
"ddf = ddf.persist()\n",
"wait(ddf)\n",
"print(f\"Rows read: {len(ddf)}\")\n",
"\n",
"ddf = ddf[ddf.index < np.datetime64(\"2015-01-31\")]\n",
"ddf = ddf.persist()\n",
"wait(ddf)\n",
"print(f\"Rows after filtering: {len(ddf)}\")"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"### Using cudf wrapped in dask delayed\n",
"from dask import delayed\n",
"import cudf\n",
"\n",
"@delayed\n",
"def read_dataset(file_name):\n",
" df = cudf.read_parquet(file_name)\n",
" return df"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Rows read: 146112989\n",
"Rows after filtering: 12228919\n",
"CPU times: user 1.48 s, sys: 119 ms, total: 1.6 s\n",
"Wall time: 8.3 s\n"
]
}
],
"source": [
"%%time\n",
"dfs = [read_dataset(f) for f in file_list]\n",
"ddf = dask_cudf.from_delayed(dfs)\n",
"\n",
"ddf = ddf.persist()\n",
"wait(ddf)\n",
"print(f\"Rows read: {len(ddf)}\")\n",
"ddf = ddf[ddf.tpep_pickup_datetime < np.datetime64(\"2015-01-31\")]\n",
"print(f\"Rows after filtering: {len(ddf)}\")"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"### Using cudf wrapped in dask delayed, with predicate pushdown\n",
"@delayed\n",
"def read_dataset_filtered(file_name):\n",
" df = cudf.read_parquet(file_name, filters=[(\"tpep_pickup_datetime\", \"<\", \"2015-01-31\")])\n",
" return df"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Rows read: 12749062\n",
"Rows after filtering: 12228919\n",
"CPU times: user 1.13 s, sys: 76.7 ms, total: 1.21 s\n",
"Wall time: 2.6 s\n"
]
}
],
"source": [
"%%time\n",
"dfs = [read_dataset_filtered(f) for f in file_list]\n",
"ddf = dask_cudf.from_delayed(dfs)\n",
"ddf = ddf.persist()\n",
"wait(ddf)\n",
"print(f\"Rows read: {len(ddf)}\")\n",
"ddf = ddf[ddf.tpep_pickup_datetime < np.datetime64(\"2015-01-31\")]\n",
"print(f\"Rows after filtering: {len(ddf)}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.8"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment