@hardkap
Created January 13, 2017 06:01
// build.sbt
name := "kafkatest"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "2.0.1" % "provided",
  "org.apache.spark" %% "spark-sql" % "2.0.1",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.1"
)
// KafkaThree.scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.log4j.{Level, Logger}

/** KafkaThree - Spark Streaming app that finds bigrams in log data arriving through a Kafka broker. */
object KafkaThree {
  def main(args: Array[String]): Unit = {
    // Suppress most log messages so the test output is easier to read
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    // Set up the Spark streaming context with 1-second batches
    val sparkConf = new SparkConf().setAppName("KafkaThree")
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(1))

    // Broadcast the Solr phrase list (hard-coded here; see the notes at the end)
    val bcSolr = sc.broadcast(Array("hello there", "there goes", "howdy buddy"))

    // Set up the Kafka streaming receiver (ZooKeeper quorum and topic are hard-coded for testing)
    val topicMap = "test".split(",").map((_, 1)).toMap
    val lines = KafkaUtils.createStream(ssc, "localhost:2182", "group1", topicMap).map(_._2)

    // Process each batch received from the input stream
    lines.foreachRDD(rdd => if (!rdd.partitions.isEmpty) {
      val nrdd = rdd.map {
        // Split each line into sentences on periods
        _.split('.').map { sentence =>
          // Trim each sentence, then tokenize on spaces
          sentence.trim.split(' ').
            // Remove non-alphanumeric characters and convert to lowercase
            map { _.replaceAll("""\W""", "").toLowerCase() }.
            // Find bigrams (see BigramCheck below for a standalone check of this pipeline)
            sliding(2)
        }.
          // Flatten, and map each bigram to a single space-separated string
          flatMap { identity }.map { _.mkString(" ") }.
          // Group the bigrams and count their frequency within the line
          groupBy { identity }.mapValues { _.size }
      }.
        // Reduce to get a per-batch count
        flatMap { identity }.reduceByKey(_ + _)

      // Drop the bigrams that match the Solr data and print the rest
      nrdd.filter(x => !(bcSolr.value contains x._1)).foreach(println)
    })

    // Start the streaming context
    ssc.start()
    ssc.awaitTermination()
  }
}
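/**
BigramCheck is not part of the original gist: it is a small standalone sketch that mirrors
the per-line transformation used inside foreachRDD above, so the bigram counts can be
sanity-checked in a plain Scala REPL without Spark or Kafka.
*/
object BigramCheck {
  // Same steps as in KafkaThree: split on periods, tokenize on spaces,
  // strip non-alphanumeric characters, lowercase, then count bigrams per line
  def bigramCounts(line: String): Map[String, Int] =
    line.split('.').map { sentence =>
      sentence.trim.split(' ').
        map(_.replaceAll("""\W""", "").toLowerCase()).
        sliding(2)
    }.
      flatMap(identity).map(_.mkString(" ")).
      groupBy(identity).mapValues(_.size)

  def main(args: Array[String]): Unit = {
    // "hello there goes howdy buddy" should print four bigrams, each with a count of 1
    bigramCounts("hello there goes howdy buddy").foreach(println)
  }
}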
/**
To run a test:
==============
1 - First compile the program (the build.sbt above is also part of this gist):
      sbt package
2 - From a shell in the project folder, run:
      spark-submit --class KafkaThree --jars library/spark-streaming-kafka-0-8-assembly_2.11-2.0.1.jar target/scala-2.11/kafkatest_2.11-1.0.jar
3 - In another shell, run the following (assuming Kafka is installed, $KAFKA_HOME is set, and a topic 'test' has already been created):
      $KAFKA_HOME/bin/kafka-console-producer.sh --topic=test --broker-list=localhost:32770,localhost:32771
4 - At the Kafka console producer prompt, type lines such as:
      hello there goes howdy buddy
      hello there goes howdy buddy goes howdy
5 - View the output in the first shell, where the Spark streaming application is running.
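    (Rough expectation, assuming each input line lands in its own 1-second batch: the first
    line yields the bigrams "hello there", "there goes", "goes howdy" and "howdy buddy";
    the three that appear in the broadcast list are filtered out, so the console should show
    something like (goes howdy,1). The second line should additionally show (buddy goes,1)
    and a count of 2 for "goes howdy".)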
Notes:
======
The original requirement was to build the bcSolr data by connecting to Solr, but this
program uses a hard-coded broadcast variable instead of fetching the data from Solr.
A sketch of how the Solr lookup could be wired in follows after this comment.
*/
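/**
The sketch below is not part of the original gist: it shows one way the hard-coded
broadcast above could be replaced by data fetched from Solr using SolrJ. The Solr URL,
the collection name ("phrases"), the field name ("phrase") and the row limit are all
assumptions, and "org.apache.solr" % "solr-solrj" would need to be added to build.sbt.
*/
object SolrPhrases {
  import org.apache.solr.client.solrj.SolrQuery
  import org.apache.solr.client.solrj.impl.HttpSolrClient
  import scala.collection.JavaConverters._

  // Fetch the phrase list from Solr; host, collection and field name are assumptions
  def load(solrUrl: String = "http://localhost:8983/solr/phrases",
           field: String = "phrase"): Array[String] = {
    val client = new HttpSolrClient.Builder(solrUrl).build()
    try {
      val query = new SolrQuery("*:*")
      query.setRows(1000) // assumes the phrase list is small
      client.query(query).getResults.asScala
        .map(_.getFieldValue(field).toString)
        .toArray
    } finally {
      client.close()
    }
  }
}

// In main(), the hard-coded broadcast could then become:
//   val bcSolr = sc.broadcast(SolrPhrases.load())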