Created January 13, 2017 06:01
name := "kafkatest"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "2.0.1" % "provided",
  "org.apache.spark" %% "spark-sql" % "2.0.1",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.1"
)
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.log4j.Logger
import org.apache.log4j.Level
/** KafkaThree - Spark Streaming App to find the bigrams from Log data coming through Kafka broker */
object KafkaThree {
  def main(args: Array[String]): Unit = {
    // Suppress most log messages so the test results are easy to see
    // (logger names and level are assumed; adjust as needed)
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    // Setting up the spark streaming context
    val sparkConf = new SparkConf().setAppName("KafkaThree")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(1))

    // Broadcasting the solr data
    val bcSolr = sc.broadcast(Array("hello there", "there goes", "howdy buddy"))

    // Setting up Kafka streaming receiver
    val topicMap = "test".split(",").map((_, 1)).toMap // hardcoding the kafka topic for testing
    val lines = KafkaUtils.createStream(ssc, "localhost:2182", "group1", topicMap).map(_._2)

    // Looping through the data received from the input stream
    lines.foreachRDD(rdd => if (!rdd.partitions.isEmpty) {
      val nrdd = rdd.
        map {
          // Split each line into substrings by periods
          _.split('.').map { substrings =>
            // Trim substrings and then tokenize on spaces
            substrings.trim.split(' ').
              // Remove non-alphanumeric characters and convert to lowercase
              map { _.replaceAll("""\W""", "").toLowerCase() }.
              // Find bigrams
              sliding(2)
          }.
            // Flatten, and map the bigrams to concatenated strings
            flatMap { identity }.map { _.mkString(" ") }.
            // Group the bigrams and count their frequency
            groupBy { identity }.mapValues { _.size }
        }.
        // Reduce to get a global count, then collect
        flatMap { identity }.reduceByKey(_ + _).collect

      // Filter by what matches the Solr data and print the rest
      nrdd.filter(x => !(bcSolr.value contains x._1)).foreach(println)
    })

    // Start the streaming context and keep the application running
    ssc.start()
    ssc.awaitTermination()
  }
}
To Run a test:
1 - First, compile the program (build.sbt is included in this gist):
sbt package
2 - In a shell in the project folder, run:
spark-submit --class KafkaThree --jars library/spark-streaming-kafka-0-8-assembly_2.11-2.0.1.jar target/scala-2.11/kafkatest_2.11-1.0.jar
3 - In another shell, run the following (assuming Kafka is installed, $KAFKA_HOME is set, and the topic 'test' already exists):
$KAFKA_HOME/bin/ --topic=test --broker-list=localhost:32770,localhost:32771
4 - In the Kafka producer prompt, type text like below:
hello there goes howdy buddy
hello there goes howdy buddy goes howdy
5 - View the outputs in the first shell where the Spark streaming application is running
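To see what output to expect for the sample lines in step 4, the per-batch logic can be checked without Spark or Kafka. The following is a minimal sketch on plain Scala collections, not the streaming job itself; `BigramSketch`, `bigramCounts`, and `unknownBigrams` are illustrative names that do not appear in the gist.

```scala
object BigramSketch {
  // Same steps as the streaming job, applied to an in-memory batch of lines.
  def bigramCounts(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split('.').toSeq)                       // split each line on periods
      .flatMap { sentence =>
        sentence.trim.split(' ')                         // tokenize on spaces
          .map(_.replaceAll("""\W""", "").toLowerCase)   // strip non-word chars, lowercase
          .sliding(2)                                    // adjacent word pairs
          .map(_.mkString(" "))                          // join each pair into one string
      }
      .groupBy(identity)
      .map { case (bigram, hits) => bigram -> hits.size }

  // Drop the bigrams that are already known (the role bcSolr plays in the job).
  def unknownBigrams(lines: Seq[String], known: Set[String]): Map[String, Int] =
    bigramCounts(lines).filter { case (bigram, _) => !known.contains(bigram) }
}
```

With the known set "hello there", "there goes", "howdy buddy", the first sample line alone leaves only ("goes howdy", 1), and the second alone leaves ("goes howdy", 2) and ("buddy goes", 1). With a 1-second batch interval, lines typed by hand will usually land in separate batches, so the counts are per line rather than cumulative.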
Note: the original requirement was to populate bcSolr by connecting to Solr.
This program instead uses a plain broadcast variable holding a hardcoded array, and does not query Solr.
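One way to leave room for a Solr lookup later is to hide the origin of the known bigrams behind a small interface. The sketch below is only an assumption about how that could be structured: `KnownBigrams`, `Source`, `Hardcoded`, and `unknown` are hypothetical names, and no Solr client code is shown.

```scala
object KnownBigrams {
  // Hypothetical abstraction: anything that can supply the known bigrams.
  trait Source {
    def load(): Set[String]
  }

  // Mirrors what the gist does today: a fixed in-memory list.
  object Hardcoded extends Source {
    def load(): Set[String] = Set("hello there", "there goes", "howdy buddy")
  }

  // Keep only the counted bigrams that the source does not already know.
  def unknown(counts: Seq[(String, Int)], source: Source): Seq[(String, Int)] = {
    val known = source.load() // load once per call, not once per element
    counts.filterNot { case (bigram, _) => known.contains(bigram) }
  }
}
```

In the streaming job, the result of `load()` would be what gets passed to `sc.broadcast`. A Solr-backed `Source` would implement `load()` by querying Solr, and the rest of the job would stay unchanged.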
