Skip to content

Instantly share code, notes, and snippets.

@ayushdg
Last active January 13, 2021 20:33
Show Gist options
  • Save ayushdg/8aa1fbbbfa1661cdd302febcda44e0f3 to your computer and use it in GitHub Desktop.
Save ayushdg/8aa1fbbbfa1661cdd302febcda44e0f3 to your computer and use it in GitHub Desktop.
Predicate-Pushdown
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# Standard library\n",
"import os\n",
"from glob import glob\n",
"\n",
"# Third-party\n",
"import numpy as np\n",
"import pandas as pd\n",
"\n",
"# ORC part files of the NYC taxi dataset, sorted so they are always visited in the same order\n",
"file_list = sorted(glob(\"nytaxi.orc/*.orc\"))"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Scaling with dask: start a local dask-cuda cluster (default settings) and attach a client to it.\n",
"# `wait` is imported here because the benchmark cells block on persisted collections with it.\n",
"import dask_cudf\n",
"from dask_cuda import LocalCUDACluster\n",
"from distributed import Client, wait\n",
"\n",
"cluster = LocalCUDACluster()\n",
"client = Client(cluster)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"### Using cudf wrapped in dask delayed\n",
"import cudf\n",
"from dask import delayed\n",
"\n",
"\n",
"@delayed\n",
"def read_dataset(file_name):\n",
"    \"\"\"Read one ORC file in full (no filters) into a cudf DataFrame.\n",
"\n",
"    Decorated with ``dask.delayed``: calling it only builds a lazy task, and the\n",
"    actual read happens when that task is computed or persisted.\n",
"    \"\"\"\n",
"    return cudf.read_orc(file_name)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Rows read: 146112989\n",
"Rows after filtering: 12228919\n",
"CPU times: user 1.17 s, sys: 137 ms, total: 1.31 s\n",
"Wall time: 3.3 s\n"
]
}
],
"source": [
"%%time\n",
"# Baseline: read every ORC file in full (no predicate pushdown), then filter the persisted data.\n",
"dfs = [read_dataset(f) for f in file_list]\n",
"ddf = dask_cudf.from_delayed(dfs).persist()\n",
"wait(ddf)\n",
"print(f\"Rows read: {len(ddf)}\")\n",
"\n",
"# Keep the filtered result under its own name instead of rebinding `ddf`.\n",
"ddf_filtered = ddf[ddf.tpep_pickup_datetime < np.datetime64(\"2015-01-31\")]\n",
"print(f\"Rows after filtering: {len(ddf_filtered)}\")\n",
"\n",
"# Drop the notebook's references to the persisted data before the next benchmark.\n",
"del dfs, ddf, ddf_filtered"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"@delayed\n",
"def read_dataset_filtered(file_name):\n",
"    \"\"\"Read one ORC file, passing a predicate to ``cudf.read_orc(filters=...)``.\n",
"\n",
"    The pushdown predicate is ``tpep_pickup_datetime < 2015-01-31 00:00:00 (EST)``.\n",
"    Decorated with ``dask.delayed``, so calling it only builds a lazy task.\n",
"\n",
"    NOTE(review): this bound is timezone-aware (EST), while the in-memory filter used\n",
"    in the benchmark cells is a naive ``np.datetime64(\"2015-01-31\")``; confirm the\n",
"    offset is intentional (e.g. matching how the ORC statistics were written).\n",
"    \"\"\"\n",
"    cutoff = pd.Timestamp(\"2015-01-31 00:00:00\", tz=\"EST\")\n",
"    return cudf.read_orc(file_name, filters=[(\"tpep_pickup_datetime\", \"<\", cutoff)])"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Rows read: 12357474\n",
"Rows after filtering: 12228919\n",
"CPU times: user 1.11 s, sys: 34.6 ms, total: 1.15 s\n",
"Wall time: 1.83 s\n"
]
}
],
"source": [
"%%time\n",
"# Pushdown through the delayed wrapper: each file is read with a filter handed to cudf.\n",
"# \"Rows read\" can exceed \"Rows after filtering\": presumably pruning is coarse-grained (whole\n",
"# chunks skipped via statistics), so the exact in-memory filter below is still required.\n",
"dfs = [read_dataset_filtered(f) for f in file_list]\n",
"ddf = dask_cudf.from_delayed(dfs).persist()\n",
"wait(ddf)\n",
"print(f\"Rows read: {len(ddf)}\")\n",
"\n",
"# Keep the filtered result under its own name instead of rebinding `ddf`.\n",
"ddf_filtered = ddf[ddf.tpep_pickup_datetime < np.datetime64(\"2015-01-31\")]\n",
"print(f\"Rows after filtering: {len(ddf_filtered)}\")\n",
"\n",
"# Drop the notebook's references to the persisted data before the next benchmark.\n",
"del dfs, ddf, ddf_filtered"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Rows read: 12357474\n",
"Rows after filtering: 12228919\n",
"CPU times: user 289 ms, sys: 91.1 ms, total: 380 ms\n",
"Wall time: 1.31 s\n"
]
}
],
"source": [
"%%time\n",
"# Same pushdown predicate, but via dask_cudf.read_orc on the glob pattern (no delayed wrapper).\n",
"ddf = dask_cudf.read_orc(\n",
"    \"nytaxi.orc/*.orc\",\n",
"    filters=[(\"tpep_pickup_datetime\", \"<\", pd.Timestamp(\"2015-01-31 00:00:00\", tz=\"EST\"))],\n",
").persist()\n",
"wait(ddf)\n",
"print(f\"Rows read: {len(ddf)}\")\n",
"\n",
"# Keep the filtered result under its own name instead of rebinding `ddf`.\n",
"ddf_filtered = ddf[ddf.tpep_pickup_datetime < np.datetime64(\"2015-01-31\")]\n",
"print(f\"Rows after filtering: {len(ddf_filtered)}\")\n",
"\n",
"# Drop the notebook's references to the persisted data.\n",
"del ddf, ddf_filtered"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## More complex predicates\n",
"\n",
"Filters are expressed in disjunctive normal form (DNF), i.e. an OR of ANDs.\n",
"\n",
"Accepted shapes: `List[Tuple]` or `List[List[Tuple]]`\n",
"\n",
"- Each tuple `(column, op, value)` is a predicate on a single column.\n",
"- Tuples within the same inner list are combined with AND (conjunction).\n",
"- The inner lists are combined with each other using OR (disjunction); a flat `List[Tuple]` is a single conjunction.\n",
"\n",
"For example, the predicate `(col1 < A | col1 > B) & col2 < C`\n",
"\n",
"is first distributed into `(col1 < A & col2 < C) | (col1 > B & col2 < C)` and then written as\n",
"\n",
"\n",
"`[[(\"col1\", \"<\" , A),(\"col2\", \"<\", C)], [(\"col1\", \">\", B), (\"col2\", \"<\", C)]]`"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.8"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Display the source blob
Display the rendered blob
Raw
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment