@hardkap
Created January 17, 2017 06:15
Spark Streaming with Kafka, Solr and Indexed RDD.
name := "kafkatest"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "2.0.1" % "provided",
  "org.apache.spark" %% "spark-sql" % "2.0.1",
  "org.apache.spark" %% "spark-mllib" % "2.0.1",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.1"
)
// META-INF discarding (requires the sbt-assembly plugin)
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}
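The merge strategy above is only used when building a fat jar with sbt-assembly; a minimal project/plugins.sbt along the following lines would pull the plugin in (the version shown is an assumption; use whichever release matches your sbt installation):

// project/plugins.sbt (only needed if you build a fat jar with `sbt assembly`)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")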
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD
import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD._
import org.apache.log4j.Logger
import org.apache.log4j.Level
/** KafkaIndexed - Find the bigrams from Log data coming through Kafka broker */
object KafkaIndexed {

  def main(args: Array[String]): Unit = {

    // Suppress some of the log messages for seeing test results easily
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    // Setting up the Spark streaming context
    val sparkConf = new SparkConf().setAppName("KafkaIndexed").setMaster("local[*]")
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val ssc = new StreamingContext(sc, Seconds(1))

    // Read the Solr data and cache it as an IndexedRDD (https://github.com/amplab/spark-indexedrdd)
    val options = Map("zkHost" -> "localhost:2181", "collection" -> "biwords", "fields" -> "biwords,id")
    val rdd_biwords = sqlContext.read.format("solr").options(options).load().rdd.map(r => (r.getString(0), r.getString(1)))
    val indexed = IndexedRDD(rdd_biwords).cache()

    // Set up the Kafka streaming receiver
    val topicMap = "test".split(",").map((_, 1)).toMap // hardcoding the Kafka topic for testing
    val lines = KafkaUtils.createStream(ssc, "localhost:2182", "group1", topicMap).map(_._2)

    // Loop through the data received from the input stream
    lines.foreachRDD(rdd => if (!rdd.partitions.isEmpty) {
      val nrdd = rdd.map {
        // Split each line into substrings by periods
        _.split('.').map { substrings =>
          // Trim substrings and then tokenize on spaces
          substrings.trim.split(' ').
            // Remove non-alphanumeric characters and convert to lowercase
            map { _.replaceAll("""\W""", "").toLowerCase() }.
            // Find bigrams
            sliding(2)
        }.
          // Flatten, and map the bigrams to concatenated strings
          flatMap { identity }.map { _.mkString(" ") }.
          // Group the bigrams and count their frequency within the line
          groupBy { identity }.mapValues { _.size }
      }.
        // Reduce to get a global count per bigram across the batch
        flatMap { identity }.reduceByKey(_ + _)
      // Keep only the bigrams present in the Solr collection and print them with their counts
      nrdd.join(indexed).map { case (a, (b, c)) => (a, b) }.foreach(println)
    })

    // Start the streaming context
    ssc.start()
    ssc.awaitTermination()
  }
}
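The join above matches all bigrams in a batch against the cached index at once. IndexedRDD's main draw over a plain pair RDD is efficient point lookups, so individual bigrams can also be checked with get/multiget. The lines below are only a sketch, written as if typed into a spark-shell with the same jars on the classpath and the imports above in scope; the sample pairs are made up:

// Sketch: IndexedRDD point lookups (assumes an active SparkContext `sc` and the IndexedRDD imports above)
val demo = IndexedRDD(sc.parallelize(Seq("hello there" -> "1", "how are" -> "2")))
demo.get("hello there")                         // => Some("1")
demo.multiget(Array("hello there", "no match")) // returns a Map with only the keys that were found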
/**
To Run a test:
==============
1 - Download and install dependencies:
a) Download the following jar
https://github.com/ankurdave/maven-repo/blob/master/com/ankurdave/part_2.10/0.2/part_2.10-0.2.jar
b) Download the IndexedRdd jar
http://dl.bintray.com/spark-packages/maven/amplab/spark-indexedrdd/0.4.0/spark-indexedrdd-0.4.0.jar
c) Download and build spark-solr to create the shaded jar (due to dependency issues)
https://github.com/lucidworks/spark-solr
2 - Build the program using the build.sbt provided
sbt package
3 - In the shell, run:
spark-submit --class KafkaIndexed \
  --jars library/spark-solr-3.0.1-SNAPSHOT-shaded.jar,library/amplab_spark-indexedrdd-0.4.0.jar,library/spark-streaming-kafka-0-8-assembly_2.11-2.0.1.jar,library/part_2.10-0.2.jar \
  target/scala-2.11/kafkatest_2.11-1.0.jar
(note that the --jars list is a single comma-separated argument with no spaces)
4 - In another shell, run the following (assuming Kafka is installed, $KAFKA_HOME is set, and the topic 'test' has already been created):
$KAFKA_HOME/bin/kafka-console-producer.sh --topic=test --broker-list=172.17.7.7:32771,172.17.7.7:32772
5 - In the Kafka producer prompt, type text like below:
hello there how are you doing today
write something that does not match any biwords in the solr db
6 - View the outputs in the first shell where the Spark streaming application is running
Assumptions:
============
1 - SolrCloud is set up and running, connected to ZooKeeper on port 2181
2 - Solr has a collection called 'biwords' with 2 fields (id and biwords).
3 - In the Solr collection, 'multiValued' should be set to false for the biwords field.
4 - The Kafka brokers are running, connected to a separate ZooKeeper on port 2182
5 - The Kafka topic 'test' has been created
*/
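The assumptions above expect the 'biwords' collection to be populated before the streaming job starts. One way to seed it is through spark-solr's DataFrame writer; the snippet below is a sketch only (the sample bigrams, app name and option keys are assumptions based on the spark-solr docs, and depending on the Solr config an explicit commit may still be needed before the documents become visible):

// Sketch: seed the 'biwords' collection using the spark-solr DataFrame writer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SeedBiwords {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SeedBiwords").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Sample bigrams; replace with the real list to match against
    val seed = Seq(("1", "hello there"), ("2", "there how"), ("3", "how are")).toDF("id", "biwords")
    seed.write
      .format("solr")
      .options(Map("zkhost" -> "localhost:2181", "collection" -> "biwords"))
      .save()

    sc.stop()
  }
}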