Skip to content

Instantly share code, notes, and snippets.

@kiranchitturi kiranchitturi/stats.md Secret
Last active Jun 4, 2018

Embed
What would you like to do?
Loading nyc taxi data to Solr using spark-solr

Loading NYC taxi data to Solr using spark-solr

Setup

Database setup

  • NYC taxi data downloaded from the Git repo and uploaded to an Amazon RDS instance of Postgres

  • I have downloaded green trip data from 2013 (4 months), 2014, 2015 (https://github.com/toddwschneider/nyc-taxi-data/blob/master/raw_data_urls.txt)

  • The downloaded data is loaded to an RDS instance in AWS. RDS instance details:

    Engine: PostgreSQL 9.4.7
    Instance Class: db.r3.2xlarge
    Storage: 100 GB SSD
    No backups
    
  • The data is uploaded from local to the RDS database (took from 1-2 hours)

  • Total number of rows: 91748362

EC2 Setup

  • 3 EC2 nodes of r3.2xlarge instances

  • 3 api services, 3 Solr instances, 3 Spark workers, 1 Spark Master, 1 Fusion UI, 1 Connector

  • Collection nyc-taxi created with 6 shards, 1 replication

  • Fab commands used

      fab new_ec2_instances:fcloud,n=3,instance_type=r3.2xlarge
      fab new_solrcloud:fcloud,zkn=3,solrJavaMemOpts='-Xms12g -Xmx12g',placement_group=sstk,purpose='scale testing for Fusion 2.3'
      fab fusion_start:fcloud,ui=1
      fab setup_spark_jdbc_jar:fcloud,~/Downloads/postgresql-9.4.1208.jar
      fab setup_spark_conf:fcloud,worker_mem=32g,executor_mem=16g
      fab restart_spark:fcloud
    

Index data from Postgres to Spark using spark-shell script

  • Started spark-shell in Fusion bin with ./bin/spark-shell -m 28g -c 4 -t 12
  • Indexing via DIH showed 20K docs per sec
  • Script used to run the job (Notice the number of partitions and batch_size)
//JDBC script:

// get the max ID from the db to use for partitioning
val getMaxId = sqlContext.jdbc("jdbc:postgresql://taxi-data-20160420.cikmdrscwqru.us-east-1.rds.amazonaws.com:5432/nyc-taxi-data?user=taxidatalogin&password=***", "(select max(id) as maxId from trips) tmp")

// note: lower & upper bounds are not filters
val dbOpts = Map(
"url" -> "jdbc:postgresql://taxi-data-20160420.cikmdrscwqru.us-east-1.rds.amazonaws.com:5432/nyc-taxi-data?user=taxidatalogin&password=***",
"dbtable" -> "trips",
"partitionColumn" -> "id",
"numPartitions" -> "200",
"lowerBound" -> "0",
"upperBound" -> getMaxId.select("maxId").collect()(0)(0).toString,
"fetchSize" -> "5000"
)
var jdbcDF = sqlContext.read.format("jdbc").options(dbOpts).load

// sample for testing
//jdbcDF = jdbcDF.sample(false, 0.01, 5150)

// deal with some data quality issues in the lat/lon cols
jdbcDF = jdbcDF.filter("pickup_latitude >= -90 AND pickup_latitude <= 90 AND pickup_longitude >= -180 AND pickup_longitude <= 180")
jdbcDF = jdbcDF.filter("dropoff_latitude >= -90 AND dropoff_latitude <= 90 AND dropoff_longitude >= -180 AND dropoff_longitude <= 180")

// concat the lat/lon cols into a single value expected by solr location fields
jdbcDF = jdbcDF.withColumn("pickup", concat_ws(",", col("pickup_latitude"),col("pickup_longitude"))).drop("pickup_latitude").drop("pickup_longitude")
jdbcDF = jdbcDF.withColumn("dropoff", concat_ws(",", col("dropoff_latitude"),col("dropoff_longitude"))).drop("dropoff_latitude").drop("dropoff_longitude")
// jdbcDF.printSchema()

jdbcDF.write.format("solr").options(Map("zkhost" -> "ec2-52-90-111-58.compute-1.amazonaws.com:2181,ec2-54-165-229-209.compute-1.amazonaws.com:2181,ec2-54-173-135-227.compute-1.amazonaws.com:2181/fcloud", "collection" -> "nyc_taxi", "batch_size" -> "50000")).mode(org.apache.spark.sql.SaveMode.Overwrite).save

Indexing Stats

  • Total rows 91492956 imported in 49 minutes. I have seen the indexing range from (38-48 minutes) before.
  • Docs per second - 91492956/(2760) = 31120.6.
  • Driver UI after finishing all the tasks

Querying stats

  • Streaming expressions:
    • Script:

      val options = Map(
        "zkhost" -> "ec2-52-90-111-58.compute-1.amazonaws.com:2181,ec2-54-165-229-209.compute-1.amazonaws.com:2181,ec2-54-173-135-227.compute-1.amazonaws.com:2181/fcloud", 
        "collection" -> "nyc_taxi",
        "use_export_handler" -> "true",
        "fields" -> "total_amount,tip_amount,trip_distance,passenger_count,vendor_id",
        "solr.params" -> "sort=vendor_id desc"
      )
      val qDF = sqlContext.read.format("solr").options(options).load
      qDF.registerTempTable("trips")
      val newDF = sqlContext.sql("SELECT avg(total_amount), avg(tip_amount), avg(trip_distance) from trips")

scala> newDF.show() 2016-04-20 22:46:19,326 [main] INFO SolrRelation - Constructed SolrQuery: q=:&rows=1000&fl=total_amount,tip_amount,trip_distance&sort=vendor_id+desc&collection=nyc_taxi +------------------+------------------+-----------------+
| _c0| _c1| _c2| +------------------+------------------+-----------------+ |14.609968263846103|1.1106824575641403|2.935562167650442| +------------------+------------------+-----------------+ ``` * Time taken: * 2.3 minutes (On a fresh index. I ran the streaming first and then the cursorMarks next) * Docs per sec: * 91492956/140 => 653521.11 docs per sec (across 6 tasks) => 108.9k docs per sec (each individual tasks)

  • Cursor Marks:
    • Script:

      val options = Map("zkhost" -> "ec2-52-90-111-58.compute-1.amazonaws.com:2181,ec2-54-165-229-209.compute-1.amazonaws.com:2181,ec2-54-173-135-227.compute-1.amazonaws.com:2181/fcloud", "collection" -> "nyc_taxi", "splits" -> "true", "rows" -> "10000")
      val qDF = sqlContext.read.format("solr").options(options).load
      qDF.registerTempTable("trips")
      val newDF = sqlContext.sql("SELECT avg(total_amount), avg(tip_amount), avg(trip_distance), avg(passenger_count) from trips")
      newDF.show()
      +------------------+------------------+------------------+------------------+
      |               _c0|               _c1|               _c2|               _c3|
      +------------------+------------------+------------------+------------------+
      |14.609968263150824|1.1106824575651437|2.9355621676493673|1.4178521021880635|
      +------------------+------------------+------------------+------------------+
      
    • Time taken:

      • 20 minutes across 120 tasks with a max of 12 tasks in parallel. Since only 12 cores are available to Spark, only a max of 12 tasks can run in parallel (reloaded the collection to get rid of any caches if present)
    • Docs per sec:

      • 91492956/1200 => 76.2K docs per second (across 12 tasks) => 76200/12 => 6350 docs/sec (on avg for each task).

Screenshots:

  • Spark UI after indexing job started Spark UI after job start
  • Driver UI after indexing job finished Driver UI after finishing all the tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.