Last active
July 17, 2018 04:59
-
-
Save lambiase/74851d17b041244995b96b9a9de824ce to your computer and use it in GitHub Desktop.
New York City Taxi & Limousine Commission (TLC) Trip Data Analysis Using Sparklyr and Google BigQuery
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--- | |
title: "New York City Taxi & Limousine Commission (TLC) Trip Data Analysis Using Sparklyr and Google BigQuery" | |
layout: default | |
published: true | |
category: News | |
parent: news | |
meta_description: | |
tags: | |
- mirai | |
knit: (function(inputFile, encoding) postRmd::render_post(inputFile, "")) | |
params: | |
# gcp_json_keyfile: gcp_keyfile.json | |
bigquery_billing_project_id: <your_billing_project_id> | |
bigquery_gcs_bucket: <your_gcs_bucket> | |
bigquery_dataset_location: US | |
# Tested on Google Cloud Dataproc | |
# see configuration in the Appendix | |
spark_master: yarn-client | |
# spark.* parameters below are used to configure the Spark connection; | |
# https://spark.apache.org/docs/latest/configuration.html | |
# https://spark.apache.org/docs/latest/running-on-yarn.html | |
spark.executor.instances: 4 | |
spark.executor.cores: 8 | |
spark.executor.memory: 34g | |
spark.sql.shuffle.partitions: 1600 | |
spark.network.timeout: 900 | |
spark.yarn.executor.memoryOverhead: 5000 | |
# DO NOT CHANGE unless you know what you are doing | |
spark.hadoop.fs.gs.impl: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem | |
spark.hadoop.fs.AbstractFileSystem.gs.impl: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS | |
--- | |
This post shows how to use [Apache Spark](https://spark.apache.org/) and [Google BigQuery](https://cloud.google.com/bigquery/) in R via [`sparklyr`](https://spark.rstudio.com/) to efficiently analyze a big dataset (NYC yellow taxi trips). | |
<!--more--> | |
You can find the R Markdown document used to generate this post [here](https://goo.gl/KjxxHZ). | |
```{r setup, echo=FALSE, include=FALSE} | |
# Default code chunk setup | |
knitr::opts_chunk$set(echo = TRUE, collapse = TRUE, | |
tidy = FALSE, tidy.opts = list(indent = 2)) | |
render_timing <- function(timing) round(as.numeric(timing$toc - timing$tic) / 60, 1) | |
``` | |
## Introduction | |
On August 3, 2015 the [New York City Taxi & Limousine Commission (TLC)](http://www.nyc.gov/tlc), in partnership with the [New York City Department of Information Technology and Telecommunications (DOITT)](http://www.nyc.gov/doitt), [announced](http://www.nyc.gov/html/tlc/downloads/pdf/press_release_08_03_15.pdf) the availability of millions of trip records from both [Yellow Medallion](https://en.wikipedia.org/wiki/Taxicabs_of_New_York_City) and [Green](https://en.wikipedia.org/wiki/Boro_taxi) (Street Hail Livery) Cabs. The data can be downloaded from the [NYC TLC](http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml) or through the [NYC OpenData](https://opendata.cityofnewyork.us/) website. In addition, the data is available on [Google BigQuery](https://cloud.google.com/bigquery/public-data/nyc-tlc-trips) in the form of public datasets. | |
In this post, we show how to use [Apache Spark](https://spark.apache.org/) and [Google BigQuery](https://cloud.google.com/bigquery/) in R via [`sparklyr`](https://spark.rstudio.com/) to efficiently analyze big datasets. Specifically, we are going to analyze tip amounts paid in taxi trips. | |
The NYC TLC data is **~150GB** and consists of **1.2 billion records**. It contains pick-up/drop-off date/times and locations, number of passengers, tips and fare amounts, etc. of yellow cab trips between 2009 and 2016. | |
## Package Dependencies | |
We are using the `pacman` R package management tool to install and load the package dependencies. | |
Please note that `sparklyr` version `0.7.0+` (available on GitHub, but not yet released on CRAN) is needed. | |
```{r package_dependencies, message=FALSE, warning=FALSE} | |
if(!require(pacman)) { | |
install.packages("pacman") | |
library(pacman) | |
} | |
# From GitHub | |
p_load_gh( | |
"rstudio/sparklyr@master", # version 0.7.0+ is required | |
"miraisolutions/sparkbq@develop", | |
"miraisolutions/sparkgeo@develop" | |
) | |
# From CRAN | |
p_load( | |
"dplyr", | |
"dbplyr", | |
"geojsonio", | |
"leaflet", | |
"knitr", | |
"kableExtra", | |
"tictoc" | |
) | |
``` | |
Here is a list of all the dependencies, together with a short description: | |
* [`pacman`](https://github.com/trinker/pacman): package management tool | |
* [`sparklyr`](https://spark.rstudio.com/): R interface to [Apache Spark](https://spark.apache.org/) | |
* [`sparkbq`](https://github.com/miraisolutions/sparkbq): `sparklyr` extension package to connect to [Google BigQuery](https://cloud.google.com/bigquery/) | |
* [`sparkgeo`](https://github.com/miraisolutions/sparkgeo): `sparklyr` extension package providing geospatial analytics capabilities | |
* [`dplyr`](http://dplyr.tidyverse.org/): grammar for data manipulation | |
* [`dbplyr`](http://dbplyr.tidyverse.org/): database backend for `dplyr` | |
* [`geojsonio`](https://github.com/ropensci/geojsonio): support for [GeoJSON](http://geojson.org/) data | |
* [`leaflet`](https://rstudio.github.io/leaflet/): creation of interactive web maps | |
* [`knitr`](https://yihui.name/knitr/): elegant, flexible, and fast dynamic report generation with R | |
* [`kableExtra`](https://github.com/haozhu233/kableExtra): construct complex tables with 'kable' and pipe syntax | |
* [`tictoc`](https://github.com/collectivemedia/tictoc): functions for timing R scripts | |
## Connect to Apache Spark | |
```{r tic_all, include = FALSE} | |
tic("all") | |
``` | |
Before connecting to [Apache Spark](https://spark.apache.org/) using [`sparklyr`](https://spark.rstudio.com/), | |
we need to tune some [Spark configuration properties](https://spark.apache.org/docs/latest/configuration.html) to allocate the right amount of resources in the cluster and avoid runtime failures: | |
- `spark.executor.instances`: Number of executors to start. An executor is a process launched on a worker node, that runs tasks and keeps data in memory or on disk. | |
- `spark.executor.cores`: Number of cores to use on each executor. | |
- `spark.executor.memory`: Amount of memory to use per executor process. | |
- `spark.sql.shuffle.partitions`: Number of partitions to use when shuffling data for joins or aggregations. | |
- `spark.network.timeout`: Default timeout for all network interactions. | |
- `spark.hadoop.fs.gs.project.id`: Project ID used in GCP. Used to bill for the usage of GCP resources. | |
- `spark.hadoop.fs.gs.impl`: A GCS setting defining the file system in the Spark configuration. | |
- `spark.hadoop.fs.AbstractFileSystem.gs.impl`: A GCS setting defining the abstract file system in the Spark configuration. | |
Furthermore, we also register [`sparkgeo`](https://github.com/miraisolutions/sparkgeo) that will be leveraged later to perform geospatial joins. | |
```{r spark_connect, warning=FALSE} | |
# Spark configuration settings | |
config <- spark_config() | |
# Apply Spark configuration settings from R markdown document parameters | |
spark_param_names <- grep("spark.", names(params), | |
fixed = TRUE, value = TRUE) | |
print(params[spark_param_names]) | |
config[spark_param_names] <- params[spark_param_names] | |
# Set Google Cloud Storage (GCS) settings. This allows us to read | |
# files from 'gs://' URLs. | |
# The Google Cloud Storage connector comes with sparkbq. | |
config[["spark.hadoop.fs.gs.project.id"]] <- | |
params$bigquery_billing_project_id | |
# Check if Google Cloud Platform service credentials have been specified | |
# (in the form of a JSON keyfile) - settings needed when running locally | |
# See https://developers.google.com/identity/protocols/application-default-credentials | |
if(!is.null(params$gcp_json_keyfile)) { | |
Sys.setenv("GOOGLE_APPLICATION_CREDENTIALS" = params$gcp_json_keyfile) | |
config[["spark.hadoop.google.cloud.auth.service.account.json.keyfile"]] <- | |
params$gcp_json_keyfile | |
} | |
# Connect to Apache Spark | |
sc <- spark_connect(master = params$spark_master, | |
config = config, | |
app_name = "NYC TLC") | |
# Register sparkgeo's user-defined functions (UDFs) | |
sparkgeo_register(sc) | |
``` | |
## Google BigQuery Settings | |
Next, we are going to set the Google Cloud Platform project ID to use for [billing](https://cloud.google.com/bigquery/pricing) purposes. [^billing] | |
We also set the Google Cloud Storage (GCS) bucket used to store temporary BigQuery files and the default BigQuery dataset location. | |
[^billing]: Note that BigQuery is priced based on a flat rate for storage and a usage rate for queries. | |
```{r bigquery_settings} | |
bigquery_defaults( | |
billingProjectId = params$bigquery_billing_project_id, | |
gcsBucket = params$bigquery_gcs_bucket, | |
datasetLocation = params$bigquery_dataset_location | |
) | |
``` | |
## Import New York City Neighborhoods | |
```{r tic_data_prep, include = FALSE} | |
tic("data_preparation") | |
``` | |
In the NYC TLC trips dataset we only have pickup and dropoff locations in terms of latitude and longitude. | |
To compute statistics by neighborhood we will need to map these locations to [New York City neighborhoods](https://en.wikipedia.org/wiki/Neighborhoods_in_New_York_City). | |
For this, we are going to leverage [NYC neighborhood polygon information](http://data.beta.nyc/dataset/pediacities-nyc-neighborhoods) in the form of a [GeoJSON](http://geojson.org/) file. We are using `sparkgeo::spark_read_geojson` to read the data straight into Spark and extract any relevant metadata. | |
```{r neighborhoods} | |
neighborhoods <- | |
spark_read_geojson( | |
sc = sc, | |
name = "neighborhoods", | |
path = "gs://miraisolutions/public/sparkgeo/nyc_neighborhoods.geojson" | |
) %>% | |
mutate(neighborhood = metadata_string(metadata, "neighborhood")) %>% | |
select(neighborhood, polygon, index) %>% | |
sdf_persist() | |
``` | |
There are `r neighborhoods %>% sdf_nrow()` neighborhoods. | |
[`sdf_persist`](http://spark.rstudio.com/reference/sdf_persist/) counteracts the default `lazy` behaviour of Spark DataFrames. It forces any pending computation and (optionally) serializes the results to disk and/or memory. | |
## Import NYC TLC trip data from Google BigQuery | |
We are using `sparkbq::spark_read_bigquery` to import the publicly available [Google BigQuery NYC Taxi & Limousine trip dataset](https://cloud.google.com/bigquery/public-data/nyc-tlc-trips). Trip data is available in several tables (one for each year). | |
```{r bigquery_import} | |
all_trips_spark_by_year <- | |
lapply(2009:2016, function(year) { | |
spark_read_bigquery( | |
sc = sc, | |
name = paste0("trips", year), | |
projectId = "bigquery-public-data", | |
datasetId = "new_york", | |
tableId = paste0("tlc_yellow_trips_", year), | |
repartition = 400 | |
) | |
}) | |
# Union of all trip data | |
all_trips_spark <- Reduce(union_all, all_trips_spark_by_year) | |
``` | |
Note: At this point no data (except for schema information) has been read yet. | |
These operations are [lazy](https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#rdd-operations). | |
Due to lack of information on tips paid by cash, we filter the dataset for trips paid by credit card. In addition, we filter out some bad data points such as negative tip & fare amounts and negative trip distances. We also perform a geospatial join to map pickup locations to the corresponding NYC neighborhood and calculate some additional metrics such as trip duration and tip percentage. | |
```{r credit_trips_spark, message=FALSE} | |
credit_trips_spark <- | |
all_trips_spark %>% | |
filter( | |
# Trips paid by credit card | |
payment_type %in% c("CREDIT", "CRD", "1") & | |
# Filter out bad data points | |
fare_amount > 1 & | |
tip_amount >= 0 & | |
trip_distance > 0 & | |
passenger_count > 0 | |
) %>% | |
# Select relevant columns only to reduce amount of data | |
select( | |
vendor_id, | |
pickup_datetime, | |
dropoff_datetime, | |
pickup_latitude, | |
pickup_longitude, | |
trip_distance, | |
passenger_count, | |
fare_amount, | |
tip_amount | |
) %>% | |
# Join with NYC neighborhoods | |
sparkgeo::sdf_spatial_join(neighborhoods, pickup_latitude, | |
pickup_longitude) %>% | |
# NOTE: timestamps are currently returned as microseconds since the epoch | |
mutate( | |
trip_duration = (dropoff_datetime - pickup_datetime) / 1e6, | |
pickup_datetime = from_unixtime(pickup_datetime / 1e6) | |
) %>% | |
mutate( | |
# Split pickup date/time into separate metrics | |
pickup_month = month(pickup_datetime), | |
pickup_weekday = date_format(pickup_datetime, 'EEEE'), | |
pickup_hour = hour(pickup_datetime), | |
# Calculate tip percentage based on fare amount | |
tip_pct = tip_amount / fare_amount * 100 | |
) %>% | |
select( | |
vendor_id, | |
pickup_month, | |
pickup_weekday, | |
pickup_hour, | |
neighborhood, | |
trip_duration, | |
trip_distance, | |
passenger_count, | |
fare_amount, | |
tip_pct | |
) %>% | |
# Persist results to memory and/or disk | |
sdf_persist() | |
``` | |
There are `r format(credit_trips_spark %>% sdf_nrow(), big.mark = "'")` records in `credit_trips_spark`. | |
The above example also showcases the **predicate pushdown** feature of `sparkbq`: the first `filter()` and `select()` operations are pushed down to BigQuery and are executed at data source level. This can improve query performance by reducing the amount of data read from the data source. | |
## Average Tip Percentage by Pickup Neighborhood | |
Here we calculate the average tip percentage by pickup neighborhood for visualization in a following section. Note that the following `dplyr` pipeline is lazy: no computations will be triggered until we execute an appropriate [action](https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#actions). | |
```{r avg_tip_per_neighborhood, message = FALSE} | |
avg_tip_per_neighborhood_spark <- | |
credit_trips_spark %>% | |
group_by(neighborhood) %>% | |
summarize(avg_tip_pct = mean(tip_pct)) %>% | |
arrange(desc(avg_tip_pct)) | |
``` | |
The following Spark SQL code gets generated by `sparklyr`/`dbplyr` for the object `avg_tip_per_neighborhood_spark`: | |
```sql | |
`r avg_tip_per_neighborhood_spark %>% sql_render()` | |
``` | |
This SQL query will be executed by Spark when we run the `collect` [action](https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#actions) in the next section. | |
## Collect and Import to R | |
```{r nyc_tlc_sql, results='asis'} | |
avg_tip_per_neighborhood <- avg_tip_per_neighborhood_spark %>% collect() | |
head(avg_tip_per_neighborhood, n = 10) %>% | |
kable("html") %>% | |
kable_styling(bootstrap_options = "striped", full_width = FALSE) | |
``` | |
`collect` executes the Spark query and returns the results to R. | |
```{r toc_data_prep, include = FALSE} | |
timing_data_prep <- toc() | |
``` | |
## Leaflet Visualization | |
In this section we are going to visualize the average tip percentages using | |
RStudio's [`leaflet`](http://rstudio.github.io/leaflet/) package. | |
The [`geojsonio`](https://github.com/ropensci/geojsonio) package can read | |
GeoJSON files and produces a [`SpatialPolygonsDataFrame`](https://cran.r-project.org/web/packages/sp/vignettes/intro_sp.pdf) | |
that can be directly [plotted](http://rstudio.github.io/leaflet/json.html) by | |
`leaflet` on a map. | |
```{r nyc_tlc_leaflet} | |
# Build SpatialPolygonsDataFrame from NYC neighborhood geojson file | |
nyc_neighborhoods <- | |
geojsonio::geojson_read( | |
"https://storage.googleapis.com/miraisolutions/public/sparkgeo/nyc_neighborhoods.geojson", | |
what = "sp" | |
) | |
# Merge average tip percentages with neighborhood metadata | |
average_tip_with_shapes <- nyc_neighborhoods | |
average_tip_with_shapes@data <- | |
merge( | |
nyc_neighborhoods@data, | |
avg_tip_per_neighborhood, | |
all.x = TRUE | |
) | |
# Create continuous color palette based on average tip percentages | |
pal <- | |
colorNumeric( | |
c("#FF0000", "#FFFF00", "#00FF00"), | |
average_tip_with_shapes$avg_tip_pct | |
) | |
# Draw leaflet containing | |
# - black & white OpenStreetMap map | |
# - NYC neighborhoods drawn as polygons and colored according to the | |
# corresponding tip percentage | |
# - legend for tip percentages | |
average_tip_with_shapes %>% | |
leaflet() %>% | |
addProviderTiles( | |
providers$OpenStreetMap.BlackAndWhite | |
) %>% | |
addPolygons( | |
weight = 1, | |
opacity = 0.7, | |
smoothFactor = 0.3, | |
fillOpacity = 0.7, | |
fillColor = ~ pal(avg_tip_pct), | |
label = ~ paste0(neighborhood, " - ", round(avg_tip_pct, 2), "%"), | |
highlightOptions = highlightOptions( | |
color = "yellow", | |
weight = 2, | |
bringToFront = TRUE | |
) | |
) %>% | |
addLegend( | |
pal = pal, | |
values = ~ avg_tip_pct, | |
opacity = 1, | |
title = "Tips %", | |
labFormat = labelFormat(suffix = ' %') | |
) | |
``` | |
## Machine Learning with `sparklyr` | |
```{r tic_ml, include = FALSE} | |
tic("ml") | |
``` | |
`sparklyr` provides bindings to Spark’s distributed machine learning library [MLlib](https://spark.apache.org/docs/latest/mllib-guide.html). | |
In the following we build a model to predict whether a taxi driver can expect generous tips based on provider, pickup date/time, neighborhood, trip duration & distance, passenger count and fare amount. We define a Spark ML pipeline to compute a random forest classifier. Tip levels are defined as follows: | |
* < 25%: standard | |
* \>= 25%: generous | |
See [Tipping in New York City: a guide to who, when and how much](http://blog.secretescapes.com/2014/09/18/nyc-tipping/) for more information on NYC tipping. Also note the [difference](http://pix11.com/2015/01/07/nyc-cab-software-could-have-you-tipping-too-much/) in how providers handle suggested tips. | |
```{r ml_sparklyr} | |
# Prepare training and test datasets | |
ml_dataset <- | |
credit_trips_spark %>% | |
sdf_partition(train = 0.8, test = 0.2, seed = 2510) | |
# Create machine learning pipeline | |
tip_level_pipeline <- | |
ml_pipeline(sc, uid = "tip_level") %>% | |
# Bucketize tip percentages into levels | |
ft_bucketizer( | |
input_col = "tip_pct", | |
output_col = "tip_level", | |
splits = c(0, 25, Inf), | |
handle_invalid = "skip" | |
) %>% | |
ft_r_formula(formula = tip_level ~ vendor_id + pickup_month + | |
pickup_weekday + pickup_hour + neighborhood + | |
trip_duration + trip_distance + passenger_count + | |
fare_amount) %>% | |
ml_random_forest_classifier(num_trees = 50) | |
# Fit model | |
tip_level_model <- ml_fit(tip_level_pipeline, dataset = ml_dataset$train) | |
# Perform prediction | |
tip_level_prediction <- | |
ml_predict(tip_level_model, dataset = ml_dataset$test, | |
predicted_label_col = "prediction") | |
# Evaluate model by calculating F1 score | |
ml_classification_eval(tip_level_prediction, prediction_col = "prediction", | |
metric_name = "f1") | |
``` | |
```{r toc_ml, include = FALSE} | |
timing_ml <- toc() | |
``` | |
## Disconnect from Apache Spark | |
```{r spark_disconnect} | |
spark_disconnect(sc) | |
``` | |
## Timings | |
```{r toc_all, include = FALSE} | |
timing_all <- toc() | |
``` | |
| Description | Execution time (minutes) | | |
| ---------------------- | -----------------------------------:| | |
| Data preparation | `r render_timing(timing_data_prep)` | | |
| Machine Learning | `r render_timing(timing_ml)` | | |
| Complete Spark session | `r render_timing(timing_all)` | | |
<br/> | |
## Appendix | |
<br/> | |
### Provisioning a Google Dataproc Cluster | |
This section explains how to provision a Google Dataproc cluster for the execution | |
of the TLC Trip Data analyses shown above. | |
#### Install Google Cloud SDK | |
- Register to https://cloud.google.com/ and create a new project (e.g. "my-project"). | |
- Install the Google Cloud SDK by following the instructions at | |
https://cloud.google.com/sdk/docs/quickstart-linux | |
- If you already have gcloud installed, you may want to update your components | |
using `gcloud components update`. | |
#### Log In | |
- Log in to Google Cloud: `gcloud auth login` | |
- Set default gcloud project: `gcloud config set project my-project` | |
#### Provision Dataproc Cluster | |
A Dataproc cluster can easily be provisioned on the command line as follows: | |
``` | |
gcloud dataproc clusters create spark --async --image-version 1.2 \ | |
--master-machine-type n1-standard-1 --master-boot-disk-size 20 \ | |
--worker-machine-type n1-highmem-8 --num-workers 4 \ | |
--worker-boot-disk-size 10 --num-worker-local-ssds 1 | |
``` | |
The command above will launch a Google Cloud Dataproc cluster named `spark`. | |
The master node will be called `spark-m` and the worker nodes `spark-w-0`, | |
`spark-w-1`, etc. | |
#### Dataproc Cluster Parameters | |
- `--async`: display information about the operation in progress, without | |
waiting for the operation to complete. | |
- `--image-version`: version of the bundle including operating system, | |
Big Data components and Google Cloud Platform connectors. See https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-versions | |
- `--master-machine-type`: the type of machine to use for the master. | |
See https://cloud.google.com/compute/docs/machine-types | |
- `--master-boot-disk-size`: the size of the boot disk for the master node. | |
- `--worker-machine-type`: the type of machine to use for workers. See | |
https://cloud.google.com/compute/docs/machine-types | |
- `--num-workers`: the number of worker nodes in the cluster | |
- `--worker-boot-disk-size`: the size of the boot disk for worker nodes | |
- `--num-worker-local-ssds`: the number of local SSDs to attach to each worker | |
See https://cloud.google.com/sdk/gcloud/reference/dataproc/clusters/create | |
for a complete list of all the configuration parameters. | |
#### Google Cloud Machine Types | |
- `n1-standard-1`: Standard machine type with 1 virtual CPU and 3.75 GB of memory. | |
- `n1-highmem-8`: High memory machine type with 8 virtual CPUs and 52 GB of memory. | |
See https://cloud.google.com/compute/docs/machine-types for a complete list of | |
all the standard Google Cloud machine types. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment