Forked from squito/KafkaSimpleConsumerUtils.scala
Last active
August 29, 2015 14:16
-
-
Save smallnest/78c0f26b8b9982dcc129 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kafka.consumer.SimpleConsumer | |
import kafka.common.TopicAndPartition | |
import kafka.api._ | |
import scala.annotation.tailrec | |
import scala.util._ | |
/**
 * Helpers around Kafka's low-level `SimpleConsumer`: looking up an offset for a
 * topic/partition, connecting to the first usable seed broker, and finding the
 * metadata entry of a partition.
 */
object KafkaSimpleConsumerUtils {

  /** Socket timeout handed to every `SimpleConsumer` created by this object. */
  private val SocketTimeoutMs: Int = 100000

  /** Buffer size handed to every `SimpleConsumer` created by this object. */
  private val BufferSizeBytes: Int = 64 * 1024

  /**
   * Asks the broker behind `consumer` for a single offset of `topic`/`partition`.
   *
   * Every failure mode is reported as a `Failure` rather than thrown: an exception
   * raised by the request itself, a broker-side error code, a response that has
   * no entry for the requested partition, or an empty offset list.
   *
   * @param consumer   consumer used to send the offset request
   * @param topic      topic to query
   * @param partition  partition of `topic` to query
   * @param whichTime  time value forwarded unchanged to `PartitionOffsetRequestInfo`
   * @param clientName client id attached to the offset request
   * @return the first offset returned by the broker, or a `Failure` describing
   *         why no offset could be obtained
   */
  def getLastOffset(consumer: SimpleConsumer, topic: String, partition: Int, whichTime: Long, clientName: String): Try[Long] = {
    val tap = new TopicAndPartition(topic, partition)
    // Request at most one offset; clientName identifies this caller to the broker.
    val request = new kafka.api.OffsetRequest(
      Map(tap -> PartitionOffsetRequestInfo(whichTime, 1)),
      clientId = clientName)

    Try(consumer.getOffsetsBefore(request)).flatMap { response =>
      response.partitionErrorAndOffsets.get(tap) match {
        case Some(result) if response.hasError =>
          Failure(new kafka.common.KafkaException(
            "Error fetching data Offset Data the Broker. Reason: " + result.error))
        case Some(result) =>
          // Offsets come back sorted in descending order; we always want the first.
          result.offsets.headOption match {
            case Some(offset) => Success(offset)
            case None =>
              Failure(new kafka.common.KafkaException("Broker returned no offsets for " + tap))
          }
        case None =>
          Failure(new kafka.common.KafkaException("Broker response has no entry for " + tap))
      }
    }
  }

  /**
   * Walks `seedBrokers` in order, connecting to each one and applying `f`, and
   * stops at the first broker for which both steps complete without a
   * (non-fatal) exception.
   *
   * A consumer whose `f` call fails is closed before the next seed is tried.
   * The consumer returned on success is still open; closing it is the caller's
   * responsibility.
   *
   * @param seedBrokers  host names to try, in order
   * @param port         port used for every seed broker
   * @param consumerName client id given to each created consumer
   * @param f            action to run against a freshly created consumer
   * @return the consumer that worked together with the result of `f`, or `None`
   *         when every seed broker failed (or the list is empty)
   */
  @tailrec
  def findConsumerAndDo[T](seedBrokers: List[String], port: Int, consumerName: String)(f: SimpleConsumer => T): Option[(SimpleConsumer, T)] =
    seedBrokers match {
      case Nil => None
      case seed :: others =>
        connectAndApply(seed, port, consumerName)(f) match {
          case Success(found) => Some(found)
          case Failure(_) =>
            // Best effort by design: this broker is unusable, move on to the next.
            findConsumerAndDo(others, port, consumerName)(f)
        }
    }

  /** Creates a consumer for `host` and applies `f`, closing the consumer if `f` fails. */
  private def connectAndApply[T](host: String, port: Int, consumerName: String)(f: SimpleConsumer => T): Try[(SimpleConsumer, T)] =
    Try(new SimpleConsumer(host, port, SocketTimeoutMs, BufferSizeBytes, consumerName)).flatMap { consumer =>
      Try(f(consumer)) match {
        case Success(result) => Success((consumer, result))
        case Failure(ex) =>
          // Do not leak the connection; a failing close must not mask the original error.
          Try(consumer.close())
          Failure(ex)
      }
    }

  /**
   * Finds the metadata of `topic`/`partition` by querying the first seed broker
   * that answers a metadata request. The temporary consumer is closed before
   * returning.
   *
   * @return the matching partition metadata, or `None` when no seed broker could
   *         be queried or the answering broker reported no such partition
   */
  def findLeader(seedBrokers: List[String], port: Int, consumerName: String, topic: String, partition: Int): Option[PartitionMetadata] =
    findConsumerAndDo(seedBrokers, port, consumerName) { consumer =>
      findLeader(consumer, topic, partition)
    }.flatMap { case (consumer, metadata) =>
      // The consumer only existed for this lookup and is not handed to the caller.
      consumer.close()
      metadata
    }

  /**
   * Sends a topic-metadata request for `topic` through `consumer` and returns the
   * first partition entry in the response whose id equals `partition`.
   *
   * @return the matching partition metadata, or `None` if the response contains
   *         no partition with that id
   */
  def findLeader(consumer: SimpleConsumer, topic: String, partition: Int): Option[PartitionMetadata] = {
    val request: TopicMetadataRequest = new TopicMetadataRequest(Seq(topic), 0)
    val response: TopicMetadataResponse = consumer.send(request)
    // Iterators keep the search lazy: scanning stops at the first match.
    response.topicsMetadata.iterator
      .flatMap(_.partitionsMetadata.iterator)
      .find(_.partitionId == partition)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment