These are notes for following along with the talk I am giving at http://www.meetup.com/Washington-DC-Hortonworks-User-Group-Meetup/events/230394067/
This builds on the gist: https://gist.github.com/epugh/5729071c3b8aab81636d422c391aa716, but is meant to be stand-alone!
- This gist uses not the latest version of Zeppelin, but the latest stable version. Replace the IP address `192.168.99.101` with your Docker machine IP. Get it by running `docker-machine ip`.
- Fire up Zeppelin plus a Spark master and a Spark worker via:

docker run -d --name zeppelin -p 8080:8080 dylanmei/zeppelin:0.6.0-stable
- Exercise 1: Let's treat Solr as a relational database!
  Walk through connecting Zeppelin to Solr via JDBC using a streaming connection.
- Environment Configuration
Background info on setting up a SolrCloud cluster (required for streaming) is at https://github.com/docker-solr/docker-solr/blob/master/Docker-FAQ.md#can-i-run-zookeeper-and-solr-clusters-under-docker
docker run --name zookeeper -d -p 2181:2181 -p 2888:2888 -p 3888:3888 jplock/zookeeper

echo stat | nc 192.168.99.101 2181

docker run --name solr1 --link zookeeper:ZK -d -p 8983:8983 \
  solr:6 \
  bash -c '/opt/solr/bin/solr start -f -z $ZK_PORT_2181_TCP_ADDR:$ZK_PORT_2181_TCP_PORT'

docker run --name solr2 --link zookeeper:ZK -d -p 8984:8983 \
  solr:6 \
  bash -c '/opt/solr/bin/solr start -f -z $ZK_PORT_2181_TCP_ADDR:$ZK_PORT_2181_TCP_PORT'

docker exec -i -t solr1 /opt/solr/bin/solr create_collection \
  -c techproducts -shards 2 -p 8983
docker exec -it --user=solr solr1 bash -c "/opt/solr/bin/post -c techproducts /opt/solr/example/exampledocs/*.xml"
curl --data-urlencode 'stmt=SELECT manu_id_s, count(1) FROM techproducts GROUP BY manu_id_s' "http://localhost:8983/solr/techproducts/sql?aggregationMode=facet"
- Set up the Solr JDBC connection in Zeppelin. You need to grab the internal IP address of the ZooKeeper node:
docker exec -it --user solr solr1 bash
# then, inside the container:
echo $ZK_PORT_2181_TCP_ADDR
Add the solr-solrj dependency and these properties to the %jdbc interpreter:

```
org.apache.solr:solr-solrj:6.3.0

techproducts.driver=org.apache.solr.client.solrj.io.sql.DriverImpl
techproducts.url=jdbc:solr://172.17.0.3:2181?collection=techproducts
techproducts.user=
techproducts.password=
```

1. Make a query:

   ```
   %jdbc(techproducts)
   SELECT manu_id_s, count(*) FROM techproducts GROUP BY manu_id_s
   ```

1. Play with the settings.
1. Change the sort orders.
1. Flip the count and the `manu_id_s` in the keys.
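If you want to sanity-check the JDBC connection outside of Zeppelin, here is a minimal Scala sketch of what the %jdbc interpreter is doing under the hood. It reuses the driver class and ZooKeeper address from the config above; the column-index accessors are an assumption, so check the Solr JDBC docs for your version.

```
import java.sql.DriverManager

// Same driver class and connection string as the Zeppelin interpreter config above.
Class.forName("org.apache.solr.client.solrj.io.sql.DriverImpl")
val conn = DriverManager.getConnection("jdbc:solr://172.17.0.3:2181?collection=techproducts")
try {
  val stmt = conn.createStatement()
  val rs = stmt.executeQuery("SELECT manu_id_s, count(*) FROM techproducts GROUP BY manu_id_s")
  while (rs.next()) {
    // Column-index access is an assumption; labels may differ by Solr version.
    println(rs.getString(1) + " -> " + rs.getString(2))
  }
  rs.close()
  stmt.close()
} finally {
  conn.close()
}
```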
- Exercise 2: So Zeppelin is just a query UI??
  Walk through why Zeppelin is more than just Tableau, but only if you use Spark!
- Environment Configuration
We're using the LucidWorks Spark Solr integration https://github.com/lucidworks/spark-solr and walking through their NYC Yellow Cab example: https://github.com/lucidworks/spark-solr/blob/master/docs/examples/csv.adoc
- Set up the Spark Solr connection in the Zeppelin Spark interpreter
You can provide the Spark Solr jar via the same interpreter mechanism as you did the Solr JDBC library:
```
com.lucidworks.spark:spark-solr:2.0.1
```
If you get an error, restart the Spark interpreter and then re-run the dependency load; adding dependencies has to happen before any Spark jobs are run!
```
docker exec -it zeppelin wget https://repo1.maven.org/maven2/com/lucidworks/spark/spark-solr/2.0.1/spark-solr-2.0.1-shaded.jar

# If you have a local copy then you can do this to upload it!:
# docker cp /Users/epugh/Documents/projects/zeppelin/spark-solr-2.0.1-shaded.jar zeppelin:/zeppelin/
```

```
%dep
z.reset()
z.addRepo("Spark Packages Repo").url("http://dl.bintray.com/spark-packages/maven")
z.load("com.databricks:spark-csv_2.10:1.4.0")
z.load("/zeppelin/spark-solr-2.0.1-shaded.jar")
```
- Create a Solr collection to put data into
curl -X GET "http://192.168.99.101:8983/solr/admin/collections?action=create&name=test-spark-solr&collection.configName=techproducts&numShards=2&maxShardsPerNode=2"
- Grab the data file onto the Zeppelin container. Yes, this should be a web request. Don't touch the local file system!
docker exec -it zeppelin wget https://raw.githubusercontent.com/lucidworks/spark-solr/master/src/test/resources/test-data/nyc_yellow_taxi_sample_1k.csv
- Load data into Solr
These next steps are directly copied from the Spark Solr example (thanks, LucidWorks!). At some point this will be a notebook file that you reference at a URL and load directly into your Zeppelin!
%spark
// Needed for col() and concat_ws() below
import org.apache.spark.sql.functions._

val csvFileLocation = "/zeppelin/nyc_yellow_taxi_sample_1k.csv"
var csvDF = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(csvFileLocation)

// Filter out invalid lat/lon cols
csvDF = csvDF.filter("pickup_latitude >= -90 AND pickup_latitude <= 90 AND pickup_longitude >= -180 AND pickup_longitude <= 180")
csvDF = csvDF.filter("dropoff_latitude >= -90 AND dropoff_latitude <= 90 AND dropoff_longitude >= -180 AND dropoff_longitude <= 180")

// concat the lat/lon cols into a single value expected by solr location fields
csvDF = csvDF.withColumn("pickup", concat_ws(",", col("pickup_latitude"), col("pickup_longitude"))).drop("pickup_latitude").drop("pickup_longitude")
csvDF = csvDF.withColumn("dropoff", concat_ws(",", col("dropoff_latitude"), col("dropoff_longitude"))).drop("dropoff_latitude").drop("dropoff_longitude")

csvDF.registerTempTable("taxi")
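Before writing anything to Solr, it can help to sanity-check what Spark actually parsed. This is plain Spark API and not part of the original LucidWorks walkthrough:

```
%spark
// Quick sanity check of the parsed CSV: schema and row count after the lat/lon filters.
csvDF.printSchema()
println("Rows after filtering: " + csvDF.count())
```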
Let's make a query:
%sql
select passenger_count, count(*) from taxi group by passenger_count
Now save the data to Solr
%spark
val options = Map(
  "zkhost" -> "192.168.99.101:2181",
  "collection" -> "test-spark-solr",
  "gen_uniq_key" -> "true" // Generate unique key if the 'id' field does not exist
)

// Write to Solr
csvDF.write.format("solr").options(options).mode(org.apache.spark.sql.SaveMode.Overwrite).save
Now make sure the data is committed (yes, this is awkward)!
%spark
val client = new org.apache.http.impl.client.DefaultHttpClient()
val response = client.execute(new org.apache.http.client.methods.HttpGet("http://192.168.99.101:8983/solr/test-spark-solr/update?commit=true"))
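As an alternative to the manual commit request, spark-solr exposes write options for committing; if I remember correctly there is a `commit_within` option (milliseconds), but treat the option name as an assumption and check the spark-solr docs for your version. A sketch:

```
%spark
// Hypothetical alternative: let spark-solr commit within N milliseconds of the write.
// "commit_within" is assumed from memory of the spark-solr docs -- verify before relying on it.
val optionsWithCommit = Map(
  "zkhost" -> "192.168.99.101:2181",
  "collection" -> "test-spark-solr",
  "gen_uniq_key" -> "true",
  "commit_within" -> "5000"
)
csvDF.write.format("solr").options(optionsWithCommit).mode(org.apache.spark.sql.SaveMode.Overwrite).save
```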
- Query the data in Solr
Okay, now you can query the data back:
%spark
val options = Map(
  "zkHost" -> "192.168.99.101:2181",
  "collection" -> "test-spark-solr"
)

val taxiDF = sqlContext.read.format("solr").options(options).load
taxiDF.printSchema()
This should print out the schema of the data loaded from Solr.
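You don't have to pull the whole collection back, either. spark-solr supports push-down read options; `query` and `fields` below are my recollection of the option names, so treat them as assumptions and verify against the spark-solr documentation:

```
%spark
// Assumed spark-solr read options: "query" pushes a Solr query down to the collection,
// "fields" limits the columns that come back. Verify the names for your version.
val filteredOptions = Map(
  "zkHost" -> "192.168.99.101:2181",
  "collection" -> "test-spark-solr",
  "query" -> "passenger_count:[2 TO *]",
  "fields" -> "passenger_count,trip_distance,fare_amount,tip_amount"
)
val multiPassengerDF = sqlContext.read.format("solr").options(filteredOptions).load
multiPassengerDF.printSchema()
```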
Now register the table and cache the data (don't forget the cache or bad things happen!)
%spark
taxiDF.registerTempTable("trips")
taxiDF.cache()
And now query the DataFrame using SQL:
%sql
SELECT avg(tip_amount), avg(fare_amount) FROM trips

%sql
SELECT max(tip_amount), max(fare_amount) FROM trips WHERE trip_distance > 10
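The same aggregations can also be written against the cached DataFrame directly, without %sql; this is standard Spark SQL API:

```
%spark
import org.apache.spark.sql.functions.{avg, max}

// Same questions as the %sql paragraphs above, expressed with the DataFrame API.
taxiDF.agg(avg("tip_amount"), avg("fare_amount")).show()
taxiDF.filter("trip_distance > 10").agg(max("tip_amount"), max("fare_amount")).show()
```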