Skip to content

Instantly share code, notes, and snippets.

@natewave
Last active September 15, 2015 10:15
Show Gist options
  • Save natewave/ed80b115b7d603272f28 to your computer and use it in GitHub Desktop.
Save natewave/ed80b115b7d603272f28 to your computer and use it in GitHub Desktop.
// Nizar S. <nisehl at gmail dot com>
import collection.JavaConversions._
import scala.collection.immutable.Map
import kafka.api._
import kafka.cluster.Broker
import kafka.common.OffsetAndMetadata
import kafka.common.OffsetMetadataAndError
import kafka.common.TopicAndPartition
import kafka.api.ConsumerMetadataResponse
import kafka.javaapi.OffsetCommitRequest
import kafka.api.OffsetCommitResponse
import kafka.javaapi.OffsetFetchRequest
import kafka.javaapi.OffsetFetchResponse
import kafka.network.BlockingChannel
import java.util._
/**
 * A blocking channel paired with the broker it was opened against.
 *
 * @param channel channel used to send offset commit/fetch requests
 * @param broker  broker the channel targets
 */
case class OffsetManager(
  channel: BlockingChannel,
  broker: Broker
)
/**
 * Helpers for Kafka's offset management requests over a `BlockingChannel`:
 * locating the offset manager (coordinator) of a consumer group, committing
 * offsets, and fetching previously committed offsets.
 */
object KafkaCommons {

  /**
   * Builds a blocking channel to `host:port` using the default buffer sizes and a
   * 5000 ms read timeout. The channel is returned without being connected.
   */
  def newChannel(host: String, port: Int): BlockingChannel =
    new BlockingChannel(
      host = host,
      port = port,
      readBufferSize = BlockingChannel.UseDefaultBufferSize,
      writeBufferSize = BlockingChannel.UseDefaultBufferSize,
      readTimeoutMs = 5000)

  /**
   * Asks the broker behind `channel` which broker coordinates offsets for `groupId`
   * and opens a fresh channel to that coordinator.
   *
   * `channel` is connected for the metadata round-trip and is always disconnected
   * afterwards, whether or not a coordinator was found.
   *
   * @param channel       bootstrap channel used only for the metadata request
   * @param clientId      client id sent with the request
   * @param groupId       consumer group whose coordinator is looked up
   * @param correlationId correlation id sent with the request
   * @return the coordinator and a channel to it, or `None` when the response names no
   *         coordinator or any exception is raised along the way
   */
  def getOffsetManager(channel: BlockingChannel, clientId: String, groupId: String, correlationId: Int = 0 /*, retries: Int ???*/): Option[OffsetManager] = {
    try {
      // Release the bootstrap channel on every path (coordinator found, no coordinator,
      // I/O failure) so its socket is not leaked, and do so before opening the new channel.
      val coordinator =
        try {
          channel.connect()
          channel.send(new ConsumerMetadataRequest(
            groupId, ConsumerMetadataRequest.CurrentVersion,
            correlationId, clientId))
          val metadataResponse: ConsumerMetadataResponse =
            ConsumerMetadataResponse.readFrom(channel.receive().buffer)
          metadataResponse.coordinatorOpt
        } finally {
          channel.disconnect()
        }

      coordinator.map { broker =>
        val brokerChannel = newChannel(broker.host, broker.port)
        brokerChannel.connect()
        OffsetManager(brokerChannel, broker)
      }
    } catch {
      case _: Exception => None
    }
  }

  /**
   * Commits `offsets` for `groupId` through the offset manager's channel.
   *
   * @param offsets       offset to commit for each topic/partition
   * @param groupId       consumer group the offsets belong to
   * @param clientId      client id sent with the request
   * @param correlationId correlation id sent with the request
   * @param offsetManager coordinator channel obtained from [[getOffsetManager]]
   * @return `true` when the response reports no error; `false` when it reports an
   *         error or when sending/receiving throws
   */
  def commitOffsets(offsets: Map[TopicAndPartition, Long], groupId: String, clientId: String, correlationId: Int = 0, offsetManager: OffsetManager /*, retries: Int ???*/): Boolean = {
    val offsetsMetadata = offsets.map { case (topicAndPartition, offset) =>
      (topicAndPartition, OffsetAndMetadata(offset))
    }
    // version 1 and above commit to Kafka, version 0 commits to ZooKeeper
    val commitRequest = new OffsetCommitRequest(groupId, offsetsMetadata, correlationId,
      clientId, 1: Short)
    val channel = offsetManager.channel
    try {
      channel.send(commitRequest.underlying)
      val commitResponse = OffsetCommitResponse.readFrom(channel.receive().buffer)
      // Success means "no error": negate hasError so the result agrees with the
      // exception path below, which reports failure as false.
      !commitResponse.hasError
    } catch {
      case _: Exception => false
    }
  }

  /**
   * Fetches the committed offsets of `groupId` for the given topic/partitions.
   *
   * NOTE(review): the per-partition error code in the response is not inspected; the
   * `offset` field of every returned entry is passed through as-is.
   *
   * @param topicsAndPartitions partitions to look up
   * @param groupId             consumer group the offsets belong to
   * @param clientId            client id sent with the request
   * @param correlationId       correlation id sent with the request
   * @param offsetManager       coordinator channel obtained from [[getOffsetManager]]
   * @return offset per topic/partition from the response, or an empty map when
   *         sending/receiving throws
   */
  def fetchOffsets(topicsAndPartitions: Seq[TopicAndPartition],
                   groupId: String,
                   clientId: String,
                   correlationId: Int = 0,
                   offsetManager: OffsetManager): Map[TopicAndPartition, Long] = {
    val fetchRequest = new OffsetFetchRequest(
      groupId,
      scala.collection.JavaConversions.seqAsJavaList(topicsAndPartitions),
      1: Short, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper
      correlationId,
      clientId)
    val channel = offsetManager.channel
    try {
      channel.send(fetchRequest.underlying)
      val fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer)
      val offsets = fetchResponse.offsets.map { case (topicAndPartition, metadata) =>
        (topicAndPartition, metadata.offset)
      }
      offsets.toMap
    } catch {
      case _: Exception => Map.empty
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment