Create a gist now

Instantly share code, notes, and snippets.

Hortonworks User Group

These are notes for following along on the talk I am giving at http://www.meetup.com/Washington-DC-Hortonworks-User-Group-Meetup/events/230394067/

This builds on the gist: https://gist.github.com/epugh/5729071c3b8aab81636d422c391aa716, but is meant to be stand alone! 1 1. This gist is using not the latest version of Zeppelin, but the latest stable version. Replace the ip address 192.168.99.101 with the your docker machine ip. Get it by running docker-machine ip.

  1. Fire up Zeppelin + Spark Master and a Spark Worker via:

    docker run -d --name zeppelin -p 8080:8080 dylanmei/zeppelin:0.6.0-stable
    
  1. Exercise 1: Let's treat Solr as a relational database!

    Walk through connecting Zeppelin to Solr via JDBC using Streaming connection.

    1. Environment Configuration

      Background info on setting up a SolrCloud cluster (required for Streaming) at https://github.com/docker-solr/docker-solr/blob/master/Docker-FAQ.md#can-i-run-zookeeper-and-solr-clusters-under-docker

      docker run --name zookeeper -d -p 2181:2181 -p 2888:2888 -p 3888:3888 jplock/zookeeper
      
      echo stat | nc 192.168.99.101 2181
      
      docker run --name solr1 --link zookeeper:ZK -d -p 8983:8983 \
            solr:6 \
            bash -c '/opt/solr/bin/solr start -f -z $ZK_PORT_2181_TCP_ADDR:$ZK_PORT_2181_TCP_PORT'
      
      docker run --name solr2 --link zookeeper:ZK -d -p 8984:8983 \
            solr:6 \
            bash -c '/opt/solr/bin/solr start -f -z $ZK_PORT_2181_TCP_ADDR:$ZK_PORT_2181_TCP_PORT'
      
      docker exec -i -t solr1 /opt/solr/bin/solr create_collection \
              -c techproducts -shards 2 -p 8983
      
      
      docker exec -it --user=solr solr1 bash -c "/opt/solr/bin/post -c techproducts /opt/solr/example/exampledocs/*.xml"
      
      curl --data-urlencode 'stmt=SELECT manu_id_s, count(1) FROM techproducts GROUP BY manu_id_s' http://localhost:8983/solr/techproducts/sql?aggregationMode=facet
      
    2. Set up the Solr JDBC connection in Zeppelin. You need to grab internal IP address of Zookeeper node.

      docker exec -it --user solr solr1 bash
      echo $ZK_PORT_2181_TCP_ADDR
      
      org.apache.solr:solr-solrj:6.3.0
      
      techproducts.driver=org.apache.solr.client.solrj.io.sql.DriverImpl
      techproducts.url=jdbc:solr://172.17.0.3:2181?collection=techproducts
      techproducts.user=
      techproducts.password=
      
    3. Make a Query

      %jdbc(techproducts) SELECT manu_id_s, count(*) FROM techproducts GROUP BY manu_id_s
      
    4. Play with the settings

      1. Change the sort orders
      2. flip the count and the manu_id_s in the keys
    5. Exercise 2: So Zeppelin is just a query UI??

    Walk through why Zepplin is more then just Tablau, but only if you use Spark!

    1. Environment Configuration

      We're using the LucidWorks Spark Solr integration https://github.com/lucidworks/spark-solr and walking through their NYC Yellow Cab example: https://github.com/lucidworks/spark-solr/blob/master/docs/examples/csv.adoc

    2. Setup the Solr Spark connection in Zeppelin Spark interpeter

      You can provide the Spark Solr jar via the same interpreter mechanism as you did the Solr JDBC library:

      ```     
      com.lucidworks.spark:spark-solr:2.0.1
      ```
      

      If you get an error, make sure to restart the Spark interpreter and then do the dependency load mechanism, adding dependencies has to happen before Spark jobs are run!

      ```
      docker exec -it zeppelin wget https://repo1.maven.org/maven2/com/lucidworks/spark/spark-solr/2.0.1/spark-solr-2.0.1-shaded.jar
      
          # If you have a local copy then you can do this to upload it!:
      #docker cp /Users/epugh/Documents/projects/zeppelin/spark-solr-2.0.1-shaded.jar zeppelin:/zeppelin/
      
      %dep
      z.reset()
      z.addRepo("Spark Packages Repo").url("http://dl.bintray.com/spark-packages/maven")
      z.load("com.databricks:spark-csv_2.10:1.4.0")
      z.load("/zeppelin/spark-solr-2.0.1-shaded.jar")
      
          ```
      
    3. Create Solr Core to put data into

      curl -X GET "http://192.168.99.101:8983/solr/admin/collections?action=create&name=test-spark-solr&collection.configName=techproducts&numShards=2&maxShardsPerNode=2"
      
    4. Grab data file onto Zeppelin. Yes, this should be a web request. Don't touch the local file system!

      docker exec -it zeppelin wget https://raw.githubusercontent.com/lucidworks/spark-solr/master/src/test/resources/test-data/nyc_yellow_taxi_sample_1k.csv
      
    5. Load data into Solr

      These next steps are directly copied from the Spark Solr example (thanks LucidWorks!) At some point this will be a notebook file that you reference at a URL and load directly into your Zeppelin!

      %spark
      val csvFileLocation = "/zeppelin/nyc_yellow_taxi_sample_1k.csv"
      var csvDF = sqlContext.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(csvFileLocation)
      
      // Filter out invalid lat/lon cols
      csvDF = csvDF.filter("pickup_latitude >= -90 AND pickup_latitude <= 90 AND pickup_longitude >= -180 AND pickup_longitude <= 180")
      csvDF = csvDF.filter("dropoff_latitude >= -90 AND dropoff_latitude <= 90 AND dropoff_longitude >= -180 AND dropoff_longitude <= 180")
      
      // concat the lat/lon cols into a single value expected by solr location fields
      csvDF = csvDF.withColumn("pickup", concat_ws(",", col("pickup_latitude"),col("pickup_longitude"))).drop("pickup_latitude").drop("pickup_longitude")
      csvDF = csvDF.withColumn("dropoff", concat_ws(",", col("dropoff_latitude"),col("dropoff_longitude"))).drop("dropoff_latitude").drop("dropoff_longitude")
      
      csvDF.registerTempTable("taxi")
      
      

      Let's make a query:

      %sql select  passenger_count,count(*)  from taxi group by passenger_count
      

      Now save the data to Solr

      %spark
      val options = Map(
      "zkhost" -> "192.168.99.101:2181",
      "collection" -> "test-spark-solr",
      "gen_uniq_key" -> "true" // Generate unique key if the 'id' field does not exist
      )
      
      // Write to Solr
      csvDF.write.format("solr").options(options).mode(org.apache.spark.sql.SaveMode.Overwrite).save
      

      Now make sure the data is committed (yes, this is awkward)!

      
      %spark
      val client = new org.apache.http.impl.client.DefaultHttpClient()
      val response = client.execute(new org.apache.http.client.methods.HttpGet("http://192.168.99.101:8983/solr/test-spark-solr/update?commit=true"))
      
    6. Query the data in Solr

      Okay, now you can query the data back:

      %spark
      val options = Map(
      "zkHost" -> "192.168.99.101:2181",
      "collection" -> "test-spark-solr"
      )
      
      val taxiDF = sqlContext.read.format("solr").options(options).load
      taxiDF.printSchema()
      

      This should print out the schema of the data loaded from Solr.

      Now load and cache the data (don't forget the cache or bad things happen!)

      %spark
      taxiDF.registerTempTable("trips")
      taxiDF.cache()
      

      And now query the dataframe using sql:

      %sql SELECT avg(tip_amount), avg(fare_amount) FROM trips
      %sql SELECT max(tip_amount), max(fare_amount) FROM trips WHERE trip_distance > 10
      
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment