Spark structured streaming: Commit source offsets to Kafka on QueryProgress
import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.spark.sql.SparkSession

val groupId = "demo-consumer"
val bootstrapServers = "localhost:9092"

val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("demo")
  //.config("spark.sql.streaming.stateStore.maintenanceInterval", "10s")
  //.config("spark.sql.streaming.metricsEnabled", true)
  .config("spark.sql.streaming.minBatchesToRetain", 2)
  .config("spark.sql.streaming.stateStore.minDeltasForSnapshot", 2)
  .getOrCreate()

import spark.implicits._

// A dedicated consumer, with the group id we want the offsets committed
// under; Spark's own internal Kafka consumer uses a generated group id.
val props = new Properties()
props.put("group.id", groupId)
props.put("bootstrap.servers", bootstrapServers)
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("enable.auto.commit", "false")
val kafkaConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)

val listener = new CommitOffsetsOnProgressQueryListener(kafkaConsumer)
spark.streams.addListener(listener)
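For context, a query the listener would track could be wired up like this. The topic name, console sink, and checkpoint path are illustrative assumptions, not part of the original gist:

// Illustrative only: a Kafka-sourced query whose progress events the
// listener above will observe.
val query = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", "demo") // hypothetical topic name
  .load()
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/demo-checkpoint")
  .start()

query.awaitTermination()

Each completed micro-batch then fires onQueryProgress, which is where the commit below happens.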
import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent

case class CommitOffsetsOnProgressQueryListener(kafkaConsumer: KafkaConsumer[_, _]) extends StreamingQueryListener {

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // Turn the batch's end offsets into commit metadata and commit them
    // synchronously under this consumer's group id.
    val offsets = event
      .getSourceOffsets()
      .mapValues(new OffsetAndMetadata(_))
    kafkaConsumer.commitSync(offsets.asJava)
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent

implicit class QueryProgressEventExt(event: QueryProgressEvent) {
  // JsonUtilsWrapper is not included in this gist; see the sketch below.
  def getSourceOffsets(): Map[TopicPartition, Long] = event.progress
    .sources // assumption: all sources are Kafka, on the same cluster
    .map(_.endOffset)
    .flatMap(JsonUtilsWrapper.jsonToOffsets) // extract all (partition, offset) pairs
    .groupBy(_._1)
    // if several sources read the same topic, keep the smallest end offset
    // per partition so no data is skipped after a restart
    .mapValues(_.map(_._2).min)
}
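The referenced JsonUtilsWrapper is not part of the gist. A minimal sketch of what it likely does, assuming it piggybacks on the package-private org.apache.spark.sql.kafka010.JsonUtils object that ships with the spark-sql-kafka-0-10 module; the object name and package placement here are assumptions:

// Sketch, not code from the original gist. Declaring this object inside
// Spark's own package grants access to the package-private JsonUtils.
package org.apache.spark.sql.kafka010

import org.apache.kafka.common.TopicPartition

object JsonUtilsWrapper {
  // Parse Spark's Kafka offset JSON, e.g. {"demo":{"0":42,"1":17}},
  // into a map of partition -> end offset.
  def jsonToOffsets(json: String): Map[TopicPartition, Long] =
    JsonUtils.partitionOffsets(json)
}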
It once worked for me using Spark 2.3.
The clue was making sure the separate (different group id) Kafka consumer instance was subscribed to the topic(s); otherwise the broker would not accept the commits.
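A minimal sketch of that subscription step, assuming a topic named "demo"; depending on the client version, the consumer may also need to join the group via an initial poll before the broker accepts commits:

import java.util.Arrays

// Subscribe the committing consumer before the query starts; without a
// subscription the broker rejected the commits.
kafkaConsumer.subscribe(Arrays.asList("demo"))
// Joining the group with an initial poll may also be required.
kafkaConsumer.poll(0)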
Hi, could you share the JsonUtilsWrapper code?
Hi, is this working for you? Thanks.