Skip to content

Instantly share code, notes, and snippets.

@zoltanctoth
Last active February 6, 2017 20:23
Show Gist options
  • Save zoltanctoth/87b2dab6212ca0154cb2116e6452564d to your computer and use it in GitHub Desktop.
How to use the Direct Kafka Source in Scala with offset specification
import org.apache.spark._
import org.apache.spark.sql.Column
import org.apache.spark.streaming._
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SQLContext
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import com.github.benfradet.spark.kafka010.writer._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.kafka.common.serialization.StringSerializer
/**
 * Reads from Kafka using Spark Streaming's direct Kafka 0.10 source.
 *
 * Subscribes to "topic1" with an explicit starting offset (0) for
 * partition 0 and, for every micro-batch, prints the offset range
 * processed by each partition.
 *
 * Fixes over the original gist:
 *   - the StreamingContext was never started, so the stream was defined
 *     but nothing ever ran; `ssc.start()` / `ssc.awaitTermination()` added
 *   - explicit `main` instead of `extends App` (App's delayed-init
 *     evaluation has initialization-order pitfalls with Spark closures)
 *   - `0l` (deprecated lowercase long suffix, easily read as `01`) -> `0L`
 */
object ReadFromKafka {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[4]")
      .appName("ReadFromKafka")
      .getOrCreate()

    // 1-second micro-batches on top of the existing SparkContext.
    val ssc = new StreamingContext(spark.sparkContext, Seconds(1))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "key.serializer" -> classOf[StringSerializer].getName,
      "value.serializer" -> classOf[StringSerializer].getName,
      "group.id" -> "example",
      // Only consulted for partitions with no explicit offset below.
      "auto.offset.reset" -> "latest",
      // Offsets are inspected manually via HasOffsetRanges; no auto-commit.
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("topic1")
    val partition0 = new TopicPartition("topic1", 0)

    // Explicit starting offset 0L for partition 0; any other partition of
    // the subscribed topics falls back to "auto.offset.reset".
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams, Map(partition0 -> 0L))
    )

    stream.foreachRDD { rdd =>
      // Capture the batch's offset ranges on the driver before the
      // per-partition closure is shipped to the executors.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { _ =>
        // For the Kafka direct stream, offsetRanges is indexed by the
        // RDD partition id, so each task can look up its own range.
        val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
    }

    // BUG FIX: without these two calls the streaming job never executes.
    ssc.start()
    ssc.awaitTermination()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment