Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save smallnest/78c0f26b8b9982dcc129 to your computer and use it in GitHub Desktop.
Save smallnest/78c0f26b8b9982dcc129 to your computer and use it in GitHub Desktop.
import kafka.consumer.SimpleConsumer
import kafka.common.TopicAndPartition
import kafka.api._
import scala.annotation.tailrec
import scala.util._
object KafkaSimpleConsumerUtils {
def getLastOffset(consumer: SimpleConsumer, topic: String, partition: Int, whichTime: Long, clientName: String): Try[Long] = {
val tap = new TopicAndPartition(topic, partition)
val request = new kafka.api.OffsetRequest(Map(tap -> PartitionOffsetRequestInfo(whichTime, 1)))
val response = consumer.getOffsetsBefore(request)
if (response.hasError) {
val err = response.partitionErrorAndOffsets(tap).error
Failure(new KafkaException("Error fetching data Offset Data the Broker. Reason: " + err))
} else {
//offsets is sorted in descending order, we always want the first
Success(response.partitionErrorAndOffsets(tap).offsets(0))
}
}
@tailrec
def findConsumerAndDo[T](seedBrokers:List[String], port: Int, consumerName: String)(f: SimpleConsumer => T): Option[(SimpleConsumer,T)] =
seedBrokers match {
case Nil => None
case seed :: others =>
Try{
val consumer = new SimpleConsumer(seed, port, 100000, 64 * 1024, consumerName)
val r = f(consumer)
Some((consumer,r))
} match {
case Success(r) => r
case Failure(ex) =>
findConsumerAndDo(others, port, consumerName)(f)
}
}
def findLeader(seedBrokers:List[String], port:Int, consumerName: String, topic: String, partition: Int): Option[PartitionMetadata] = {
findConsumerAndDo(seedBrokers, port, consumerName) {consumer =>
findLeader(consumer, topic, partition)
}.map{_._2}.flatten
}
def findLeader(consumer: SimpleConsumer, topic: String, partition: Int): Option[PartitionMetadata] = {
val topics = Seq(topic)
val req: TopicMetadataRequest = new TopicMetadataRequest(topics, 0)
val resp: TopicMetadataResponse = consumer.send(req)
val metaData = resp.topicsMetadata
(for {
item <- metaData.iterator
part <- item.partitionsMetadata.iterator
} yield part).find(part => part.partitionId == partition)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment