@ckozak
Last active October 4, 2016 02:41
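import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random

// Asynchronously fetches the latest offset of a single topic partition.
// The implicit ExecutionContext is what Future(...) and map run on.
def lastOffset(consumer: SimpleConsumer, topic: String, partitionId: Int)
              (implicit ec: ExecutionContext): Future[Long] = {
  val topicAndPartition = new TopicAndPartition(topic, partitionId)
  // Request at most one offset, counting back from the latest.
  val requestInfo = Map(
    topicAndPartition -> new PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)
  )
  val request = new OffsetRequest(requestInfo, correlationId = Random.nextInt())
  // getOffsetsBefore blocks on network I/O, so it is wrapped in a Future.
  // Note: offsets.head throws if the broker returns no offsets for the partition.
  Future(consumer.getOffsetsBefore(request)).map { response =>
    response.offsetsGroupedByTopic(topic)(topicAndPartition).offsets.head
  }
}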