@ssemichev
Created April 7, 2017 20:46
Convenience methods for interacting with a Kafka cluster
package falcon.streaming.kafka

import java.util.Properties

import kafka.api._
import kafka.common.{ ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition }
import kafka.consumer.{ ConsumerConfig, SimpleConsumer }

import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.control.NonFatal
/**
 * Convenience methods for interacting with a Kafka cluster.
 *
 * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
 *                    configuration parameters</a>. Requires "metadata.broker.list" or
 *                    "bootstrap.servers" to be set with Kafka broker(s), NOT zookeeper servers,
 *                    specified in host1:port1,host2:port2 form.
 */
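// A minimal usage sketch (the broker address and topic name below are placeholders):
//
//   val kc = new KafkaCluster(Map("metadata.broker.list" -> "localhost:9092"))
//   val leader: Either[KafkaCluster.Err, (String, Int)] = kc.findLeader("events", 0)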
// scalastyle:off null
// scalastyle:off return
class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
  import KafkaCluster.{ Err, LeaderOffset, SimpleConsumerConfig }

  // ConsumerConfig isn't serializable
  @transient private var _config: SimpleConsumerConfig = null

  def config: SimpleConsumerConfig = this.synchronized {
    if (Option(_config).isEmpty) {
      _config = SimpleConsumerConfig(kafkaParams)
    }
    _config
  }
  def connect(host: String, port: Int): SimpleConsumer =
    new SimpleConsumer(host, port, config.socketTimeoutMs,
      config.socketReceiveBufferBytes, config.clientId)

  def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] =
    findLeader(topic, partition).right.map(hp => connect(hp._1, hp._2))

  // Metadata api
  // scalastyle:off
  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
  // scalastyle:on
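  // Example sketch, given a KafkaCluster instance kc as constructed in the example near
  // the top of this file ("events" is a placeholder topic):
  //
  //   val partitions: Either[Err, Set[TopicAndPartition]] = kc.getPartitions(Set("events"))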
  def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
    val req = TopicMetadataRequest(
      TopicMetadataRequest.CurrentVersion,
      0, config.clientId, Seq(topic)
    )
    val errs = new Err
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp: TopicMetadataResponse = consumer.send(req)
      resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
        tm.partitionsMetadata.find(_.partitionId == partition)
      }.foreach { pm: PartitionMetadata =>
        pm.leader.foreach { leader =>
          return Right((leader.host, leader.port))
        }
      }
    }
    Left(errs)
  }
  def findLeaders(
    topicAndPartitions: Set[TopicAndPartition]
  ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
    val topics = topicAndPartitions.map(_.topic)
    val response = getPartitionMetadata(topics).right
    val answer = response.flatMap { tms: Set[TopicMetadata] =>
      val leaderMap = tms.flatMap { tm: TopicMetadata =>
        tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
          val tp = TopicAndPartition(tm.topic, pm.partitionId)
          if (topicAndPartitions(tp)) {
            pm.leader.map { l =>
              tp -> (l.host -> l.port)
            }
          } else {
            None
          }
        }
      }.toMap
      if (leaderMap.keys.size == topicAndPartitions.size) {
        Right(leaderMap)
      } else {
        val missing = topicAndPartitions.diff(leaderMap.keySet)
        val err = new Err
        err.append(new IllegalStateException(s"Couldn't find leaders for $missing"))
        Left(err)
      }
    }
    answer
  }
  def getPartitions(topics: Set[String]): Either[Err, Set[TopicAndPartition]] = {
    getPartitionMetadata(topics).right.map { r =>
      r.flatMap { tm: TopicMetadata =>
        tm.partitionsMetadata.map { pm: PartitionMetadata =>
          TopicAndPartition(tm.topic, pm.partitionId)
        }
      }
    }
  }
  def getPartitionMetadata(topics: Set[String]): Either[Err, Set[TopicMetadata]] = {
    val req = TopicMetadataRequest(
      TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics.toSeq
    )
    val errs = new Err
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp: TopicMetadataResponse = consumer.send(req)
      val respErrs = resp.topicsMetadata.filter(m => m.errorCode != ErrorMapping.NoError)
      if (respErrs.isEmpty) {
        return Right(resp.topicsMetadata.toSet)
      } else {
        respErrs.foreach { m =>
          val cause = ErrorMapping.exceptionFor(m.errorCode)
          val msg = s"Error getting partition metadata for '${m.topic}'. Does the topic exist?"
          errs.append(new IllegalArgumentException(msg, cause))
        }
      }
    }
    Left(errs)
  }
  // Leader offset api
  // scalastyle:off
  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
  // scalastyle:on
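  // Example sketch: fetch the latest offset for every partition found above.
  //
  //   val latest: Either[Err, Map[TopicAndPartition, LeaderOffset]] =
  //     partitions.right.flatMap(tps => kc.getLatestLeaderOffsets(tps))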
  def getLatestLeaderOffsets(
    topicAndPartitions: Set[TopicAndPartition]
  ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
    getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime)

  def getEarliestLeaderOffsets(
    topicAndPartitions: Set[TopicAndPartition]
  ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
    getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime)

  def getLeaderOffsets(
    topicAndPartitions: Set[TopicAndPartition],
    before: Long
  ): Either[Err, Map[TopicAndPartition, LeaderOffset]] = {
    getLeaderOffsets(topicAndPartitions, before, 1).right.map { r =>
      r.map { kv =>
        // mapValues isn't serializable, see SI-7005
        kv._1 -> kv._2.head
      }
    }
  }

  private def flip[K, V](m: Map[K, V]): Map[V, Seq[K]] =
    m.groupBy(_._2).map { kv =>
      kv._1 -> kv._2.keys.toSeq
    }
  def getLeaderOffsets(
    topicAndPartitions: Set[TopicAndPartition],
    before: Long,
    maxNumOffsets: Int
  ): Either[Err, Map[TopicAndPartition, Seq[LeaderOffset]]] = {
    findLeaders(topicAndPartitions).right.flatMap { tpToLeader =>
      val leaderToTp: Map[(String, Int), Seq[TopicAndPartition]] = flip(tpToLeader)
      val leaders = leaderToTp.keys
      var result = Map[TopicAndPartition, Seq[LeaderOffset]]()
      val errs = new Err
      withBrokers(leaders, errs) { consumer =>
        val partitionsToGetOffsets: Seq[TopicAndPartition] =
          leaderToTp((consumer.host, consumer.port))
        val reqMap = partitionsToGetOffsets.map { tp: TopicAndPartition =>
          tp -> PartitionOffsetRequestInfo(before, maxNumOffsets)
        }.toMap
        val req = OffsetRequest(reqMap)
        val resp = consumer.getOffsetsBefore(req)
        val respMap = resp.partitionErrorAndOffsets
        partitionsToGetOffsets.foreach { tp: TopicAndPartition =>
          respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
            if (por.error == ErrorMapping.NoError) {
              if (por.offsets.nonEmpty) {
                result += tp -> por.offsets.map { off =>
                  LeaderOffset(consumer.host, consumer.port, off)
                }
              } else {
                errs.append(new IllegalArgumentException(
                  s"Empty offsets for $tp, is $before before log beginning?"
                ))
              }
            } else {
              errs.append(ErrorMapping.exceptionFor(por.error))
            }
          }
        }
        if (result.keys.size == topicAndPartitions.size) {
          return Right(result)
        }
      }
      val missing = topicAndPartitions.diff(result.keySet)
      errs.append(new IllegalArgumentException(s"Couldn't find leader offsets for $missing"))
      Left(errs)
    }
  }
  // Consumer offset api
  // scalastyle:off
  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
  // scalastyle:on

  // Api version 0 is the original ZooKeeper-backed api; version 1 stores offsets in Kafka.
  private def defaultConsumerApiVersion: Short = kafkaParams.getOrElse("offsets.storage", "kafka") match {
    case "zookeeper" => 0
    case "kafka" => 1
  }
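  // Example sketch, given tps: Set[TopicAndPartition] (e.g. from getPartitions); the
  // group id "my-group" is a placeholder:
  //
  //   val committed: Either[Err, Map[TopicAndPartition, Long]] =
  //     kc.getConsumerOffsets("my-group", tps)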
  /** Requires Kafka >= 0.8.1.1 */
  def getConsumerOffsets(
    groupId: String,
    topicAndPartitions: Set[TopicAndPartition]
  ): Either[Err, Map[TopicAndPartition, Long]] =
    getConsumerOffsets(groupId, topicAndPartitions, defaultConsumerApiVersion)

  def getConsumerOffsets(
    groupId: String,
    topicAndPartitions: Set[TopicAndPartition],
    consumerApiVersion: Short
  ): Either[Err, Map[TopicAndPartition, Long]] = {
    getConsumerOffsetMetadata(groupId, topicAndPartitions, consumerApiVersion).right.map { r =>
      r.map { kv =>
        kv._1 -> kv._2.offset
      }
    }
  }
  /** Requires Kafka >= 0.8.1.1 */
  def getConsumerOffsetMetadata(
    groupId: String,
    topicAndPartitions: Set[TopicAndPartition]
  ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] =
    getConsumerOffsetMetadata(groupId, topicAndPartitions, defaultConsumerApiVersion)

  def getConsumerOffsetMetadata(
    groupId: String,
    topicAndPartitions: Set[TopicAndPartition],
    consumerApiVersion: Short
  ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = {
    var result = Map[TopicAndPartition, OffsetMetadataAndError]()
    val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, consumerApiVersion)
    val errs = new Err
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp = consumer.fetchOffsets(req)
      val respMap = resp.requestInfo
      val needed = topicAndPartitions.diff(result.keySet)
      needed.foreach { tp: TopicAndPartition =>
        respMap.get(tp).foreach { ome: OffsetMetadataAndError =>
          if (ome.error == ErrorMapping.NoError) {
            result += tp -> ome
          } else {
            errs.append(ErrorMapping.exceptionFor(ome.error))
          }
        }
      }
      if (result.keys.size == topicAndPartitions.size) {
        return Right(result)
      }
    }
    val missing = topicAndPartitions.diff(result.keySet)
    errs.append(new IllegalArgumentException(s"Couldn't find consumer offsets for $missing"))
    Left(errs)
  }
  /**
   * Sends a ConsumerMetadataRequest, which as a side effect creates the __consumer_offsets
   * system topic if it doesn't exist yet. Returns the response error code.
   * Example: kafkaCluster.getConsumerMetadata("test_group", Set(TopicAndPartition(topic, 0), TopicAndPartition(topic, 1)))
   */
  def getConsumerMetadata(
    groupId: String,
    topicAndPartitions: Set[TopicAndPartition] // note: currently unused by this method
  ): Int = {
    val req = ConsumerMetadataRequest(groupId, ConsumerMetadataRequest.CurrentVersion, 0, "demoClientId")
    val errs = new Err
    var response: Int = 0
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp = consumer.send(req)
      response = resp.errorCode.toInt
    }
    response
  }
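  // Example sketch: commit offsets for a group, e.g. after processing a batch
  // (group id, topic, and offset value below are placeholders):
  //
  //   kc.setConsumerOffsets("my-group", Map(TopicAndPartition("events", 0) -> 42L))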
  /** Requires Kafka >= 0.8.1.1 */
  def setConsumerOffsets(
    groupId: String,
    offsets: Map[TopicAndPartition, Long]
  ): Either[Err, Map[TopicAndPartition, Short]] =
    setConsumerOffsets(groupId, offsets, defaultConsumerApiVersion)

  def setConsumerOffsets(
    groupId: String,
    offsets: Map[TopicAndPartition, Long],
    consumerApiVersion: Short
  ): Either[Err, Map[TopicAndPartition, Short]] = {
    val meta = offsets.map { kv =>
      kv._1 -> OffsetAndMetadata(kv._2)
    }
    setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
  }

  /** Requires Kafka >= 0.8.1.1 */
  def setConsumerOffsetMetadata(
    groupId: String,
    metadata: Map[TopicAndPartition, OffsetAndMetadata]
  ): Either[Err, Map[TopicAndPartition, Short]] =
    setConsumerOffsetMetadata(groupId, metadata, defaultConsumerApiVersion)

  def setConsumerOffsetMetadata(
    groupId: String,
    metadata: Map[TopicAndPartition, OffsetAndMetadata],
    consumerApiVersion: Short
  ): Either[Err, Map[TopicAndPartition, Short]] = {
    var result = Map[TopicAndPartition, Short]()
    val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion)
    val errs = new Err
    val topicAndPartitions = metadata.keySet
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp = consumer.commitOffsets(req)
      val respMap = resp.commitStatus
      val needed = topicAndPartitions.diff(result.keySet)
      needed.foreach { tp: TopicAndPartition =>
        respMap.get(tp).foreach { err: Short =>
          if (err == ErrorMapping.NoError) {
            result += tp -> err
          } else {
            errs.append(ErrorMapping.exceptionFor(err))
          }
        }
      }
      if (result.keys.size == topicAndPartitions.size) {
        return Right(result)
      }
    }
    val missing = topicAndPartitions.diff(result.keySet)
    errs.append(new IllegalArgumentException(s"Couldn't set offsets for $missing"))
    Left(errs)
  }
  // Try a call against potentially multiple brokers, accumulating errors
  private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)(fn: SimpleConsumer => Any): Unit = {
    brokers.foreach { hp =>
      var consumer: SimpleConsumer = null
      try {
        consumer = connect(hp._1, hp._2)
        fn(consumer)
      } catch {
        case NonFatal(e) =>
          errs.append(e)
      } finally {
        if (consumer != null) {
          consumer.close()
        }
      }
    }
  }
}
object KafkaCluster {
  type Err = ArrayBuffer[Throwable]

  /** If the result is right, return it, otherwise throw IllegalArgumentException */
  def checkErrors[T](result: Either[Err, T]): T = {
    result.fold(
      errs => throw new IllegalArgumentException(errs.mkString("\n")),
      ok => ok
    )
  }
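  // Example sketch: unwrap an Either result or fail fast with the accumulated errors,
  // given a KafkaCluster kc and partitions tps as in the examples above:
  //
  //   val offsets = KafkaCluster.checkErrors(kc.getLatestLeaderOffsets(tps))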
  case class LeaderOffset(host: String, port: Int, offset: Long)

  /**
   * High-level kafka consumers connect to ZK. ConsumerConfig assumes this use case.
   * Simple consumers connect directly to brokers, but need many of the same configs.
   * This subclass won't warn about missing ZK params, or presence of broker params.
   */
  class SimpleConsumerConfig private (brokers: String, originalProps: Properties)
    extends ConsumerConfig(originalProps) {

    val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
      val hpa = hp.split(":")
      if (hpa.size == 1) {
        throw new IllegalArgumentException(s"Broker not in the correct format of <host>:<port> [$brokers]")
      }
      (hpa(0), hpa(1).toInt)
    }
  }
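  // Example sketch: build a config directly; only a broker list is required
  // (the addresses below are placeholders):
  //
  //   val conf = SimpleConsumerConfig(Map("bootstrap.servers" -> "broker1:9092,broker2:9092"))
  //   conf.seedBrokers // Array(("broker1", 9092), ("broker2", 9092))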
  object SimpleConsumerConfig {
    /**
     * Make a consumer config without requiring group.id or zookeeper.connect,
     * since communicating with brokers also needs common settings such as timeout
     */
    def apply(kafkaParams: Map[String, String]): SimpleConsumerConfig = {
      // These keys are from other pre-existing kafka configs for specifying brokers, accept either
      val brokers = kafkaParams.get("metadata.broker.list")
        .orElse(kafkaParams.get("bootstrap.servers"))
        .getOrElse(throw new IllegalArgumentException(
          "Must specify metadata.broker.list or bootstrap.servers"
        ))
      val props = new Properties()
      kafkaParams.foreach {
        case (key, value) =>
          // prevent warnings on parameters ConsumerConfig doesn't know about
          if (key != "metadata.broker.list" && key != "bootstrap.servers") {
            props.put(key, value)
          }
      }
      Seq("zookeeper.connect", "group.id").foreach { s =>
        if (!props.containsKey(s)) {
          props.setProperty(s, "")
        }
      }
      new SimpleConsumerConfig(brokers, props)
    }
  }
}
@swordspoet

https://gist.github.com/ssemichev/fa3605c7b10cb6c7b9c8ab54ffbc5865#file-kafkacluster-scala-L281

This line produces a compile error: not found: value ConsumerMetadataRequest
