Last active
September 15, 2015 10:15
-
-
Save natewave/ed80b115b7d603272f28 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Nizar S. <nisehl at gmail dot com> | |
import collection.JavaConversions._ | |
import scala.collection.immutable.Map | |
import kafka.api._ | |
import kafka.cluster.Broker | |
import kafka.common.OffsetAndMetadata | |
import kafka.common.OffsetMetadataAndError | |
import kafka.common.TopicAndPartition | |
import kafka.api.ConsumerMetadataResponse | |
import kafka.javaapi.OffsetCommitRequest | |
import kafka.api.OffsetCommitResponse | |
import kafka.javaapi.OffsetFetchRequest | |
import kafka.javaapi.OffsetFetchResponse | |
import kafka.network.BlockingChannel | |
import java.util._ | |
case class OffsetManager(channel: BlockingChannel, broker: Broker) | |
object KafkaCommons { | |
def newChannel(host: String, port: Int): BlockingChannel = { | |
val channel = new BlockingChannel(host = host, | |
port = port, | |
readBufferSize = BlockingChannel.UseDefaultBufferSize, | |
writeBufferSize = BlockingChannel.UseDefaultBufferSize, | |
readTimeoutMs = 5000) | |
channel | |
} | |
def getOffsetManager(channel: BlockingChannel, clientId: String, groupId: String, correlationId: Int = 0 /*, retries: Int ???*/): Option[OffsetManager] = { | |
try { | |
channel.connect() | |
channel.send(new ConsumerMetadataRequest( | |
groupId, ConsumerMetadataRequest.CurrentVersion, | |
correlationId, clientId)) | |
val metadataResponse: ConsumerMetadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer) | |
val brokerOpt = metadataResponse.coordinatorOpt | |
val offsetManager: Option[OffsetManager] = brokerOpt.map { broker => | |
channel.disconnect() | |
val brokerChannel = newChannel(broker.host, broker.port) | |
brokerChannel.connect() | |
OffsetManager(brokerChannel, broker) | |
} | |
offsetManager | |
} catch { case e: Exception => | |
None | |
} | |
} | |
def commitOffsets(offsets: Map[TopicAndPartition, Long], groupId: String, clientId: String, correlationId: Int = 0, offsetManager: OffsetManager /*, retries: Int ???*/): Boolean = { | |
val offsetsMetadata = offsets.map { case (k, v) => (k, OffsetAndMetadata(v)) } | |
val commitRequest = new OffsetCommitRequest(groupId, offsetsMetadata, correlationId, | |
clientId, 1: Short); // version 1 and above commit to Kafka, version 0 commits to ZooKeeper | |
val channel = offsetManager.channel | |
try { | |
channel.send(commitRequest.underlying) | |
val commitResponse = OffsetCommitResponse.readFrom(channel.receive().buffer); | |
commitResponse.hasError | |
} catch { case e: Exception => false } | |
} | |
def fetchOffsets(topicsAndPartitions: Seq[TopicAndPartition], | |
groupId: String, | |
clientId: String, | |
correlationId: Int = 0, | |
offsetManager: OffsetManager): Map[TopicAndPartition, Long] = { | |
val fetchRequest = new OffsetFetchRequest( | |
groupId, | |
scala.collection.JavaConversions.seqAsJavaList(topicsAndPartitions), | |
1: Short, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper | |
correlationId, | |
clientId) | |
val channel = offsetManager.channel | |
try { | |
channel.send(fetchRequest.underlying) | |
val fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer) | |
val offsets = fetchResponse.offsets.map { case (k, v) => (k, v.offset) } | |
offsets.toMap | |
} | |
catch { case e: Exception => Map.empty } | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment