Created January 17, 2017 06:15
Spark Streaming with Kafka, Solr and Indexed RDD.
name := "kafkatest"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.0.1" % "provided",
"org.apache.spark" %% "spark-streaming" % "2.0.1" % "provided",
"org.apache.spark" %% "spark-sql" % "2.0.1",
"org.apache.spark" %% "spark-mllib" % "2.0.1",
"org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.1"
)
// META-INF discarding
mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
  }
}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD
import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD._
import org.apache.log4j.Logger
import org.apache.log4j.Level
/** KafkaIndexed - Find the bigrams from Log data coming through Kafka broker */
object KafkaIndexed {
  def main(args: Array[String]): Unit = {
    // Suppress some of the log messages for seeing test results easily
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    // Setting up the spark streaming context
    val sparkConf = new SparkConf().setAppName("KafkaIndexed").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val ssc = new StreamingContext(sc, Seconds(1))

    // Read the Solr data and cache it as an IndexedRDD of (biword, id) pairs
    val options = Map("zkHost" -> "localhost:2181", "collection" -> "biwords", "fields" -> "biwords,id")
    val rdd_biwords = sqlContext.read.format("solr").options(options).load()
      .rdd.map(r => (r.getString(0), r.getString(1)))
    val indexed = IndexedRDD(rdd_biwords).cache()

    // Set up Kafka streaming receiver
    val topicMap = "test".split(",").map((_, 1)).toMap // hardcoding the kafka topic for testing
    val lines = KafkaUtils.createStream(ssc, "localhost:2182", "group1", topicMap).map(_._2)

    // Looping through the data received from the input stream
    lines.foreachRDD(rdd => if (!rdd.partitions.isEmpty) {
      val nrdd = rdd.map {
        // Split each line into substrings by periods
        _.split('.').map { substrings =>
          // Trim substrings and then tokenize on spaces
          substrings.trim.split(' ').
            // Remove non-alphanumeric characters and convert to lowercase
            map { _.replaceAll("""\W""", "").toLowerCase() }.
            // Find bigrams
            sliding(2)
        }.
          // Flatten, and map the bigrams to concatenated strings
          flatMap { identity }.map { _.mkString(" ") }.
          // Group the bigrams and count their frequency
          groupBy { identity }.mapValues { _.size }
      }.
        // Reduce to get a global count per batch
        flatMap { identity }.reduceByKey(_ + _)

      // Keep only bigrams present in Solr and print (bigram, count)
      nrdd.join(indexed).map { case (a, (b, c)) => (a, b) }.foreach(println)
    })

    // Start the streaming context
    ssc.start()
    ssc.awaitTermination()
  }
}
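To check the bigram logic without Kafka or Solr running, the per-batch computation can be mirrored on plain Scala collections. This is a minimal sketch, assuming a local `Seq[String]` stands in for each batch's `RDD[String]` and that `groupBy` plus `size` stands in for `reduceByKey`; the object name `BigramSketch` is hypothetical.

```scala
// Hypothetical, Spark-free sketch of the per-batch bigram counting.
// A local Seq[String] stands in for the RDD[String] of one micro-batch.
object BigramSketch {
  // Same steps as the streaming job: split on periods, trim and tokenize on
  // spaces, strip non-word characters, lowercase, then slide a window of two tokens.
  def bigrams(line: String): Seq[String] =
    line.split('.').toSeq.flatMap { sentence =>
      sentence.trim.split(' ')
        .map(_.replaceAll("""\W""", "").toLowerCase())
        .sliding(2)
        .map(_.mkString(" "))
    }

  // Count every bigram across the batch (the role played by reduceByKey(_ + _)).
  def countBigrams(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(bigrams).groupBy(identity).map { case (bigram, occurrences) => (bigram, occurrences.size) }

  def main(args: Array[String]): Unit = {
    val batch = Seq("hello there how are you doing today", "Hello there")
    countBigrams(batch).toSeq.sortBy(_._1).foreach(println)
  }
}
```

Note that `sliding(2)` on a one-word sentence yields a single one-word window, so such sentences contribute a one-word "bigram"; the streaming job inherits the same behavior.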
To Run a test:
1 - Download and install dependencies:
a) Download the following jar
b) Download the IndexedRDD jar
c) Download and build spark-solr from source to create the shaded jar (needed because of dependency issues)
2 - Build the program using the build.sbt provided
sbt package
3 - In the shell, run:
spark-submit --class KafkaIndexed \
--jars library/spark-solr-3.0.1-SNAPSHOT-shaded.jar,\
library/amplab_spark-indexedrdd-0.4.0.jar,\
library/spark-streaming-kafka-0-8-assembly_2.11-2.0.1.jar,\
library/part_2.10-0.2.jar \
target/scala-2.11/kafkatest_2.11-1.0.jar
4 - In another shell, run the following (assuming Kafka is installed, $KAFKA_HOME is set, and the topic 'test' has already been created):
$KAFKA_HOME/bin/kafka-console-producer.sh --topic=test --broker-list=,
5 - In the Kafka producer prompt, type text like below:
hello there how are you doing today
write something that does not match any biwords in the solr db
6 - View the output (matched bigrams and their counts) in the first shell, where the Spark Streaming application is running
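The output in step 6 contains only bigrams that also appear in the Solr 'biwords' collection, because `nrdd.join(indexed)` is an inner join keyed on the bigram. A minimal sketch of those semantics on local Maps, assuming a `Map[String, String]` (biword -> id) stands in for the cached IndexedRDD; `JoinSketch` and the ids are hypothetical:

```scala
// Hypothetical, Spark-free sketch of nrdd.join(indexed).map { case (a, (b, c)) => (a, b) }.
// Local Maps stand in for one batch of bigram counts and for the Solr-backed IndexedRDD.
object JoinSketch {
  // Inner join on the bigram key: bigrams missing from Solr are dropped,
  // matches carry both the batch count and the Solr id.
  def innerJoin(counts: Map[String, Int], solr: Map[String, String]): Map[String, (Int, String)] =
    counts.collect { case (bigram, count) if solr.contains(bigram) => (bigram, (count, solr(bigram))) }

  // Drop the Solr id and keep (bigram, count), as the streaming job does before printing.
  def matchedBigrams(counts: Map[String, Int], solr: Map[String, String]): Map[String, Int] =
    innerJoin(counts, solr).map { case (bigram, (count, _)) => (bigram, count) }

  def main(args: Array[String]): Unit = {
    val counts = Map("hello there" -> 2, "how are" -> 1, "purple elephant" -> 1)
    val solr = Map("hello there" -> "id-1", "how are" -> "id-2") // made-up (biword -> id) rows
    matchedBigrams(counts, solr).foreach(println)
  }
}
```

A producer line whose bigrams are all absent from Solr therefore prints nothing in its batch.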
Assumptions:
1 - SolrCloud is set up and running, connected to ZooKeeper on port 2181
2 - Solr has a collection called 'biwords' with two fields (id and biwords).
3 - In the Solr collection schema, multiValued must be set to false for the biwords field (the job reads it with getString).
4 - Kafka brokers are running, connected to a separate ZooKeeper instance on port 2182
5 - Kafka topic 'test' is created
