Skip to content

Instantly share code, notes, and snippets.

@CharlesAHunt
Last active June 29, 2018 17:49
Show Gist options
  • Save CharlesAHunt/bd1760671f97c4c4511f76572846d754 to your computer and use it in GitHub Desktop.
Save CharlesAHunt/bd1760671f97c4c4511f76572846d754 to your computer and use it in GitHub Desktop.
Kafka Consumer Get Latest 10 messages Example
import collection.JavaConverters._
/**
  * Example: read the most recent `offsetOnPoll` messages from a single partition of a Kafka topic.
  *
  * The consumer is manually assigned to `topicPartition`, positioned `offsetOnPoll` records
  * before the partition's end offset (never earlier than the first available offset), polled
  * once, and then closed.
  */
object KafkaConsumer {

  /** Name of the topic to read from (placeholder value). */
  val topicName: String = "yourTopicName"

  /** Index of the partition to read from; Kafka partition indices are zero-based. */
  val partitionNumber: Int = 1

  /** How many records before the end of the partition the consumer is positioned at. */
  val offsetOnPoll: Int = 10

  /** The single topic partition this example reads from. */
  val topicPartition: TopicPartition = new TopicPartition(topicName, partitionNumber)

  /**
    * Connects to the broker, fetches the tail of `topicPartition` and iterates over the records.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.put("bootstrap.servers", "localhost:9092")
    properties.put("group.id", "consumer-group-1")
    properties.put("key.deserializer", classOf[StringDeserializer])
    properties.put("value.deserializer", classOf[StringDeserializer])

    val kafkaConsumer = new KafkaConsumer[String, String](properties)
    try {
      val partitions = List(topicPartition).asJava

      // seek() is only legal on a partition that is currently assigned to this consumer.
      // subscribe() assigns partitions lazily during a later poll(), so seeking right after it
      // fails with IllegalStateException; assign() makes the assignment effective immediately.
      kafkaConsumer.assign(partitions)

      // Look the offsets up by key instead of relying on the first entry of the returned map.
      val endOffset = kafkaConsumer.endOffsets(partitions).get(topicPartition).longValue
      val beginningOffset = kafkaConsumer.beginningOffsets(partitions).get(topicPartition).longValue

      // Clamp the start position: with fewer than `offsetOnPoll` records available the plain
      // subtraction would go negative (rejected by seek) or point below the first retained offset.
      val startOffset = math.max(beginningOffset, endOffset - offsetOnPoll)
      kafkaConsumer.seek(topicPartition, startOffset)

      val results = kafkaConsumer.poll(3000).asScala
      // iterate over the last `offsetOnPoll` number of results
      for (consumerRecord <- results) {
        val valueOf = consumerRecord.value()
        // Do your stuff here
      }
    } finally {
      // Always release the consumer's network connections and buffers, even on failure.
      kafkaConsumer.close()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment