@epugh
Last active October 9, 2018 03:30
Steps for following along with Eric's Zeppelin talk.

The steps below all assume you have Docker installed. I used the Kitematic tool for OSX, and it worked great. Everything is mapped to your "localhost" domain name. Where a command uses 192.168.99.100, substitute the address of your own Docker host.

  1. Let's Set up Zeppelin

    I am using this Docker image https://github.com/dylanmei/docker-zeppelin to fire up Zeppelin and Spark. Note that it's slow because there are so many processes (Spark Master, Spark Worker, Zeppelin) to start! The image is now up to Zeppelin 0.7.0.

    docker run -d --name zeppelin -p 8080:8080 dylanmei/zeppelin
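
Because startup is slow, it can help to wait until the web UI answers before opening a browser. Below is a minimal sketch, assuming `curl` is installed on the host and that Zeppelin is published on port 8080 of `localhost` (use your Docker host's address instead if that is where the port is mapped). The function name `wait_for_http` and its retry defaults are illustrative and not part of Docker or Zeppelin.

```shell
# wait_for_http URL [ATTEMPTS] [DELAY_SECONDS]
# Polls URL until curl gets a successful HTTP response or attempts run out.
# Hypothetical helper; the name and defaults are assumptions for this sketch.
wait_for_http() {
  wfh_url=$1
  wfh_attempts=${2:-30}
  wfh_delay=${3:-2}
  wfh_i=1
  while [ "$wfh_i" -le "$wfh_attempts" ]; do
    # -s: quiet, -f: treat HTTP errors (4xx/5xx) as failure, -o: discard body
    if curl -sf -o /dev/null "$wfh_url"; then
      echo "up after $wfh_i attempt(s)"
      return 0
    fi
    wfh_i=$((wfh_i + 1))
    sleep "$wfh_delay"
  done
  echo "gave up after $wfh_attempts attempt(s)" >&2
  return 1
}
```

For example, `wait_for_http http://localhost:8080 60 5` would poll every 5 seconds for up to 5 minutes.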
    
  2. Exercise 1: It's like Tableau, only cheaper!

    Making a JDBC query to MySQL from Zeppelin. I was using the database behind Quepid.com, for which I had a dump. You will need to substitute your own MySQL database!

    1. Environment Configuration

      • Use Sequel to create an empty database called Quepid
      • Use Sequel to suck in the Quepid data file.
    2. Set up the MySQL JDBC connection in Zeppelin. Edit the jdbc interpreter and add the MySQL jar from the Maven repo, along with the settings.

      mysql:mysql-connector-java:5.1.38
      
      quepid-db.driver=com.mysql.jdbc.Driver
      quepid-db.url=jdbc:mysql://192.168.99.100:3306/quepid
      quepid-db.user=root
      quepid-db.password=password
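
Each property carries the prefix that you later pass to `%jdbc(...)`, so `quepid-db.url` pairs with `%jdbc(quepid-db)`. Since the host, database, and credentials differ per machine, here is a small sketch that prints the four lines for a given prefix. The function name `jdbc_props` is hypothetical, and it only covers the MySQL driver class shown above.

```shell
# jdbc_props PREFIX HOST PORT DATABASE USER PASSWORD
# Prints Zeppelin JDBC interpreter properties for a MySQL database.
# Hypothetical helper for this walkthrough, not a Zeppelin tool.
jdbc_props() {
  printf '%s.driver=com.mysql.jdbc.Driver\n' "$1"
  printf '%s.url=jdbc:mysql://%s:%s/%s\n' "$1" "$2" "$3" "$4"
  printf '%s.user=%s\n' "$1" "$5"
  printf '%s.password=%s\n' "$1" "$6"
}
```

Running `jdbc_props quepid-db 192.168.99.100 3306 quepid root password` reproduces the settings listed above.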
      
    3. Make a Query

      %jdbc(quepid-db) SELECT c.caseName, count(1) as count_queries from `case` c, query q where c.id = q.case_id group by c.id
      
    4. Play with the settings

      1. Try out the sorting features built in.
      2. Try out the counts, moving them around. Use count_queries as the key and caseName COUNT as the value.
  3. Exercise 2: Now let's jump up to querying something more interesting, Solr!

    Walk through connecting Zeppelin to Solr via JDBC using a Streaming connection.

    1. Environment Configuration

      Background info on setting up a SolrCloud cluster (required for Streaming) is available at https://github.com/docker-solr/docker-solr/blob/master/Docker-FAQ.md#can-i-run-zookeeper-and-solr-clusters-under-docker

    docker run --name zookeeper -d -p 2181:2181 -p 2888:2888 -p 3888:3888 jplock/zookeeper
    
    echo stat | nc 192.168.99.100 2181
    
    docker run --name solr1 --link zookeeper:ZK -d -p 8983:8983 \
          solr:6 \
          bash -c '/opt/solr/bin/solr start -f -z $ZK_PORT_2181_TCP_ADDR:$ZK_PORT_2181_TCP_PORT'
    
    docker run --name solr2 --link zookeeper:ZK -d -p 8984:8983 \
          solr:6 \
          bash -c '/opt/solr/bin/solr start -f -z $ZK_PORT_2181_TCP_ADDR:$ZK_PORT_2181_TCP_PORT'
    
    docker exec -i -t solr1 /opt/solr/bin/solr create_collection \
            -c techproducts -shards 2 -p 8983
    
    
    docker exec -it --user=solr solr1 bash -c "/opt/solr/bin/post -c techproducts /opt/solr/example/exampledocs/*.xml"
    
    curl --data-urlencode 'stmt=SELECT manu_id_s, count(1) FROM techproducts GROUP BY manu_id_s' http://192.168.99.100:8983/solr/techproducts/sql?aggregationMode=facet
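
The `echo stat | nc ...` check above prints several lines of server details, including a `Mode:` line. Here is a small sketch that extracts that value so a script can test it. The name `zk_mode` is hypothetical, and it assumes the response contains a line of the form `Mode: standalone`.

```shell
# zk_mode: reads ZooKeeper `stat` output on stdin and prints the Mode value
# (for example "standalone"). Exits non-zero if no Mode line is found.
# Hypothetical helper; not part of ZooKeeper.
zk_mode() {
  awk -F': *' '/^Mode:/ { print $2; found = 1 } END { exit found ? 0 : 1 }'
}
```

It can be chained onto the original check, as in `echo stat | nc 192.168.99.100 2181 | zk_mode`.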
    
    1. Set up the Solr JDBC connection in Zeppelin. You need to grab the internal IP address of the Zookeeper node.

    docker exec -it --user solr solr1 bash -c 'echo $ZK_PORT_2181_TCP_ADDR'

    
        ```
        org.apache.solr:solr-solrj:6.3.0
    
        techproducts.driver=org.apache.solr.client.solrj.io.sql.DriverImpl
        techproducts.url=jdbc:solr://$ZK_PORT_2181_TCP_ADDR:2181?collection=techproducts
        techproducts.user=
        techproducts.password=
        ```
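
`$ZK_PORT_2181_TCP_ADDR` in the URL above stands for the address printed by the `docker exec` command, so it has to be substituted by hand. Here is a sketch that builds the URL from that address. The name `solr_jdbc_url` is hypothetical, and `172.17.0.2` in the usage line is only an example address, not one taken from this setup.

```shell
# solr_jdbc_url ZK_ADDR COLLECTION [ZK_PORT]
# Prints a Solr JDBC URL pointing at the given Zookeeper address.
# ZK_PORT defaults to 2181, the port published in the steps above.
# Hypothetical helper for this walkthrough.
solr_jdbc_url() {
  printf 'jdbc:solr://%s:%s?collection=%s\n' "$1" "${3:-2181}" "$2"
}
```

For example, `solr_jdbc_url 172.17.0.2 techproducts` prints `jdbc:solr://172.17.0.2:2181?collection=techproducts`, which is the value to paste into `techproducts.url`.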
    
    1. Make a Query
    
        ```
        %jdbc(techproducts) SELECT manu_id_s, count(*) FROM techproducts GROUP BY manu_id_s
        ```   
        
    1. Play with the settings
        1. Change the sort orders
        1. Grab the bar chart and open settings. Put count in Keys, manu_id_s in Groups, and count SUM in Values.
    
    
  4. Exercise 3: So Zeppelin is just a query UI??

    Walk through why Zeppelin is more than just Tableau, but only if you use Spark!

    1. Environment Configuration

      We're using the LucidWorks Spark Solr integration https://github.com/lucidworks/spark-solr and walking through their NYC Yellow Cab example: https://github.com/lucidworks/spark-solr/blob/master/docs/examples/csv.adoc

    2. Set up the Solr Spark connection in the Zeppelin Spark interpreter

      You can provide the Spark Solr jar via the same interpreter mechanism as you did for the Solr JDBC library:

       ```		
       com.lucidworks.spark:spark-solr:2.0.1
       ```
      

      If you get an error, restart the Spark interpreter and then load the dependency again. Adding dependencies has to happen before any Spark jobs are run!

    3. Create a Solr collection to put data into

      curl -X GET "http://192.168.99.100:8983/solr/admin/collections?action=create&name=test-spark-solr&collection.configName=techproducts&numShards=2&maxShardsPerNode=2"
      
    4. Download the data file into the Zeppelin container. Yes, this should be a web request. Don't touch the local file system!

      docker exec -it zeppelin wget https://raw.githubusercontent.com/lucidworks/spark-solr/master/src/test/resources/test-data/nyc_yellow_taxi_sample_1k.csv
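
Before loading the file into Spark, a quick look at its header and row count can catch a failed or partial download. Here is a sketch; `csv_summary` is a hypothetical helper, and it assumes a header on the first line and no commas or newlines embedded in fields.

```shell
# csv_summary FILE
# Prints the number of header columns and the number of data rows in a
# comma-separated file. Pass "-" as FILE to read from standard input.
# Hypothetical helper; a plain field split, not a full CSV parser.
csv_summary() {
  awk -F',' '
    NR == 1 { cols = NF }                       # column count from the header
    END { printf "columns=%d rows=%d\n", cols, (NR > 0 ? NR - 1 : 0) }
  ' "$1"
}
```

One way to run it against the container, using the path that the Spark paragraph below reads from, is `docker exec zeppelin cat /usr/zeppelin/nyc_yellow_taxi_sample_1k.csv | csv_summary -`.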
      
    5. Load data into Solr

      These next steps are copied directly from the Spark Solr example (thanks LucidWorks!). At some point this will be a notebook file that you reference at a URL and load directly into your Zeppelin!

      %spark
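      // Needed for col and concat_ws below if your interpreter does not already import them
      import org.apache.spark.sql.functions.{col, concat_ws}

      val csvFileLocation = "/usr/zeppelin/nyc_yellow_taxi_sample_1k.csv"
      var csvDF = sqlContext.read.format("com.databricks.spark.csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(csvFileLocation)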
      
      // Filter out invalid lat/lon cols
      csvDF = csvDF.filter("pickup_latitude >= -90 AND pickup_latitude <= 90 AND pickup_longitude >= -180 AND pickup_longitude <= 180")
      csvDF = csvDF.filter("dropoff_latitude >= -90 AND dropoff_latitude <= 90 AND dropoff_longitude >= -180 AND dropoff_longitude <= 180")
      
      // concat the lat/lon cols into a single value expected by solr location fields
      csvDF = csvDF.withColumn("pickup", concat_ws(",", col("pickup_latitude"),col("pickup_longitude"))).drop("pickup_latitude").drop("pickup_longitude")
      csvDF = csvDF.withColumn("dropoff", concat_ws(",", col("dropoff_latitude"),col("dropoff_longitude"))).drop("dropoff_latitude").drop("dropoff_longitude")
      
      csvDF.registerTempTable("taxi")
      
      

      Let's make a query:

      %sql select  passenger_count,count(*)  from taxi group by passenger_count
      

      Now save the data to Solr

      %spark
      val options = Map(
      "zkhost" -> "192.168.99.100:2181",
      "collection" -> "test-spark-solr",
      "gen_uniq_key" -> "true" // Generate unique key if the 'id' field does not exist
      )
      
      // Write to Solr
      csvDF.write.format("solr").options(options).mode(org.apache.spark.sql.SaveMode.Overwrite).save
      

      Now make sure the data is committed (yes, this is awkward)!

      
      %spark
      val client = new org.apache.http.impl.client.DefaultHttpClient()
      val response = client.execute(new org.apache.http.client.methods.HttpGet("http://192.168.99.100:8983/solr/test-spark-solr/update?commit=true"))
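
To confirm that the commit made the documents visible, one option is to ask Solr for a match-all query with `rows=0` and read `numFound` from the JSON response. Here is a sketch; `num_found` is a hypothetical helper that uses a simple text match rather than a real JSON parser, so it assumes the usual `"numFound":N` form in the response.

```shell
# num_found: reads a Solr JSON select response on stdin and prints the value
# of the first numFound field. Exits non-zero if the field is missing.
# Hypothetical helper; a text match, not a JSON parser.
num_found() {
  grep -o '"numFound" *: *[0-9][0-9]*' | head -n 1 | grep -o '[0-9][0-9]*$'
}
```

From the host it could be used as `curl -s 'http://192.168.99.100:8983/solr/test-spark-solr/select?q=*:*&rows=0&wt=json' | num_found`.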
      
    6. Query the data in Solr

      Okay, now you can query the data back:

      %spark
      val options = Map(
      "zkhost" -> "192.168.99.100:2181",
      "collection" -> "test-spark-solr"
      )
      
      val df = sqlContext.read.format("solr").options(options).load
      df.printSchema()
      

      This should print out the schema of the data loaded from Solr.

      Now load and cache the data (don't forget the cache or bad things happen!)

      %spark
      df.registerTempTable("trips")
      df.cache()
      

      And now query the DataFrame using SQL (run each statement in its own paragraph):

      %sql SELECT avg(tip_amount), avg(fare_amount) FROM trips
      %sql SELECT max(tip_amount), max(fare_amount) FROM trips WHERE trip_distance > 10
      