Skip to content

Instantly share code, notes, and snippets.

@RonBarabash
Last active December 19, 2022 11:19
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save RonBarabash/ef85efdfc278b37793749278f04ae59f to your computer and use it in GitHub Desktop.
Save RonBarabash/ef85efdfc278b37793749278f04ae59f to your computer and use it in GitHub Desktop.
Measuring consumer lag in Spark Structured Streaming
import java.util
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.yotpo.metorikku.exceptions.MetorikkuException
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
/**
 * Streaming query listener that, on every progress event, commits the end offsets
 * reported by the query's sources for one topic through the supplied Kafka consumer.
 * Committing them makes the query's position visible as ordinary committed consumer
 * offsets, which is what consumer-lag measurements are based on.
 *
 * @param kafkaConsumer consumer used to commit the offsets (via `commitSync`)
 * @param topic         only offsets reported under this topic name are committed
 */
class KafkaLagWriter(kafkaConsumer: KafkaConsumer[String, String], topic: String) extends StreamingQueryListener {
  private val consumer = kafkaConsumer

  // Configured once and reused for every progress event instead of being rebuilt each time.
  private val offsetsMapper: ObjectMapper = {
    val mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.registerModule(DefaultScalaModule)
    mapper
  }

  /** No-op: nothing to commit when a query starts. */
  def onQueryStarted(event: QueryStartedEvent): Unit = {
  }

  /** No-op: nothing to commit when a query terminates. */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
  }

  /**
   * Parses each source's `endOffset` JSON (expected shape: topic -> partition -> offset)
   * and synchronously commits the offsets found for the configured topic.
   *
   * Sources that do not mention the configured topic are ignored.
   *
   * @param event progress event whose `progress.sources` carry the end offsets
   */
  def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.sources.foreach { source =>
      // Guard against a null endOffset, which would otherwise make readValue fail.
      Option(source.endOffset).foreach { endOffsetJson =>
        // The type arguments are erased from classOf, so Jackson chooses the boxed numeric
        // type (Integer, Long, ...) by magnitude. Reading the values as Number and calling
        // longValue() keeps offsets above Int.MaxValue from failing with a ClassCastException.
        val jsonOffsets =
          offsetsMapper.readValue(endOffsetJson, classOf[Map[String, Map[String, java.lang.Number]]])

        jsonOffsets.get(topic).foreach { partitionOffsets =>
          val topicPartitionMap = new util.HashMap[TopicPartition, OffsetAndMetadata]()
          partitionOffsets.keys.foreach { partition =>
            val tp = new TopicPartition(topic, partition.toInt)
            val oam = new OffsetAndMetadata(partitionOffsets(partition).longValue())
            topicPartitionMap.put(tp, oam)
          }
          consumer.commitSync(topicPartitionMap)
        }
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment