
@epugh
Last active October 9, 2018 03:32

Future of Big Data: Philadelphia

These are notes for following along with the talk I am giving.

This builds on the gist at https://gist.github.com/epugh/5729071c3b8aab81636d422c391aa716, but is meant to stand alone!

  1. This gist uses the latest version of Zeppelin. Replace the IP address 192.168.99.100 with your Docker machine IP, which you can get by running docker-machine ip.
  2. Fire up Zeppelin, a Spark master, and a Spark worker via: docker run -d --name zeppelin -p 8080:8080 dylanmei/zeppelin
  3. If it doesn't work, go back to the specific "stable" version of Zeppelin. Watch out, there is a 1 GB layer in there!
docker run -d --name zeppelin -p 8080:8080 dylanmei/zeppelin:0.6.0-stable
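
Rather than editing 192.168.99.100 by hand in every command, the address can be captured once in a shell variable. This is a minimal sketch, assuming docker-machine is on your PATH. The function name resolve_docker_ip and the variable DOCKER_IP are names made up for this example, and the fallback is simply the address used throughout these notes.

```shell
# Print the Docker machine IP, falling back to the address used in these notes.
# resolve_docker_ip and DOCKER_IP are illustrative names, not part of any tool.
resolve_docker_ip() {
  ip="$(docker-machine ip 2>/dev/null || true)"
  if [ -z "$ip" ]; then
    ip="192.168.99.100"
  fi
  printf '%s\n' "$ip"
}

DOCKER_IP="$(resolve_docker_ip)"
echo "Zeppelin should be reachable at http://${DOCKER_IP}:8080"
```

Later commands can then use the variable, for example echo stat | nc "${DOCKER_IP}" 2181.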

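Download the Spark Solr and Elasticsearch Hadoop jars into the Zeppelin container:

docker exec -it zeppelin wget http://central.maven.org/maven2/com/lucidworks/spark/spark-solr/2.0.1/spark-solr-2.0.1.jar
docker exec -it zeppelin wget http://central.maven.org/maven2/org/elasticsearch/elasticsearch-hadoop/2.3.2/elasticsearch-hadoop-2.3.2.jar

Then load the dependencies in a Zeppelin paragraph, before any Spark code runs:

%dep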
z.reset()
z.addRepo("Spark Packages Repo").url("http://dl.bintray.com/spark-packages/maven")
z.load("com.databricks:spark-csv_2.10:1.4.0")
z.load("/zeppelin/spark-solr-2.0.1.jar")
z.load("/zeppelin/elasticsearch-hadoop-2.3.2.jar")

  1. Exercise 1: Let's treat Solr as a relational database!

    Walk through connecting Zeppelin to Solr via JDBC using a Streaming connection.

    1. Environment Configuration

      Background info on setting up a SolrCloud cluster (required for Streaming) is at https://github.com/docker-solr/docker-solr/blob/master/Docker-FAQ.md#can-i-run-zookeeper-and-solr-clusters-under-docker

    # Start ZooKeeper, which SolrCloud uses for cluster coordination
    docker run --name zookeeper -d -p 2181:2181 -p 2888:2888 -p 3888:3888 jplock/zookeeper
    
    # Check that ZooKeeper is answering
    echo stat | nc 192.168.99.100 2181
    
    # Start two Solr nodes in cloud mode, both pointed at the linked ZooKeeper
    docker run --name solr1 --link zookeeper:ZK -d -p 8983:8983 \
          solr:6 \
          bash -c '/opt/solr/bin/solr start -f -z $ZK_PORT_2181_TCP_ADDR:$ZK_PORT_2181_TCP_PORT'
    
    docker run --name solr2 --link zookeeper:ZK -d -p 8984:8983 \
          solr:6 \
          bash -c '/opt/solr/bin/solr start -f -z $ZK_PORT_2181_TCP_ADDR:$ZK_PORT_2181_TCP_PORT'
    
    # Create a two-shard techproducts collection
    docker exec -i -t solr1 /opt/solr/bin/solr create_collection \
            -c techproducts -shards 2 -p 8983
    
    # Index the example documents that ship with Solr
    docker exec -it --user=solr solr1 bash -c "/opt/solr/bin/post -c techproducts /opt/solr/example/exampledocs/*.xml"
    
    # Run a SQL query over HTTP (the URL is quoted so the shell leaves the ? alone)
    curl --data-urlencode 'stmt=SELECT manu_id_s, count(1) FROM techproducts GROUP BY manu_id_s' 'http://192.168.99.100:8983/solr/techproducts/sql?aggregationMode=facet'
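
One caveat with the commands above: docker run -d returns as soon as the container starts, so create_collection may run before Solr is ready to accept requests. A small polling helper is one way to guard against that. This is only a sketch: wait_for_http is a hypothetical function name, the retry defaults are arbitrary, and it assumes curl is installed on the machine running the commands.

```shell
# Poll a URL until it answers successfully or the attempts run out.
# wait_for_http is a hypothetical helper; its arguments are:
#   $1 = URL, $2 = max attempts (default 30), $3 = seconds between attempts (default 2)
wait_for_http() {
  url="$1"
  attempts="${2:-30}"
  delay="${3:-2}"
  i=1
  while [ "$i" -le "$attempts" ]; do
    # -s: quiet, -f: treat HTTP errors as failure, -o /dev/null: discard the body
    if curl -sf -o /dev/null "$url"; then
      return 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  echo "Gave up waiting for $url" >&2
  return 1
}

# Example (not run here): wait for solr1 before creating the collection.
# wait_for_http "http://192.168.99.100:8983/solr/admin/info/system" && \
#   docker exec -i -t solr1 /opt/solr/bin/solr create_collection -c techproducts -shards 2 -p 8983
```

The helper returns 0 on the first successful response and 1 once the attempts are used up, so it can be chained with && in front of any of the later steps.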
    
    2. Set up the Solr JDBC connection in Zeppelin

      Add the SolrJ artifact as a dependency of the JDBC interpreter:

      org.apache.solr:solr-solrj:6.0.0
      
      Then add these properties to the same interpreter:

      techproducts.driver=org.apache.solr.client.solrj.io.sql.DriverImpl
      techproducts.url=jdbc:solr://192.168.99.100:2181?collection=techproducts
      techproducts.user=
      techproducts.password=
      
    3. Make a Query

      %jdbc(techproducts) SELECT manu_id_s, count(*) FROM techproducts GROUP BY manu_id_s
      
    4. Play with the settings

      1. Change the sort orders.
      2. Flip the count and the manu_id_s in the keys.
  2. Exercise 2: So Zeppelin is just a query UI??

    Walk through why Zeppelin is more than just Tableau, but only if you use Spark!

    1. Environment Configuration

      We're using the LucidWorks Spark Solr integration https://github.com/lucidworks/spark-solr and walking through their NYC Yellow Cab example: https://github.com/lucidworks/spark-solr/blob/master/docs/examples/csv.adoc

    2. Set up the Spark Solr connection in the Zeppelin Spark interpreter

      You can provide the Spark Solr jar via the same interpreter mechanism as you did for the Solr JDBC library:

       ```
       com.lucidworks.spark:spark-solr:2.0.1
       ```
      

      If you get an error, restart the Spark interpreter and then load the dependencies again. Dependencies have to be added before any Spark job is run!

    3. Create a Solr collection to put the data into

      curl -X GET "http://192.168.99.100:8983/solr/admin/collections?action=create&name=test-spark-solr&collection.configName=techproducts&numShards=2&maxShardsPerNode=2"
      
    4. Grab the data file and put it on the Zeppelin container. (Yes, this really should be a web request that doesn't touch the local file system!)

      docker exec -it zeppelin wget https://raw.githubusercontent.com/lucidworks/spark-solr/master/src/test/resources/test-data/nyc_yellow_taxi_sample_1k.csv
      
    5. Load data into Solr

      These next steps are copied directly from the Spark Solr example (thanks, LucidWorks!). At some point this will be a notebook file that you reference at a URL and load directly into your Zeppelin!

      %spark
      // Zeppelin usually imports these already; the explicit import is harmless
      import org.apache.spark.sql.functions.{col, concat_ws}
      
      val csvFileLocation = "/usr/zeppelin/nyc_yellow_taxi_sample_1k.csv"
      var csvDF = sqlContext.read.format("com.databricks.spark.csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(csvFileLocation)
      
      // Filter out rows with invalid lat/lon values
      csvDF = csvDF.filter("pickup_latitude >= -90 AND pickup_latitude <= 90 AND pickup_longitude >= -180 AND pickup_longitude <= 180")
      csvDF = csvDF.filter("dropoff_latitude >= -90 AND dropoff_latitude <= 90 AND dropoff_longitude >= -180 AND dropoff_longitude <= 180")
      
      // Concat the lat/lon cols into the single "lat,lon" value expected by Solr location fields
      csvDF = csvDF.withColumn("pickup", concat_ws(",", col("pickup_latitude"),col("pickup_longitude"))).drop("pickup_latitude").drop("pickup_longitude")
      csvDF = csvDF.withColumn("dropoff", concat_ws(",", col("dropoff_latitude"),col("dropoff_longitude"))).drop("dropoff_latitude").drop("dropoff_longitude")
      
      // Expose the DataFrame to %sql paragraphs as the table "taxi"
      csvDF.registerTempTable("taxi")
      
      

      Let's make a query:

      %sql SELECT passenger_count, count(*) FROM taxi GROUP BY passenger_count
      

      Now save the data to Solr

      %spark
      val options = Map(
      "zkhost" -> "192.168.99.100:2181",
      "collection" -> "test-spark-solr",
      "gen_uniq_key" -> "true" // Generate unique key if the 'id' field does not exist
      )
      
      // Write to Solr
      csvDF.write.format("solr").options(options).mode(org.apache.spark.sql.SaveMode.Overwrite).save
      

      Now make sure the data is committed (yes, this is awkward)!

      
      %spark
      val client = new org.apache.http.impl.client.DefaultHttpClient()
      val response = client.execute(new org.apache.http.client.methods.HttpGet("http://192.168.99.100:8983/solr/test-spark-solr/update?commit=true"))
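
The same commit can also be issued from a terminal, and a rows=0 query is a quick way to see how many documents Solr reports afterwards. A minimal sketch, assuming curl is available and Solr is published on port 8983 as in Exercise 1. solr_commit_url and solr_count_url are helper names invented for this example.

```shell
# Build the Solr URLs used to commit and to count documents.
# Function names are illustrative; $1 is the host and $2 is the collection.
solr_commit_url() {
  printf 'http://%s:8983/solr/%s/update?commit=true\n' "$1" "$2"
}

solr_count_url() {
  # rows=0 asks for the match count only, without returning any documents
  printf 'http://%s:8983/solr/%s/select?q=*:*&rows=0&wt=json\n' "$1" "$2"
}

# Example (not run here):
# curl "$(solr_commit_url 192.168.99.100 test-spark-solr)"
# curl "$(solr_count_url 192.168.99.100 test-spark-solr)"
```

The numFound value in the JSON response of the second request is the number of documents in the collection.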
      
    6. Query the data in Solr

      Okay, now you can query the data back:

      %spark
      val options = Map(
      "zkhost" -> "192.168.99.100:2181",
      "collection" -> "test-spark-solr"
      )
      
      val taxiDF = sqlContext.read.format("solr").options(options).load
      taxiDF.printSchema()
      

      This should print out the schema of the data loaded from Solr.

      Now register the DataFrame as a temp table and cache it. Don't forget the cache: without it, each query may have to read the data from Solr again!

      %spark
      taxiDF.registerTempTable("trips")
      taxiDF.cache()
      

      And now query the DataFrame using SQL (run each statement in its own paragraph):

      %sql SELECT avg(tip_amount), avg(fare_amount) FROM trips
      %sql SELECT max(tip_amount), max(fare_amount) FROM trips WHERE trip_distance > 10
      
  3. Exercise 3: So what if I want to transform my data?

  4. We also need to tell the %spark interpreter where our ES nodes are by adding es.nodes=192.168.99.101 to the interpreter properties. Make sure to also add org.elasticsearch:elasticsearch-hadoop:2.3.2 as a dependency.

  5. Now let's write it out to Elasticsearch. We can put it under any index and type we want!

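      %spark
      import org.elasticsearch.spark.sql.EsSparkSQL
      
      // Write the DataFrame to index "taxi" with type "rides"
      EsSparkSQL.saveToEs(taxiDF,"taxi/rides")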
    
  6. Great, now let's read it back in. We need to point the %elasticsearch interpreter at our ES node by updating its elasticsearch.host=192.168.99.101 property. Let's query it via:

    ```
    %elasticsearch
    search /taxi {
      "query": {
        "match_all": {}
      }
    }
    ```
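
To double check the write outside of Zeppelin, the document count can be fetched straight from Elasticsearch. This is a rough sketch, assuming curl is available and that Elasticsearch listens on its default HTTP port 9200 (the notes above do not state the port). es_count_url is a name made up for this example.

```shell
# Build the Elasticsearch _count URL for an index/type pair.
# es_count_url is an illustrative name; $1 is the host, $2 is index/type.
# Port 9200 is the Elasticsearch default and is an assumption here.
es_count_url() {
  printf 'http://%s:9200/%s/_count?pretty\n' "$1" "$2"
}

# Example (not run here):
# curl "$(es_count_url 192.168.99.101 taxi/rides)"
```

The count field in the response should match the number of rows in taxiDF.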
    