@jorisvandenbossche
Last active May 4, 2020 14:52
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Demo of a Dask reader for Arrow Datasets"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import dask\n",
"import dask.dataframe as dd\n",
"\n",
"import pyarrow.dataset as ds"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
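"We have part of the NYC taxi data (about 2.5 years, July 2016 through 2018). It is stored twice, both times with hive-style `year`/`month` partitioning: as 6 GB of compressed Parquet files in `nyc-taxi-data/dask-partitioned/`, and as 24 GB of uncompressed Arrow IPC files in `nyc-taxi-data/dask-partitioned-ipc/`."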
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Using the Apache Arrow Datasets API"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import pyarrow.dataset as ds"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"dataset = ds.dataset(\"nyc-taxi-data/dask-partitioned/\", format=\"parquet\", partitioning=\"hive\")"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"vendor_id: string\n",
"pickup_at: timestamp[us]\n",
"dropoff_at: timestamp[us]\n",
"passenger_count: int8\n",
"trip_distance: float\n",
"rate_code_id: string\n",
"store_and_fwd_flag: string\n",
"pickup_location_id: int32\n",
"dropoff_location_id: int32\n",
"payment_type: string\n",
"fare_amount: float\n",
"extra: float\n",
"mta_tax: float\n",
"tip_amount: float\n",
"tolls_amount: float\n",
"improvement_surcharge: float\n",
"total_amount: float\n",
"index: int64\n",
"year: int32\n",
"month: int32\n",
"-- schema metadata --\n",
"pandas: '{\"index_columns\": [], \"column_indexes\": [], \"columns\": [{\"name\":' + 2360\n",
"ARROW:schema: '/////+ANAAAQAAAAAAAKAA4ABgAFAAgACgAAAAABAwAQAAAAAAAKAAwAAA' + 4690\n"
]
}
],
"source": [
"print(dataset.schema.to_string(show_field_metadata=False))"
]
},
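{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `year` and `month` fields at the end of the schema appear to come from the hive-style directory names (`year=.../month=...`) rather than from the file contents. Below is a minimal, self-contained sketch of that discovery. It uses made-up toy data (not the taxi files) and assumes a reasonably recent pyarrow in which `pq.write_to_dataset` writes hive-style directories. Variable names use a `toy_` prefix so they do not clobber the notebook's own variables."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import tempfile\n",
"\n",
"import pyarrow as pa\n",
"import pyarrow.dataset as ds\n",
"import pyarrow.parquet as pq\n",
"\n",
"# Toy stand-in for the taxi data (made-up values)\n",
"toy = pa.table({\n",
"    'trip_distance': pa.array([1.5, 2.0, 3.25, 0.5], type=pa.float32()),\n",
"    'passenger_count': pa.array([1, 9, 9, 1], type=pa.int8()),\n",
"    'year': [2016, 2016, 2017, 2017],\n",
"    'month': [7, 8, 1, 1],\n",
"})\n",
"\n",
"# Writes root/year=<y>/month=<m>/<file>.parquet; year/month are not stored inside the files\n",
"root = tempfile.mkdtemp()\n",
"pq.write_to_dataset(toy, root, partition_cols=['year', 'month'])\n",
"\n",
"# Hive partitioning discovery infers integer-looking directory values as int32\n",
"toy_dataset = ds.dataset(root, format='parquet', partitioning='hive')\n",
"print(toy_dataset.schema)"
]
},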
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"table = dataset.to_table(columns=['passenger_count'], filter=(ds.field('year') == 2016) & (ds.field('passenger_count') > 8))"
]
},
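{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `to_table` call above combines two things: column projection (`columns=...`) and a filter on both a partition key (`year`) and a regular column (`passenger_count`). The sketch below makes the same kind of call on a tiny made-up dataset so the result can be checked by eye. The toy data and variable names are illustrative only."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import tempfile\n",
"\n",
"import pyarrow as pa\n",
"import pyarrow.dataset as ds\n",
"import pyarrow.parquet as pq\n",
"\n",
"# Made-up toy data, partitioned by year/month like the taxi data\n",
"toy = pa.table({\n",
"    'passenger_count': pa.array([1, 9, 9, 1], type=pa.int8()),\n",
"    'year': [2016, 2016, 2017, 2017],\n",
"    'month': [7, 8, 1, 1],\n",
"})\n",
"root = tempfile.mkdtemp()\n",
"pq.write_to_dataset(toy, root, partition_cols=['year', 'month'])\n",
"toy_dataset = ds.dataset(root, format='parquet', partitioning='hive')\n",
"\n",
"# year=2017 directories are skipped via the partition filter;\n",
"# rows in the remaining files are filtered on passenger_count\n",
"toy_result = toy_dataset.to_table(\n",
"    columns=['passenger_count'],\n",
"    filter=(ds.field('year') == 2016) & (ds.field('passenger_count') > 8),\n",
")\n",
"print(toy_result.to_pydict())"
]
},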
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Using it with Dask"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 111 ms, sys: 11.8 ms, total: 123 ms\n",
"Wall time: 135 ms\n"
]
}
],
"source": [
"%%time\n",
"ddf = dd.read_parquet(\"nyc-taxi-data/dask-partitioned/\", engine=\"pyarrow\")"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div><strong>Dask DataFrame Structure:</strong></div>\n",
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>vendor_id</th>\n",
" <th>pickup_at</th>\n",
" <th>dropoff_at</th>\n",
" <th>passenger_count</th>\n",
" <th>trip_distance</th>\n",
" <th>rate_code_id</th>\n",
" <th>store_and_fwd_flag</th>\n",
" <th>pickup_location_id</th>\n",
" <th>dropoff_location_id</th>\n",
" <th>payment_type</th>\n",
" <th>fare_amount</th>\n",
" <th>extra</th>\n",
" <th>mta_tax</th>\n",
" <th>tip_amount</th>\n",
" <th>tolls_amount</th>\n",
" <th>improvement_surcharge</th>\n",
" <th>total_amount</th>\n",
" <th>index</th>\n",
" <th>month</th>\n",
" <th>year</th>\n",
" </tr>\n",
" <tr>\n",
" <th>npartitions=600</th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th></th>\n",
" <td>object</td>\n",
" <td>datetime64[ns]</td>\n",
" <td>datetime64[ns]</td>\n",
" <td>int8</td>\n",
" <td>float32</td>\n",
" <td>object</td>\n",
" <td>object</td>\n",
" <td>int32</td>\n",
" <td>int32</td>\n",
" <td>object</td>\n",
" <td>float32</td>\n",
" <td>float32</td>\n",
" <td>float32</td>\n",
" <td>float32</td>\n",
" <td>float32</td>\n",
" <td>float32</td>\n",
" <td>float32</td>\n",
" <td>int64</td>\n",
" <td>category[known]</td>\n",
" <td>category[known]</td>\n",
" </tr>\n",
" <tr>\n",
" <th></th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>...</th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th></th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th></th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>\n",
"<div>Dask Name: read-parquet, 600 tasks</div>"
],
"text/plain": [
"Dask DataFrame Structure:\n",
" vendor_id pickup_at dropoff_at passenger_count trip_distance rate_code_id store_and_fwd_flag pickup_location_id dropoff_location_id payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount index month year\n",
"npartitions=600 \n",
" object datetime64[ns] datetime64[ns] int8 float32 object object int32 int32 object float32 float32 float32 float32 float32 float32 float32 int64 category[known] category[known]\n",
" ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...\n",
"... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...\n",
" ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...\n",
" ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...\n",
"Dask Name: read-parquet, 600 tasks"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ddf"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
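"Reading the same data with the `dd.read_arrow_dataset` reader demoed in this notebook. Note that the partition keys `year` and `month` come back as `int32` columns (as in the Arrow schema above), not as the categoricals produced by `read_parquet`:"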
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 9.6 ms, sys: 3.9 ms, total: 13.5 ms\n",
"Wall time: 13.4 ms\n"
]
}
],
"source": [
"%%time\n",
"ddf_arrow = dd.read_arrow_dataset(\"nyc-taxi-data/dask-partitioned/\", partitioning=\"hive\", format=\"parquet\")"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div><strong>Dask DataFrame Structure:</strong></div>\n",
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>vendor_id</th>\n",
" <th>pickup_at</th>\n",
" <th>dropoff_at</th>\n",
" <th>passenger_count</th>\n",
" <th>trip_distance</th>\n",
" <th>rate_code_id</th>\n",
" <th>store_and_fwd_flag</th>\n",
" <th>pickup_location_id</th>\n",
" <th>dropoff_location_id</th>\n",
" <th>payment_type</th>\n",
" <th>fare_amount</th>\n",
" <th>extra</th>\n",
" <th>mta_tax</th>\n",
" <th>tip_amount</th>\n",
" <th>tolls_amount</th>\n",
" <th>improvement_surcharge</th>\n",
" <th>total_amount</th>\n",
" <th>index</th>\n",
" <th>year</th>\n",
" <th>month</th>\n",
" </tr>\n",
" <tr>\n",
" <th>npartitions=600</th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th></th>\n",
" <td>object</td>\n",
" <td>datetime64[ns]</td>\n",
" <td>datetime64[ns]</td>\n",
" <td>int8</td>\n",
" <td>float32</td>\n",
" <td>object</td>\n",
" <td>object</td>\n",
" <td>int32</td>\n",
" <td>int32</td>\n",
" <td>object</td>\n",
" <td>float32</td>\n",
" <td>float32</td>\n",
" <td>float32</td>\n",
" <td>float32</td>\n",
" <td>float32</td>\n",
" <td>float32</td>\n",
" <td>float32</td>\n",
" <td>int64</td>\n",
" <td>int32</td>\n",
" <td>int32</td>\n",
" </tr>\n",
" <tr>\n",
" <th></th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>...</th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th></th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th></th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>\n",
"<div>Dask Name: read-arrow-dataset, 600 tasks</div>"
],
"text/plain": [
"Dask DataFrame Structure:\n",
" vendor_id pickup_at dropoff_at passenger_count trip_distance rate_code_id store_and_fwd_flag pickup_location_id dropoff_location_id payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount index year month\n",
"npartitions=600 \n",
" object datetime64[ns] datetime64[ns] int8 float32 object object int32 int32 object float32 float32 float32 float32 float32 float32 float32 int64 int32 int32\n",
" ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...\n",
"... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...\n",
" ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...\n",
" ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...\n",
"Dask Name: read-arrow-dataset, 600 tasks"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ddf_arrow"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Reading the Arrow IPC copy of the data (same hive partitioning, with `format=\"ipc\"`):"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 6.91 ms, sys: 3.67 ms, total: 10.6 ms\n",
"Wall time: 24 ms\n"
]
}
],
"source": [
"%%time\n",
"ddf_ipc = dd.read_arrow_dataset(\"nyc-taxi-data/dask-partitioned-ipc/\", partitioning=\"hive\", format=\"ipc\")"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div><strong>Dask DataFrame Structure:</strong></div>\n",
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>vendor_id</th>\n",
" <th>pickup_at</th>\n",
" <th>dropoff_at</th>\n",
" <th>passenger_count</th>\n",
" <th>trip_distance</th>\n",
" <th>rate_code_id</th>\n",
" <th>store_and_fwd_flag</th>\n",
" <th>pickup_location_id</th>\n",
" <th>dropoff_location_id</th>\n",
" <th>payment_type</th>\n",
" <th>fare_amount</th>\n",
" <th>extra</th>\n",
" <th>mta_tax</th>\n",
" <th>tip_amount</th>\n",
" <th>tolls_amount</th>\n",
" <th>improvement_surcharge</th>\n",
" <th>total_amount</th>\n",
" <th>index</th>\n",
" <th>year</th>\n",
" <th>month</th>\n",
" </tr>\n",
" <tr>\n",
" <th>npartitions=600</th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th></th>\n",
" <td>object</td>\n",
" <td>datetime64[ns]</td>\n",
" <td>datetime64[ns]</td>\n",
" <td>int8</td>\n",
" <td>float32</td>\n",
" <td>object</td>\n",
" <td>object</td>\n",
" <td>int32</td>\n",
" <td>int32</td>\n",
" <td>object</td>\n",
" <td>float32</td>\n",
" <td>float32</td>\n",
" <td>float32</td>\n",
" <td>float32</td>\n",
" <td>float32</td>\n",
" <td>float32</td>\n",
" <td>float32</td>\n",
" <td>int64</td>\n",
" <td>int32</td>\n",
" <td>int32</td>\n",
" </tr>\n",
" <tr>\n",
" <th></th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>...</th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th></th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th></th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>\n",
"<div>Dask Name: read-arrow-dataset, 600 tasks</div>"
],
"text/plain": [
"Dask DataFrame Structure:\n",
" vendor_id pickup_at dropoff_at passenger_count trip_distance rate_code_id store_and_fwd_flag pickup_location_id dropoff_location_id payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount index year month\n",
"npartitions=600 \n",
" object datetime64[ns] datetime64[ns] int8 float32 object object int32 int32 object float32 float32 float32 float32 float32 float32 float32 int64 int32 int32\n",
" ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...\n",
"... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...\n",
" ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...\n",
" ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...\n",
"Dask Name: read-arrow-dataset, 600 tasks"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ddf_ipc"
]
},
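{
"cell_type": "markdown",
"metadata": {},
"source": [
"The notebook does not show how the IPC copy was created. One possible way to produce a hive-partitioned Arrow IPC dataset with pyarrow is `ds.write_dataset`, sketched below on made-up toy data. This is not necessarily how the taxi IPC files were written, and it assumes a recent pyarrow."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import tempfile\n",
"\n",
"import pyarrow as pa\n",
"import pyarrow.dataset as ds\n",
"\n",
"# Made-up toy data; partition columns typed int32 to match the partitioning schema\n",
"toy = pa.table({\n",
"    'trip_distance': pa.array([1.5, 2.0, 3.25, 0.5], type=pa.float32()),\n",
"    'year': pa.array([2016, 2016, 2017, 2017], type=pa.int32()),\n",
"    'month': pa.array([7, 8, 1, 1], type=pa.int32()),\n",
"})\n",
"\n",
"# Hive-style directories (year=.../month=...) containing Arrow IPC files\n",
"toy_base_dir = tempfile.mkdtemp()\n",
"toy_partitioning = ds.partitioning(\n",
"    pa.schema([('year', pa.int32()), ('month', pa.int32())]), flavor='hive'\n",
")\n",
"ds.write_dataset(toy, toy_base_dir, format='ipc', partitioning=toy_partitioning)\n",
"\n",
"# Read it back the same way the IPC copy is read above\n",
"toy_ipc = ds.dataset(toy_base_dir, format='ipc', partitioning='hive')\n",
"print(toy_ipc.schema)"
]
},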
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**How many rows?** (Here I take the length of a single column instead of the full dataframe, to avoid loading all the data.)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 12.7 s, sys: 787 ms, total: 13.4 s\n",
"Wall time: 5.68 s\n"
]
},
{
"data": {
"text/plain": [
"278059647"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time len(ddf.trip_distance)"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 7.38 s, sys: 234 ms, total: 7.62 s\n",
"Wall time: 2.14 s\n"
]
},
{
"data": {
"text/plain": [
"278059647"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time len(ddf_arrow.trip_distance)"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 1.78 s, sys: 826 ms, total: 2.61 s\n",
"Wall time: 1.86 s\n"
]
},
{
"data": {
"text/plain": [
"278059647"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time len(ddf_ipc.trip_distance)"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"5.2 s ± 195 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%timeit len(ddf.trip_distance)"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1.98 s ± 55.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%timeit len(ddf_arrow.trip_distance)"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1.24 s ± 60 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%timeit len(ddf_ipc.trip_distance)"
]
},
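{
"cell_type": "markdown",
"metadata": {},
"source": [
"Taking the length of a single column still reads that column's data. For Parquet, each file also records its row count in its footer metadata, so the total can be obtained without reading any column data. The sketch below compares both approaches on made-up toy data. It only checks that the two counts agree and is not a benchmark of the taxi files."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import tempfile\n",
"\n",
"import pyarrow as pa\n",
"import pyarrow.dataset as ds\n",
"import pyarrow.parquet as pq\n",
"\n",
"# Made-up toy data, hive-partitioned like the taxi data\n",
"toy = pa.table({\n",
"    'trip_distance': pa.array([1.5, 2.0, 3.25, 0.5], type=pa.float32()),\n",
"    'year': [2016, 2016, 2017, 2017],\n",
"    'month': [7, 8, 1, 1],\n",
"})\n",
"root = tempfile.mkdtemp()\n",
"pq.write_to_dataset(toy, root, partition_cols=['year', 'month'])\n",
"toy_dataset = ds.dataset(root, format='parquet', partitioning='hive')\n",
"\n",
"# Approach used above: load one column and take its length\n",
"n_from_column = toy_dataset.to_table(columns=['trip_distance']).num_rows\n",
"\n",
"# Alternative: sum the row counts stored in each Parquet file footer\n",
"n_from_footers = sum(\n",
"    pq.ParquetFile(path).metadata.num_rows for path in toy_dataset.files\n",
")\n",
"print(n_from_column, n_from_footers)"
]
},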
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Average `trip_distance`?**"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 14.6 s, sys: 773 ms, total: 15.3 s\n",
"Wall time: 6.4 s\n"
]
},
{
"data": {
"text/plain": [
"3.319460242014189"
]
},
"execution_count": 18,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time ddf[\"trip_distance\"].mean().compute()"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 9.01 s, sys: 284 ms, total: 9.29 s\n",
"Wall time: 2.64 s\n"
]
},
{
"data": {
"text/plain": [
"3.319460242014189"
]
},
"execution_count": 19,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time ddf_arrow[\"trip_distance\"].mean().compute()"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 3.54 s, sys: 331 ms, total: 3.87 s\n",
"Wall time: 1.92 s\n"
]
},
{
"data": {
"text/plain": [
"3.319460242014189"
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time ddf_ipc[\"trip_distance\"].mean().compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Filtering data"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For now, a filter has to be passed directly to `read_parquet` / `read_arrow_dataset` when reading:"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Filtering on partition keys"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"ddf_subset = dd.read_parquet(\"nyc-taxi-data/dask-partitioned/\", filters=[[('year', '=', 2017), ('month', '=', 1)]], engine=\"pyarrow\")"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"ddf_arrow_subset = dd.read_arrow_dataset(\"nyc-taxi-data/dask-partitioned/\", partitioning=\"hive\", format=\"parquet\",\n",
" filter=(ds.field('year') == 2017) & (ds.field('month') == 1))"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"ddf_ipc_subset = dd.read_arrow_dataset(\"nyc-taxi-data/dask-partitioned-ipc/\", partitioning=\"hive\", format=\"ipc\",\n",
" filter=(ds.field('year') == 2017) & (ds.field('month') == 1))"
]
},
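{
"cell_type": "markdown",
"metadata": {},
"source": [
"The two filter spellings above express the same condition in different forms. `read_parquet` takes `filters` in disjunctive normal form: an outer list whose inner lists are OR-ed, where each inner list holds AND-ed `(column, op, value)` tuples. `read_arrow_dataset` takes a `pyarrow.dataset` expression instead. Below is a sketch of the translation using a hypothetical helper `dnf_to_expression`, which is not part of Dask or pyarrow, checked on made-up toy data."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import operator\n",
"import tempfile\n",
"from functools import reduce\n",
"\n",
"import pyarrow as pa\n",
"import pyarrow.dataset as ds\n",
"import pyarrow.parquet as pq\n",
"\n",
"# Operator strings used in DNF filters, mapped to Python comparison functions\n",
"_DNF_OPS = {\n",
"    '=': operator.eq, '==': operator.eq, '!=': operator.ne,\n",
"    '<': operator.lt, '<=': operator.le, '>': operator.gt, '>=': operator.ge,\n",
"}\n",
"\n",
"\n",
"def dnf_to_expression(filters):\n",
"    # Hypothetical helper: AND the tuples within each inner list, then OR the inner lists\n",
"    conjunctions = [\n",
"        reduce(operator.and_, [_DNF_OPS[op](ds.field(col), val) for col, op, val in conj])\n",
"        for conj in filters\n",
"    ]\n",
"    return reduce(operator.or_, conjunctions)\n",
"\n",
"\n",
"# Made-up toy data, hive-partitioned like the taxi data\n",
"toy = pa.table({\n",
"    'trip_distance': pa.array([1.5, 2.0, 3.25, 0.5], type=pa.float32()),\n",
"    'year': [2016, 2016, 2017, 2017],\n",
"    'month': [7, 8, 1, 1],\n",
"})\n",
"root = tempfile.mkdtemp()\n",
"pq.write_to_dataset(toy, root, partition_cols=['year', 'month'])\n",
"toy_dataset = ds.dataset(root, format='parquet', partitioning='hive')\n",
"\n",
"# Same condition as in the cells above, written both ways\n",
"expr = dnf_to_expression([[('year', '=', 2017), ('month', '=', 1)]])\n",
"n_dnf = toy_dataset.to_table(filter=expr).num_rows\n",
"n_expr = toy_dataset.to_table(\n",
"    filter=(ds.field('year') == 2017) & (ds.field('month') == 1)\n",
").num_rows\n",
"print(expr, n_dnf, n_expr)"
]
},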
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Number of rows in this subset:"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"9710124"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"len(ddf_subset.trip_distance)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"9710124"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"len(ddf_arrow_subset.trip_distance)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"9710124"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"len(ddf_ipc_subset.trip_distance)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"925 ms ± 77.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%timeit len(ddf_subset.trip_distance)"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"76.6 ms ± 2.99 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)\n"
]
}
],
"source": [
"%timeit len(ddf_arrow_subset.trip_distance)"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"48.5 ms ± 4.44 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)\n"
]
}
],
"source": [
"%timeit len(ddf_ipc_subset.trip_distance)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Average `trip_distance` in this subset:"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"2.8138988010091324"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ddf_subset[\"trip_distance\"].mean().compute()"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"2.8138989007761386"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ddf_arrow_subset[\"trip_distance\"].mean().compute()"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"2.8138989007761386"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ddf_ipc_subset[\"trip_distance\"].mean().compute()"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1.08 s ± 158 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%timeit ddf_subset[\"trip_distance\"].mean().compute()"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"110 ms ± 10.9 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)\n"
]
}
],
"source": [
"%timeit ddf_arrow_subset[\"trip_distance\"].mean().compute()"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"76.7 ms ± 7.17 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)\n"
]
}
],
"source": [
"%timeit ddf_ipc_subset[\"trip_distance\"].mean().compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Filtering on a column"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To filter on a column that is not a partition key, the Dask `read_parquet` version needs `gather_statistics=True` so that the Parquet row-group statistics are collected (the Arrow Datasets version uses them automatically):"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 18.4 s, sys: 283 ms, total: 18.7 s\n",
"Wall time: 18.9 s\n"
]
}
],
"source": [
"%%time\n",
"ddf_subset = dd.read_parquet(\"nyc-taxi-data/dask-partitioned/\", engine=\"pyarrow\", filters=[[('passenger_count', '>', 8)]], gather_statistics=True)"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 11.1 ms, sys: 0 ns, total: 11.1 ms\n",
"Wall time: 10.3 ms\n"
]
}
],
"source": [
"%%time\n",
"ddf_arrow_subset = dd.read_arrow_dataset(\"nyc-taxi-data/dask-partitioned/\", partitioning=\"hive\", format=\"parquet\",\n",
" filter=ds.field('passenger_count') > 8)"
]
},
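{
"cell_type": "markdown",
"metadata": {},
"source": [
"Filtering on a regular column relies on the min/max statistics that Parquet stores for each column in each row group. Below is a minimal sketch on a single made-up file with three row groups. It shows which row groups could contain rows with `passenger_count > 8`. The file and values are illustrative, not taken from the taxi data."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import tempfile\n",
"\n",
"import pyarrow as pa\n",
"import pyarrow.parquet as pq\n",
"\n",
"# Nine made-up rows, written as three row groups of three rows each\n",
"toy = pa.table({\n",
"    'passenger_count': pa.array([1, 2, 1, 2, 9, 1, 3, 1, 2], type=pa.int8()),\n",
"})\n",
"toy_path = os.path.join(tempfile.mkdtemp(), 'toy.parquet')\n",
"pq.write_table(toy, toy_path, row_group_size=3)\n",
"\n",
"# Per-row-group min/max of the only column (index 0)\n",
"meta = pq.ParquetFile(toy_path).metadata\n",
"for i in range(meta.num_row_groups):\n",
"    stats = meta.row_group(i).column(0).statistics\n",
"    print(i, stats.min, stats.max)\n",
"\n",
"# A row group can only contain matching rows if its max exceeds 8\n",
"candidates = [\n",
"    i for i in range(meta.num_row_groups)\n",
"    if meta.row_group(i).column(0).statistics.max > 8\n",
"]\n",
"print(candidates)"
]
},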
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In the Dask version we also need to apply the same filter again after reading. The read-time filter only skips entire Parquet row groups, so any non-matching rows in the remaining row groups are kept. The Arrow version additionally filters row by row, so it returns the fully filtered result:"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 8.02 s, sys: 623 ms, total: 8.64 s\n",
"Wall time: 5.64 s\n"
]
},
{
"data": {
"text/plain": [
"6.097070407833327"
]
},
"execution_count": 19,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"ddf_subset[['trip_distance', 'passenger_count']][ddf_subset['passenger_count'] > 8]['trip_distance'].mean().compute()"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 4.55 s, sys: 139 ms, total: 4.69 s\n",
"Wall time: 1.76 s\n"
]
},
{
"data": {
"text/plain": [
"6.097070402308354"
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"ddf_arrow_subset['trip_distance'].mean().compute()"
]
},
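{
"cell_type": "markdown",
"metadata": {},
"source": [
"The difference described above can be reproduced on a single made-up file. Skipping row groups by their statistics keeps every row of a surviving row group, so the filter has to be applied again. A `pyarrow.dataset` filter, by contrast, returns only the matching rows. This is a minimal sketch with toy data and illustrative variable names."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import tempfile\n",
"\n",
"import pyarrow as pa\n",
"import pyarrow.compute as pc\n",
"import pyarrow.dataset as ds\n",
"import pyarrow.parquet as pq\n",
"\n",
"# Nine made-up rows in three row groups; only one row has passenger_count > 8\n",
"toy = pa.table({\n",
"    'passenger_count': pa.array([1, 2, 1, 2, 9, 1, 3, 1, 2], type=pa.int8()),\n",
"})\n",
"toy_path = os.path.join(tempfile.mkdtemp(), 'toy.parquet')\n",
"pq.write_table(toy, toy_path, row_group_size=3)\n",
"pf = pq.ParquetFile(toy_path)\n",
"\n",
"# Row-group-level filtering: keep row groups whose max passenger_count is > 8\n",
"keep = [\n",
"    i for i in range(pf.metadata.num_row_groups)\n",
"    if pf.metadata.row_group(i).column(0).statistics.max > 8\n",
"]\n",
"pruned = pf.read_row_groups(keep)\n",
"\n",
"# Re-applying the filter per row, as the Dask version does after reading\n",
"refiltered = pruned.filter(pc.greater(pruned['passenger_count'], 8))\n",
"\n",
"# Row-level filtering done by pyarrow.dataset in one step\n",
"exact = ds.dataset(toy_path, format='parquet').to_table(\n",
"    filter=ds.field('passenger_count') > 8\n",
")\n",
"print(pruned.num_rows, refiltered.num_rows, exact.num_rows)"
]
},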
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python (arrow-dev)",
"language": "python",
"name": "arrow-dev"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}