Skip to content

Instantly share code, notes, and snippets.

Last active February 1, 2021 22:13
Show Gist options
  • Save ayushdg/f3d96ede8c3bcfb55482148aa01750b3 to your computer and use it in GitHub Desktop.
Save ayushdg/f3d96ede8c3bcfb55482148aa01750b3 to your computer and use it in GitHub Desktop.
Predicate Filtering on Apache Parquet & ORC with cuDF, dask-cuDF
Display the source blob
Display the rendered blob
"cells": [
"cell_type": "markdown",
"metadata": {},
"source": [
"### Filtered Reading with cuDF and dask-cuDF"
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Install gcsfs\n",
"!conda install -c conda-forge gcsfs -y"
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"nyt_parq_path = \"./nytaxi.parquet\""
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Pull the nytaxi parquet dataset from a Public Google cloud bucket hosted by anaconda\n",
"import gcsfs\n",
"fs = gcsfs.GCSFileSystem()\n",
"print(\"Downloading Data....\")\n",
"fs.get(\"gcs://anaconda-public-data/nyc-taxi/nyc.parquet\", nyt_parq_path, recursive=True)\n",
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
"data": {
"text/html": [
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>tpep_pickup_datetime</th>\n",
" <th>VendorID</th>\n",
" <th>tpep_dropoff_datetime</th>\n",
" <th>passenger_count</th>\n",
" <th>trip_distance</th>\n",
" <th>pickup_longitude</th>\n",
" <th>pickup_latitude</th>\n",
" <th>RateCodeID</th>\n",
" <th>store_and_fwd_flag</th>\n",
" <th>dropoff_longitude</th>\n",
" <th>dropoff_latitude</th>\n",
" <th>payment_type</th>\n",
" <th>fare_amount</th>\n",
" <th>extra</th>\n",
" <th>mta_tax</th>\n",
" <th>tip_amount</th>\n",
" <th>tolls_amount</th>\n",
" <th>improvement_surcharge</th>\n",
" <th>total_amount</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>2015-01-01 00:00:00</td>\n",
" <td>1</td>\n",
" <td>2015-01-01 00:11:26</td>\n",
" <td>5</td>\n",
" <td>4.00</td>\n",
" <td>-73.971436</td>\n",
" <td>40.760201</td>\n",
" <td>1</td>\n",
" <td>N</td>\n",
" <td>-73.921181</td>\n",
" <td>40.768269</td>\n",
" <td>2</td>\n",
" <td>13.5</td>\n",
" <td>0.5</td>\n",
" <td>0.5</td>\n",
" <td>0.0</td>\n",
" <td>0.0</td>\n",
" <td>0.0</td>\n",
" <td>14.5</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>2015-01-01 00:00:00</td>\n",
" <td>2</td>\n",
" <td>2015-01-01 00:00:00</td>\n",
" <td>1</td>\n",
" <td>1.68</td>\n",
" <td>-73.991547</td>\n",
" <td>40.750069</td>\n",
" <td>1</td>\n",
" <td>N</td>\n",
" <td>0.000000</td>\n",
" <td>0.000000</td>\n",
" <td>2</td>\n",
" <td>10.0</td>\n",
" <td>0.0</td>\n",
" <td>0.5</td>\n",
" <td>0.0</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>10.8</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>2015-01-01 00:00:00</td>\n",
" <td>2</td>\n",
" <td>2015-01-01 00:00:00</td>\n",
" <td>3</td>\n",
" <td>1.56</td>\n",
" <td>-74.001320</td>\n",
" <td>40.729057</td>\n",
" <td>1</td>\n",
" <td>N</td>\n",
" <td>-74.010208</td>\n",
" <td>40.719662</td>\n",
" <td>1</td>\n",
" <td>7.5</td>\n",
" <td>0.5</td>\n",
" <td>0.5</td>\n",
" <td>0.0</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>8.8</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>2015-01-01 00:00:01</td>\n",
" <td>1</td>\n",
" <td>2015-01-01 00:03:49</td>\n",
" <td>1</td>\n",
" <td>0.80</td>\n",
" <td>-73.860847</td>\n",
" <td>40.757294</td>\n",
" <td>1</td>\n",
" <td>N</td>\n",
" <td>-73.868111</td>\n",
" <td>40.752285</td>\n",
" <td>2</td>\n",
" <td>5.0</td>\n",
" <td>0.5</td>\n",
" <td>0.5</td>\n",
" <td>0.0</td>\n",
" <td>0.0</td>\n",
" <td>0.0</td>\n",
" <td>6.3</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>2015-01-01 00:00:03</td>\n",
" <td>2</td>\n",
" <td>2015-01-01 00:21:48</td>\n",
" <td>2</td>\n",
" <td>2.57</td>\n",
" <td>-73.969017</td>\n",
" <td>40.754269</td>\n",
" <td>1</td>\n",
" <td>N</td>\n",
" <td>-73.994133</td>\n",
" <td>40.761600</td>\n",
" <td>2</td>\n",
" <td>14.5</td>\n",
" <td>0.5</td>\n",
" <td>0.5</td>\n",
" <td>0.0</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>15.8</td>\n",
" </tr>\n",
" </tbody>\n",
"text/plain": [
" tpep_pickup_datetime VendorID tpep_dropoff_datetime passenger_count \\\n",
"0 2015-01-01 00:00:00 1 2015-01-01 00:11:26 5 \n",
"1 2015-01-01 00:00:00 2 2015-01-01 00:00:00 1 \n",
"2 2015-01-01 00:00:00 2 2015-01-01 00:00:00 3 \n",
"3 2015-01-01 00:00:01 1 2015-01-01 00:03:49 1 \n",
"4 2015-01-01 00:00:03 2 2015-01-01 00:21:48 2 \n",
" trip_distance pickup_longitude pickup_latitude RateCodeID \\\n",
"0 4.00 -73.971436 40.760201 1 \n",
"1 1.68 -73.991547 40.750069 1 \n",
"2 1.56 -74.001320 40.729057 1 \n",
"3 0.80 -73.860847 40.757294 1 \n",
"4 2.57 -73.969017 40.754269 1 \n",
" store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type \\\n",
"0 N -73.921181 40.768269 2 \n",
"1 N 0.000000 0.000000 2 \n",
"2 N -74.010208 40.719662 1 \n",
"3 N -73.868111 40.752285 2 \n",
"4 N -73.994133 40.761600 2 \n",
" fare_amount extra mta_tax tip_amount tolls_amount \\\n",
"0 13.5 0.5 0.5 0.0 0.0 \n",
"1 10.0 0.0 0.5 0.0 0.0 \n",
"2 7.5 0.5 0.5 0.0 0.0 \n",
"3 5.0 0.5 0.5 0.0 0.0 \n",
"4 14.5 0.5 0.5 0.0 0.0 \n",
" improvement_surcharge total_amount \n",
"0 0.0 14.5 \n",
"1 0.3 10.8 \n",
"2 0.3 8.8 \n",
"3 0.0 6.3 \n",
"4 0.3 15.8 "
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
"source": [
"### Let's read one file and look at the data\n",
"import os\n",
"import cudf\n",
"df = cudf.read_parquet(os.path.join(nyt_parq_path, \"part.0.parquet\"))\n",
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's say we want to get the VendorID and passenger_count for all trips started in the month of january"
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"from glob import glob\n",
"import numpy as np\n",
"import pandas as pd\n",
"date = pd.Timestamp(\"2015-02-01 00:00:00\")"
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's say we want to get the `VendorID` and `passenger_count` for all trips started in the month of January.\n",
"Approach 1: Read specific columns and Filter"
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"Rows read: 146112989\n",
"Rows after filtering: 12748986\n",
"CPU times: user 937 ms, sys: 612 ms, total: 1.55 s\n",
"Wall time: 1.56 s\n"
"source": [
"# Without predicate pushdown\n",
"df = cudf.read_parquet(nyt_parq_path, columns=[\"tpep_pickup_datetime\", \"VendorID\", \"passenger_count\"])\n",
"print(f\"Rows read: {len(df)}\")\n",
"df = df[df['tpep_pickup_datetime'] < date]\n",
"print(f\"Rows after filtering: {len(df)}\")"
"cell_type": "markdown",
"metadata": {},
"source": [
"Peak Memory Usage: ~8000 MB\n",
"Approach 2: Filtered Reading using Predicates - The values in `tpep_pickup_datetime` are not completely sorted but roughly ordered by time. This ordering makes it a good candidate for statistics based filtering as row chunks may be skipped wherever the requested range lies outside the statistics of that chunk.\n",
"Note: Using the `filters` argument allows skipping row_groups which don't match the filter. There might still be row_groups read in that have the data to be filtered."
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"Rows read: 12749062\n",
"Rows after filtering: 12748986\n",
"CPU times: user 326 ms, sys: 121 ms, total: 447 ms\n",
"Wall time: 445 ms\n"
"source": [
"# With predicate pushdown\n",
"df = cudf.read_parquet(nyt_parq_path, \n",
" columns=[\"tpep_pickup_datetime\", \"VendorID\", \"passenger_count\"], \n",
" filters=[(\"tpep_pickup_datetime\", \"<\", date)])\n",
"print(f\"Rows read: {len(df)}\")\n",
"df = df[df['tpep_pickup_datetime'] < date]\n",
"print(f\"Rows after filtering: {len(df)}\")"
"cell_type": "markdown",
"metadata": {},
"source": [
"Peak Memory Usage: \\~1800 MB\n",
"In this case the amount of rows read with Predicate based fitlering was \\~12.7 Million rows, 11x lesser than the \\~146 Million rows in the dataset. The subsequent filtering operation (at the the dataframe level), only filters a few rows (\\~1000) for values outside the predicate condition present in the row_chunks read in."
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Using Predicate Filters with dask-cuDF"
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
"data": {
"text/html": [
"<table style=\"border: 2px solid white;\">\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3 style=\"text-align: left;\">Client</h3>\n",
"<ul style=\"text-align: left; list-style: none; margin: 0; padding: 0;\">\n",
" <li><b>Scheduler: </b>tcp://</li>\n",
" <li><b>Dashboard: </b><a href='' target='_blank'></a></li>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3 style=\"text-align: left;\">Cluster</h3>\n",
"<ul style=\"text-align: left; list-style:none; margin: 0; padding: 0;\">\n",
" <li><b>Workers: </b>4</li>\n",
" <li><b>Cores: </b>4</li>\n",
" <li><b>Memory: </b>404.32 GB</li>\n",
"text/plain": [
"<Client: 'tcp://' processes=4 threads=4, memory=404.32 GB>"
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
"source": [
"# Startup a cluster\n",
"from dask_cuda import LocalCUDACluster\n",
"from distributed import Client, wait\n",
"import dask_cudf\n",
"cluster = LocalCUDACluster()\n",
"client = Client(cluster)\n",
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"Rows read: 146112989\n",
"Rows after filtering: 12748986\n",
"CPU times: user 1.17 s, sys: 165 ms, total: 1.34 s\n",
"Wall time: 8.81 s\n"
"source": [
"# Without predicate pushdown\n",
"ddf = dask_cudf.read_parquet(nyt_parq_path,\n",
" split_row_groups=False, \n",
" index=False)\n",
"ddf = ddf.persist()\n",
"print(f\"Rows read: {len(ddf)}\")\n",
"ddf = ddf[ddf.tpep_pickup_datetime < date]\n",
"ddf = ddf.persist()\n",
"print(f\"Rows after filtering: {len(ddf)}\")"
"cell_type": "markdown",
"metadata": {},
"source": [
"Peak Memory Usage: ~6800 MB/GPU (4 GPUs total)"
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"Rows read: 12749062\n",
"Rows after filtering: 12748986\n",
"CPU times: user 197 ms, sys: 15.8 ms, total: 213 ms\n",
"Wall time: 1.2 s\n"
"source": [
"# With predicate pushdown\n",
"ddf = dask_cudf.read_parquet(nyt_parq_path, \n",
" filters=[(\"tpep_pickup_datetime\", \"<\", pd.Timestamp(\"2015-02-01 00:00:00\", tz=\"UTC\"))], \n",
" index=False,\n",
" split_row_groups=False)\n",
"ddf = ddf.persist()\n",
"print(f\"Rows read: {len(ddf)}\")\n",
"ddf = ddf[ddf.tpep_pickup_datetime < date]\n",
"ddf = ddf.persist()\n",
"print(f\"Rows after filtering: {len(ddf)}\")"
"cell_type": "markdown",
"metadata": {},
"source": [
"Peak Memory Usage: ~1400 MB/GPU (4 GPUs total)\n",
"Writing the dataset to Apache ORC format\n"
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"nyt_orc_path = \"./nytaxi.orc\""
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"source": [
"ddf = dask_cudf.read_parquet(nyt_parq_path, \n",
" index=False,\n",
" split_row_groups=False)\n",
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"Rows read: 146112989\n",
"Rows after filtering: 12748986\n",
"CPU times: user 2.58 s, sys: 242 ms, total: 2.83 s\n",
"Wall time: 12.2 s\n"
"source": [
"# Without predicate pushdown\n",
"ddf = dask_cudf.read_orc(os.path.join(nyt_orc_path,\"*.orc\"))\n",
"ddf = ddf.persist()\n",
"print(f\"Rows read: {len(ddf)}\")\n",
"ddf = ddf[ddf.tpep_pickup_datetime < date]\n",
"ddf = ddf.persist()\n",
"print(f\"Rows after filtering: {len(ddf)}\")"
"cell_type": "markdown",
"metadata": {},
"source": [
"Predicate filtering with Apache ORC files"
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"Rows read: 12749062\n",
"Rows after filtering: 12748986\n",
"CPU times: user 383 ms, sys: 113 ms, total: 495 ms\n",
"Wall time: 1.52 s\n"
"source": [
"# Without predicate pushdown\n",
"ddf = dask_cudf.read_orc(os.path.join(nyt_orc_path,\"*.orc\"), \n",
" filters=[(\"tpep_pickup_datetime\", \"<\", pd.Timestamp(\"2015-02-01 00:00:00\", tz=\"UTC\"))]\n",
" )\n",
"ddf = ddf.persist()\n",
"print(f\"Rows read: {len(ddf)}\")\n",
"ddf = ddf[ddf.tpep_pickup_datetime < date]\n",
"ddf = ddf.persist()\n",
"print(f\"Rows after filtering: {len(ddf)}\")"
"cell_type": "markdown",
"metadata": {},
"source": [
"### More complex predicates\n",
"Filters are represented in DNF.\n",
"Filters -> List[(Tuple)], List[List(Tuple)]\n",
"- Each tuple is a predicate for a specific column\n",
"- Tuples within a list are considered to be conjunctions (AND) of predicates \n",
"- A List of tuples are considered to be a disjunction (OR) with other list of tuples.\n",
"Eg: \n",
"When selecting all trips in the month of January and December with less than 3 passengers could be represented as follows \n",
"(tpep_pickup_datetime < 2015-02-01 | tpep_pickup_datetime >= 2015-12-01) & (passenger_count < 3)\n",
"In this case distributing the conjunction over the disjunctions we get the DNF:\n",
"((tpep_pickup_datetime < 2015-02-01) & (passenger_count < 3)) | \n",
"((tpep_pickup_datetime >= 2015-12-01) & (passenger_count < 3))\n",
"would translate to the following in code:"
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"Rows read: 24253006\n",
"Rows after filtering: 20523212\n",
"CPU times: user 818 ms, sys: 172 ms, total: 990 ms\n",
"Wall time: 2.76 s\n"
"source": [
"date1 = pd.Timestamp(\"2015-02-01 00:00:00\", tz=\"UTC\")\n",
"date2 = pd.Timestamp(\"2015-12-01 00:00:00\", tz=\"UTC\")\n",
"passenger_filter = 3\n",
"filters = [\n",
" [(\"tpep_pickup_datetime\", \"<\" , date1),(\"passenger_count\", \"<\", passenger_filter)], \n",
" [(\"tpep_pickup_datetime\", \">=\", date2), (\"passenger_count\", \"<\", passenger_filter)]\n",
" ]\n",
"# Without predicate pushdown\n",
"ddf = dask_cudf.read_orc(os.path.join(nyt_orc_path,\"*.orc\"), \n",
" filters=filters,\n",
" )\n",
"ddf = ddf.persist()\n",
"print(f\"Rows read: {len(ddf)}\")\n",
"ddf = ddf[((ddf.tpep_pickup_datetime < date1) | (ddf.tpep_pickup_datetime >= date2)) & (ddf.passenger_count < 3)]\n",
"ddf = ddf.persist()\n",
"print(f\"Rows after filtering: {len(ddf)}\")"
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.6"
"nbformat": 4,
"nbformat_minor": 4
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment