Skip to content

Instantly share code, notes, and snippets.

@natewave
Last active May 30, 2017 12:00
Show Gist options
  • Save natewave/9b9ce4c288ecb6615a99c6da74367bad to your computer and use it in GitHub Desktop.
Save natewave/9b9ce4c288ecb6615a99c6da74367bad to your computer and use it in GitHub Desktop.
/**
 * Resolves, for each requested partition, the offset consumption should start from.
 *
 * For every partition the consumer's committed offset is looked up. When there is
 * no committed offset (the client returns `null`), or the committed offset is lower
 * than the log beginning offset, the log beginning offset is used instead.
 *
 * The consumer handed over by `withCustomStringConsumer` is always closed before
 * this method returns, including when a lookup throws.
 *
 * @param consumerConfig  configuration used to build the consumer; its `topic` is
 *                        also used to look up the log beginning offset
 * @param topicPartitions partitions to resolve offsets for
 * @return a map from each requested partition to the offset to start from
 */
def getCommittedOffsetsForConsumer(consumerConfig: KafkaConsumerConfig, topicPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] = {
  val result = withCustomStringConsumer(consumer => {
    try {
      consumer.assign(topicPartitions.toList.asJava)

      // The beginning offset depends only on the topic, so look it up at most once
      // instead of once per partition. `lazy` keeps the lookup from running at all
      // when `topicPartitions` is empty.
      // NOTE(review): `.values.head` applies a single offset to every partition and
      // throws on an empty result — presumably this should be resolved per
      // partition; confirm against KafkaCluster.getLogBeginningOffset.
      lazy val startOffset: Long = KafkaCluster.getLogBeginningOffset(consumerConfig.topic).values.head

      topicPartitions.map { tp =>
        // `committed` yields null when the partition has no committed offset;
        // in that case fall back to the beginning offset. Otherwise never start
        // below the beginning offset.
        val resolved = Option(consumer.committed(tp))
          .fold(startOffset)(om => math.max(om.offset(), startOffset))
        tp -> resolved
      }.toMap
    } finally {
      // Release the consumer even when one of the calls above fails.
      consumer.close()
    }
  })(consumerConfig)
  result.asInstanceOf[Map[TopicPartition, Long]]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment