@timvw
Last active September 28, 2022 23:36
Spark structured streaming: Commit source offsets to Kafka on QueryProgress
import java.util.Properties

import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent

val groupId = "demo-consumer"
val bootstrapServers = "localhost:9092"

val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("demo")
  //.config("spark.sql.streaming.stateStore.maintenanceInterval", "10s")
  //.config("spark.sql.streaming.metricsEnabled", true)
  .config("spark.sql.streaming.minBatchesToRetain", 2)
  .config("spark.sql.streaming.stateStore.minDeltasForSnapshot", 2)
  .getOrCreate()

import spark.implicits._

// A plain Kafka consumer, separate from the one Spark manages internally,
// used only to commit processed offsets under our own group id.
val props = new Properties()
props.put("group.id", groupId)
props.put("bootstrap.servers", bootstrapServers)
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("enable.auto.commit", "false")
val kafkaConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)

val listener = new CommitOffsetsOnProgressQueryListener(kafkaConsumer)
spark.streams.addListener(listener)

// Commits each micro-batch's end offsets back to Kafka whenever the query reports progress.
case class CommitOffsetsOnProgressQueryListener(kafkaConsumer: KafkaConsumer[_, _]) extends StreamingQueryListener {

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = { }

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    import scala.collection.JavaConverters._
    val offsets = event
      .getSourceOffsets()
      .mapValues(new OffsetAndMetadata(_))
    kafkaConsumer.commitSync(offsets.asJava)
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = { }
}

implicit class QueryProgressEventExt(event: QueryProgressEvent) {
  def getSourceOffsets(): Map[TopicPartition, Long] = event.progress
    .sources // assumption: sources are all kafka, all on the same cluster
    .map(_.endOffset)
    .flatMap(JsonUtilsWrapper.jsonToOffsets) // extract all offsets
    .groupBy(_._1) // take the smallest offset per topic partition
    .mapValues(_.map(_._2).min)
}
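
The JsonUtilsWrapper referenced above is not included in the gist (see the question from @Rahul0523 below). A minimal sketch, assuming it simply re-exposes the package-private JsonUtils helper that ships with Spark's Kafka connector (the names JsonUtilsWrapper and jsonToOffsets come from the gist; the delegation shown here is an assumption, so verify against your Spark version):

// Sketch only: placing this file in the org.apache.spark.sql.kafka010 package
// gives it access to the connector's package-private JsonUtils object.
package org.apache.spark.sql.kafka010

import org.apache.kafka.common.TopicPartition

object JsonUtilsWrapper {
  // Parses the endOffset JSON reported by the Kafka source,
  // e.g. {"my-topic":{"0":42,"1":17}}, into a TopicPartition -> offset map.
  def jsonToOffsets(json: String): Map[TopicPartition, Long] =
    JsonUtils.partitionOffsets(json)
}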
asafblv commented Oct 6, 2020

Hi,
Is it working for you?

Thanks

timvw commented Oct 7, 2020

It worked for me at one point, using Spark 2.3.

The key was making sure the "separate" (different group.id) Kafka consumer instance was subscribed to the topic(s); otherwise the broker would not accept the commits.
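
For illustration only, that subscription might be set up like this before the query starts (the topic name is a placeholder, and on older kafka-clients versions the poll(0L) overload may be needed instead of poll(Duration)):

// Sketch: make the committing consumer's group membership known to the broker.
// "demo-topic" stands in for the topic(s) the streaming query actually reads.
kafkaConsumer.subscribe(java.util.Collections.singletonList("demo-topic"))
// One poll so the consumer joins the group before offsets are committed.
kafkaConsumer.poll(java.time.Duration.ofMillis(0))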

@Rahul0523
Hi, could you share the JsonUtilsWrapper code?
