- NYC taxi data downloaded from the Git repo and uploaded to an Amazon RDS instance of PostgreSQL
- Downloaded green trip data for 2013 (4 months), 2014, and 2015 (https://github.com/toddwschneider/nyc-taxi-data/blob/master/raw_data_urls.txt)
- The downloaded data is loaded into an RDS instance in AWS. RDS instance details:
  - Engine: PostgreSQL 9.4.7
  - Instance class: db.r3.2xlarge
  - Storage: 100 GB SSD
  - No backups
- The data is uploaded from local to the RDS database (took 1-2 hours)
- Total number of rows: 91,748,362
- 3 EC2 nodes of r3.2xlarge instances
- 3 API services, 3 Solr instances, 3 Spark workers, 1 Spark master, 1 Fusion UI, 1 connector
- Collection `nyc-taxi` created with 6 shards, replication factor 1
- Fab commands used:

```
fab new_ec2_instances:fcloud,n=3,instance_type=r3.2xlarge
fab new_solrcloud:fcloud,zkn=3,solrJavaMemOpts='-Xms12g -Xmx12g',placement_group=sstk,purpose='scale testing for Fusion 2.3'
fab fusion_start:fcloud,ui=1
fab setup_spark_jdbc_jar:fcloud,~/Downloads/postgresql-9.4.1208.jar
fab setup_spark_conf:fcloud,worker_mem=32g,executor_mem=16g
fab restart_spark:fcloud
```
- Started the spark-shell from the Fusion `bin` directory with `./bin/spark-shell -m 28g -c 4 -t 12`
- Indexing via DIH (Solr's DataImportHandler) showed ~20K docs per sec
- Script used to run the job (note the number of partitions and the `batch_size`):

```scala
// JDBC script:
import org.apache.spark.sql.functions.{col, concat_ws}

// get the max ID from the db to use for partitioning
val getMaxId = sqlContext.jdbc("jdbc:postgresql://taxi-data-20160420.cikmdrscwqru.us-east-1.rds.amazonaws.com:5432/nyc-taxi-data?user=taxidatalogin&password=***", "(select max(id) as maxId from trips) tmp")

// note: lower & upper bounds are not filters
val dbOpts = Map(
  "url" -> "jdbc:postgresql://taxi-data-20160420.cikmdrscwqru.us-east-1.rds.amazonaws.com:5432/nyc-taxi-data?user=taxidatalogin&password=***",
  "dbtable" -> "trips",
  "partitionColumn" -> "id",
  "numPartitions" -> "200",
  "lowerBound" -> "0",
  "upperBound" -> getMaxId.select("maxId").collect()(0)(0).toString,
  "fetchSize" -> "5000"
)
var jdbcDF = sqlContext.read.format("jdbc").options(dbOpts).load

// sample for testing
//jdbcDF = jdbcDF.sample(false, 0.01, 5150)

// deal with some data quality issues in the lat/lon cols
jdbcDF = jdbcDF.filter("pickup_latitude >= -90 AND pickup_latitude <= 90 AND pickup_longitude >= -180 AND pickup_longitude <= 180")
jdbcDF = jdbcDF.filter("dropoff_latitude >= -90 AND dropoff_latitude <= 90 AND dropoff_longitude >= -180 AND dropoff_longitude <= 180")

// concat the lat/lon cols into a single value expected by solr location fields
jdbcDF = jdbcDF.withColumn("pickup", concat_ws(",", col("pickup_latitude"), col("pickup_longitude"))).drop("pickup_latitude").drop("pickup_longitude")
jdbcDF = jdbcDF.withColumn("dropoff", concat_ws(",", col("dropoff_latitude"), col("dropoff_longitude"))).drop("dropoff_latitude").drop("dropoff_longitude")

// jdbcDF.printSchema()
jdbcDF.write.format("solr").options(Map(
  "zkhost" -> "ec2-52-90-111-58.compute-1.amazonaws.com:2181,ec2-54-165-229-209.compute-1.amazonaws.com:2181,ec2-54-173-135-227.compute-1.amazonaws.com:2181/fcloud",
  "collection" -> "nyc_taxi",
  "batch_size" -> "50000"
)).mode(org.apache.spark.sql.SaveMode.Overwrite).save
```
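As the comment in the script notes, `lowerBound` and `upperBound` are not filters: they only control how the `id` range is carved into `numPartitions` per-partition WHERE clauses, and rows outside the bounds still land in the first or last partition. A rough standalone sketch of that splitting logic (the object and method names here are hypothetical, for illustration only; the real logic lives inside Spark's JDBC data source):

```scala
// Illustrative sketch of how Spark's JDBC source turns
// lowerBound/upperBound/numPartitions into per-partition WHERE clauses.
// (Names are hypothetical; this is not a Spark API.)
object JdbcPartitioning {
  def partitionClauses(column: String, lower: Long, upper: Long, numPartitions: Int): Seq[String] = {
    val stride = (upper - lower) / numPartitions
    (0 until numPartitions).map { i =>
      val start = lower + i * stride
      val end = start + stride
      if (i == 0) s"$column < $end or $column is null"      // also catches rows below lowerBound
      else if (i == numPartitions - 1) s"$column >= $start" // also catches rows above upperBound
      else s"$column >= $start AND $column < $end"
    }
  }
}
```

With `lower = 0`, `upper = 91748362`, and 200 partitions, each task scans a stride of roughly 458K ids, which is what keeps any single JDBC fetch from dominating the run.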
- Total rows: 91,492,956 imported in 49 minutes. I have seen the indexing time range from 38-48 minutes before.
- Docs per second: 91492956 / 2940 ≈ 31,120 (49 minutes = 2940 seconds)
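As a quick sanity check on that rate (this is just the arithmetic from the figures above, runnable in the same Scala shell):

```scala
// Throughput arithmetic for the JDBC -> Solr indexing run (figures from these notes).
val totalDocs = 91492956L
val elapsedSec = 49 * 60 // 49 minutes = 2940 seconds
val docsPerSec = totalDocs.toDouble / elapsedSec // ~31,120 docs/sec overall
```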
- Driver UI after finishing all the tasks
- Streaming expressions:
  - Script:

```scala
val options = Map(
  "zkhost" -> "ec2-52-90-111-58.compute-1.amazonaws.com:2181,ec2-54-165-229-209.compute-1.amazonaws.com:2181,ec2-54-173-135-227.compute-1.amazonaws.com:2181/fcloud",
  "collection" -> "nyc_taxi",
  "use_export_handler" -> "true",
  "fields" -> "total_amount,tip_amount,trip_distance,passenger_count,vendor_id",
  "solr.params" -> "sort=vendor_id desc"
)
val qDF = sqlContext.read.format("solr").options(options).load
qDF.registerTempTable("trips")
val newDF = sqlContext.sql("SELECT avg(total_amount), avg(tip_amount), avg(trip_distance) from trips")
```
  - Output:

```
scala> newDF.show()
2016-04-20 22:46:19,326 [main] INFO SolrRelation - Constructed SolrQuery: q=*:*&rows=1000&fl=total_amount,tip_amount,trip_distance&sort=vendor_id+desc&collection=nyc_taxi
+------------------+------------------+-----------------+
|               _c0|               _c1|              _c2|
+------------------+------------------+-----------------+
|14.609968263846103|1.1106824575641403|2.935562167650442|
+------------------+------------------+-----------------+
```
  - Time taken: 2.3 minutes (on a fresh index; I ran the streaming read first and the cursorMark read next)
  - Docs per sec: 91492956 / 140 ≈ 653,521 docs per sec (across 6 tasks) ≈ 108.9K docs per sec per task
- Cursor marks:
  - Script:

```scala
val options = Map(
  "zkhost" -> "ec2-52-90-111-58.compute-1.amazonaws.com:2181,ec2-54-165-229-209.compute-1.amazonaws.com:2181,ec2-54-173-135-227.compute-1.amazonaws.com:2181/fcloud",
  "collection" -> "nyc_taxi",
  "splits" -> "true",
  "rows" -> "10000"
)
val qDF = sqlContext.read.format("solr").options(options).load
qDF.registerTempTable("trips")
val newDF = sqlContext.sql("SELECT avg(total_amount), avg(tip_amount), avg(trip_distance), avg(passenger_count) from trips")
newDF.show()
```

  - Output:

```
+------------------+------------------+------------------+------------------+
|               _c0|               _c1|               _c2|               _c3|
+------------------+------------------+------------------+------------------+
|14.609968263150824|1.1106824575651437|2.9355621676493673|1.4178521021880635|
+------------------+------------------+------------------+------------------+
```
  - Time taken: 20 minutes across 120 tasks, with a max of 12 tasks in parallel. Since only 12 cores are available to Spark, at most 12 tasks can run at once (I reloaded the collection to get rid of any caches, if present)
  - Docs per sec: 91492956 / 1200 ≈ 76.2K docs per second (across 12 tasks), i.e. 76200 / 12 = 6,350 docs/sec on average per task
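Putting the two Solr read paths side by side (just the arithmetic on the figures measured above, no new measurements):

```scala
// Comparing the two Solr read paths benchmarked in these notes.
val totalDocs = 91492956L
val exportDocsPerSec     = totalDocs / 140       // /export streaming read, ~2.3 min
val cursorMarkDocsPerSec = totalDocs / (20 * 60) // cursorMark paging read, ~20 min
val speedup = exportDocsPerSec.toDouble / cursorMarkDocsPerSec
// the /export handler read roughly 8.5x faster than cursorMark paging
```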