Last active
January 13, 2021 20:33
-
-
Save ayushdg/8aa1fbbbfa1661cdd302febcda44e0f3 to your computer and use it in GitHub Desktop.
Predicate-Pushdown
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"from glob import glob\n", | |
"import os\n", | |
"import numpy as np\n", | |
"import pandas as pd\n", | |
"# Sorted so partitions are built in a deterministic file order\n", | |
"file_list = glob(\"nytaxi.orc/*.orc\")\n", | |
"file_list.sort()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Scaling with dask\n", | |
"from dask_cuda import LocalCUDACluster\n", | |
"from distributed import Client, wait\n", | |
"import dask_cudf\n", | |
"\n", | |
"# Start a local Dask cluster of CUDA workers and attach a client to it; later cells\n", | |
"# call persist()/wait() without passing the client explicitly.\n", | |
"cluster = LocalCUDACluster()\n", | |
"client = Client(cluster)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"### Using cudf wrapped in dask delayed\n", | |
"from dask import delayed\n", | |
"import cudf\n", | |
"\n", | |
"@delayed\n", | |
"def read_dataset(file_name):\n", | |
"    \"\"\"Load one ORC file with cudf (no filters); @delayed makes the call lazy.\"\"\"\n", | |
"    return cudf.read_orc(file_name)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Rows read: 146112989\n", | |
"Rows after filtering: 12228919\n", | |
"CPU times: user 1.17 s, sys: 137 ms, total: 1.31 s\n", | |
"Wall time: 3.3 s\n" | |
] | |
} | |
], | |
"source": [ | |
"%%time\n", | |
"# Read every ORC file in full (no pushdown), then apply the filter after loading\n", | |
"dfs = list(map(read_dataset, file_list))\n", | |
"ddf = dask_cudf.from_delayed(dfs).persist()\n", | |
"wait(ddf)\n", | |
"print(f\"Rows read: {len(ddf)}\")\n", | |
"\n", | |
"ddf = ddf[ddf[\"tpep_pickup_datetime\"] < np.datetime64(\"2015-01-31\")]\n", | |
"print(f\"Rows after filtering: {len(ddf)}\")\n", | |
"\n", | |
"del dfs, ddf" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 6, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"@delayed\n", | |
"def read_dataset_filtered(file_name, filters=None):\n", | |
"    \"\"\"Lazily load one ORC file, pushing a predicate down into cudf.read_orc.\n", | |
"\n", | |
"    filters: DNF predicate list passed straight to cudf.read_orc. Defaults to\n", | |
"    pickups before 2015-01-31 (EST), the predicate benchmarked in this notebook.\n", | |
"    \"\"\"\n", | |
"    if filters is None:\n", | |
"        # NOTE(review): tz=\"EST\" here vs. the naive np.datetime64 post-filter used\n", | |
"        # in the benchmark cells -- confirm the intended timezone of the column.\n", | |
"        filters = [(\"tpep_pickup_datetime\", \"<\", pd.Timestamp(\"2015-01-31 00:00:00\", tz=\"EST\"))]\n", | |
"    return cudf.read_orc(file_name, filters=filters)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 8, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Rows read: 12357474\n", | |
"Rows after filtering: 12228919\n", | |
"CPU times: user 1.11 s, sys: 34.6 ms, total: 1.15 s\n", | |
"Wall time: 1.83 s\n" | |
] | |
} | |
], | |
"source": [ | |
"%%time\n", | |
"# Push the predicate into the ORC reader. The pushdown is coarse (Rows read exceeds\n", | |
"# Rows after filtering in the recorded output), so the exact filter is re-applied below.\n", | |
"dfs = list(map(read_dataset_filtered, file_list))\n", | |
"ddf = dask_cudf.from_delayed(dfs).persist()\n", | |
"wait(ddf)\n", | |
"print(f\"Rows read: {len(ddf)}\")\n", | |
"\n", | |
"ddf = ddf[ddf[\"tpep_pickup_datetime\"] < np.datetime64(\"2015-01-31\")]\n", | |
"print(f\"Rows after filtering: {len(ddf)}\")\n", | |
"\n", | |
"del dfs, ddf" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 10, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Rows read: 12357474\n", | |
"Rows after filtering: 12228919\n", | |
"CPU times: user 289 ms, sys: 91.1 ms, total: 380 ms\n", | |
"Wall time: 1.31 s\n" | |
] | |
} | |
], | |
"source": [ | |
"%%time\n", | |
"# Same pushdown through dask_cudf.read_orc, which takes the glob pattern directly\n", | |
"ddf = dask_cudf.read_orc(\n", | |
"    \"nytaxi.orc/*.orc\",\n", | |
"    filters=[(\"tpep_pickup_datetime\", \"<\", pd.Timestamp(\"2015-01-31 00:00:00\", tz=\"EST\"))],\n", | |
").persist()\n", | |
"wait(ddf)\n", | |
"print(f\"Rows read: {len(ddf)}\")\n", | |
"\n", | |
"ddf = ddf[ddf[\"tpep_pickup_datetime\"] < np.datetime64(\"2015-01-31\")]\n", | |
"print(f\"Rows after filtering: {len(ddf)}\")\n", | |
"\n", | |
"del ddf" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## More complex predicates\n", | |
"\n", | |
"Filters are expressed in disjunctive normal form (DNF).\n", | |
"\n", | |
"`filters` accepts either `List[Tuple]` (a single conjunction) or `List[List[Tuple]]` (a disjunction of conjunctions).\n", | |
"\n", | |
"- Each tuple `(column, op, value)` is a predicate on a single column.\n", | |
"- Tuples within the same inner list are combined with AND (a conjunction).\n", | |
"- Inner lists are combined with each other using OR (a disjunction).\n", | |
"\n", | |
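"For example, `(col1 < A | col1 > B) & col2 < C`\n", | |
"\n", | |
"first distributes the AND over the OR, giving `(col1 < A & col2 < C) | (col1 > B & col2 < C)`, which translates to\n", | |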
"\n", | |
"\n", | |
"`[[(\"col1\", \"<\" , A),(\"col2\", \"<\", C)], [(\"col1\", \">\", B), (\"col2\", \"<\", C)]]`" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.7.8" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 4 | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment