Skip to content

Instantly share code, notes, and snippets.

@HeartSaVioR
Created October 15, 2018 06:52
Show Gist options
  • Save HeartSaVioR/6afe09818f9b24ec19341c8d44abf98a to your computer and use it in GitHub Desktop.
Simple Spark DStream with Kafka
package net.heartsavior.spark.trial
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
object SparkDStreamTrial {

  // Directory where Spark persists streaming checkpoint data (offsets and
  // DStream lineage) so the application can recover after a restart.
  val checkpointDirectory = "./spark-dstream-trial"

  /**
   * Builds and configures a new StreamingContext that consumes JSON records
   * from a Kafka topic and prints each micro-batch as a DataFrame.
   *
   * Used as the creation function for `StreamingContext.getOrCreate`, so it
   * must perform ALL setup (checkpointing, stream wiring) before returning.
   *
   * @param conf the SparkConf to build the context from; mutated here, so it
   *             must not yet be bound to a running SparkContext
   * @return a fully-wired, not-yet-started StreamingContext
   */
  def functionToCreateContext(conf: SparkConf): StreamingContext = {
    // FIX: SparkConf is cloned when the SparkContext is created, so this
    // setting must be applied BEFORE `new StreamingContext` — the original
    // code set it afterwards, where it silently had no effect.
    conf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")

    // 10-second micro-batch interval.
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDirectory) // set checkpoint directory

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-dstream-trial",
      "auto.offset.reset" -> "latest",
      // Offsets are tracked via Spark checkpointing, not Kafka auto-commit.
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("truck_speed_events_stream")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.map(_.value()).foreachRDD { rdd =>
      // Skip empty micro-batches: schema inference in `read.json` cannot
      // work on an empty RDD and produces errors/noisy warnings.
      if (!rdd.isEmpty()) {
        // Get the singleton SparkSession tied to this application's conf.
        val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
        // Parse each record (a JSON string) into a DataFrame.
        val dataFrame = spark.sqlContext.read.json(rdd)
        dataFrame.show()
      }
    }
    ssc
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkDStreamTrial")
    // Restore the StreamingContext from checkpoint data if present,
    // otherwise create a fresh one via functionToCreateContext.
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => functionToCreateContext(conf))
    ssc.start()
    ssc.awaitTermination()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment