Skip to content

Instantly share code, notes, and snippets.

@lazyval
Last active June 15, 2016 12:45
Show Gist options
  • Save lazyval/2e4b5a53d61d3098a37a41a8d5b1ffcb to your computer and use it in GitHub Desktop.
Save lazyval/2e4b5a53d61d3098a37a41a8d5b1ffcb to your computer and use it in GitHub Desktop.
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1"
import kafka.api.FetchRequestBuilder
import kafka.cluster.Broker
import kafka.javaapi.{FetchRequest, TopicMetadataRequest}
import kafka.javaapi.consumer.SimpleConsumer
import kafka.message.Message
import scala.collection.JavaConverters._
class Reader(anyBroker: String, topic: String, partition: Int, offset: Int) {
val ClientId = "kafka-fetch-size-client"
val SoTimeoutMs = 20000
private val connection = {
val Broker(id, host, port) = findLeader(anyBroker, topic, partition)
new SimpleConsumer(host, port, SoTimeoutMs, 1024 * 1024, ClientId)
}
def read(fetchSize: Int): Seq[Message] = {
val request = new FetchRequestBuilder()
.addFetch(topic, partition, offset, fetchSize)
.build()
val it = connection.fetch(request).messageSet(topic, partition).iterator().asScala
it.map(msg => msg.message).toSeq
}
private def findLeader(anyBroker: String, topic: String, partition: Int): Broker = {
val Array(host, port) = anyBroker.split(":")
val metadata = new SimpleConsumer(host, port.toInt, SoTimeoutMs, 1024 * 1024, ClientId)
val whoIsLeader = new TopicMetadataRequest(java.util.Collections.singletonList(topic))
val answer = metadata.send(whoIsLeader)
val leader = for {
topicMeta <- answer.topicsMetadata.asScala if topicMeta.topic == topic
partitionMeta <- topicMeta.partitionsMetadata.asScala if partitionMeta.partitionId == partition
} yield partitionMeta.leader
metadata.close()
println(s"Found leader for partition, it's ${leader.head}")
leader.head
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment