Convenience methods for interacting with a Kafka cluster
package falcon.streaming.kafka

import java.util.Properties

import kafka.api._
import kafka.common.{ ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition }
import kafka.consumer.{ ConsumerConfig, SimpleConsumer }

import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.control.NonFatal

/**
 * Convenience methods for interacting with a Kafka cluster.
 * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
 * configuration parameters</a>.
 * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
 * NOT zookeeper servers, specified in host1:port1,host2:port2 form
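 *
 * A minimal usage sketch (the broker addresses and topic name are illustrative, not part of this file):
 * {{{
 * val kc = new KafkaCluster(Map("metadata.broker.list" -> "broker1:9092,broker2:9092"))
 * val partitions = KafkaCluster.checkErrors(kc.getPartitions(Set("events")))
 * val latest = KafkaCluster.checkErrors(kc.getLatestLeaderOffsets(partitions))
 * }}}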
 */
// scalastyle:off null
// scalastyle:off return
class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {

  import KafkaCluster.{ Err, LeaderOffset, SimpleConsumerConfig }

  // ConsumerConfig isn't serializable
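  // (kept @transient and rebuilt lazily, under a lock, on first use after deserialization)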
  @transient private var _config: SimpleConsumerConfig = null

  def config: SimpleConsumerConfig = this.synchronized {
    if (Option(_config).isEmpty) {
      _config = SimpleConsumerConfig(kafkaParams)
    }
    _config
  }

  def connect(host: String, port: Int): SimpleConsumer =
    new SimpleConsumer(host, port, config.socketTimeoutMs,
      config.socketReceiveBufferBytes, config.clientId)

  def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] =
    findLeader(topic, partition).right.map(hp => connect(hp._1, hp._2))

  // Metadata api
  // scalastyle:off
  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
  // scalastyle:on

  def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
    val req = TopicMetadataRequest(
      TopicMetadataRequest.CurrentVersion,
      0, config.clientId, Seq(topic)
    )
    val errs = new Err
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp: TopicMetadataResponse = consumer.send(req)
      resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
        tm.partitionsMetadata.find(_.partitionId == partition)
      }.foreach { pm: PartitionMetadata =>
        pm.leader.foreach { leader =>
          return Right((leader.host, leader.port))
        }
      }
    }
    Left(errs)
  }

  def findLeaders(
    topicAndPartitions: Set[TopicAndPartition]
  ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
    val topics = topicAndPartitions.map(_.topic)
    val response = getPartitionMetadata(topics).right
    val answer = response.flatMap { tms: Set[TopicMetadata] =>
      val leaderMap = tms.flatMap { tm: TopicMetadata =>
        tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
          val tp = TopicAndPartition(tm.topic, pm.partitionId)
          if (topicAndPartitions(tp)) {
            pm.leader.map { l =>
              tp -> (l.host -> l.port)
            }
          } else {
            None
          }
        }
      }.toMap
      if (leaderMap.keys.size == topicAndPartitions.size) {
        Right(leaderMap)
      } else {
        val missing = topicAndPartitions.diff(leaderMap.keySet)
        val err = new Err
        err.append(new IllegalStateException(s"Couldn't find leaders for $missing"))
        Left(err)
      }
    }
    answer
  }

  def getPartitions(topics: Set[String]): Either[Err, Set[TopicAndPartition]] = {
    getPartitionMetadata(topics).right.map { r =>
      r.flatMap { tm: TopicMetadata =>
        tm.partitionsMetadata.map { pm: PartitionMetadata =>
          TopicAndPartition(tm.topic, pm.partitionId)
        }
      }
    }
  }

  def getPartitionMetadata(topics: Set[String]): Either[Err, Set[TopicMetadata]] = {
    val req = TopicMetadataRequest(
      TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics.toSeq
    )
    val errs = new Err
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp: TopicMetadataResponse = consumer.send(req)
      val respErrs = resp.topicsMetadata.filter(m => m.errorCode != ErrorMapping.NoError)
      if (respErrs.isEmpty) {
        return Right(resp.topicsMetadata.toSet)
      } else {
        respErrs.foreach { m =>
          val cause = ErrorMapping.exceptionFor(m.errorCode)
          val msg = s"Error getting partition metadata for '${m.topic}'. Does the topic exist?"
          errs.append(new IllegalArgumentException(msg, cause))
        }
      }
    }
    Left(errs)
  }

  // Leader offset api
  // scalastyle:off
  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
  // scalastyle:on

  def getLatestLeaderOffsets(
    topicAndPartitions: Set[TopicAndPartition]
  ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
    getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime)

  def getEarliestLeaderOffsets(
    topicAndPartitions: Set[TopicAndPartition]
  ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
    getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime)

  def getLeaderOffsets(
    topicAndPartitions: Set[TopicAndPartition],
    before: Long
  ): Either[Err, Map[TopicAndPartition, LeaderOffset]] = {
    getLeaderOffsets(topicAndPartitions, before, 1).right.map { r =>
      r.map { kv =>
        // mapValues isn't serializable, see SI-7005
        kv._1 -> kv._2.head
      }
    }
  }
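  // Invert a partition -> leader map into leader -> partitions, so offsets can be
  // requested with a single OffsetRequest per broker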
  private def flip[K, V](m: Map[K, V]): Map[V, Seq[K]] =
    m.groupBy(_._2).map { kv =>
      kv._1 -> kv._2.keys.toSeq
    }

  def getLeaderOffsets(
    topicAndPartitions: Set[TopicAndPartition],
    before: Long,
    maxNumOffsets: Int
  ): Either[Err, Map[TopicAndPartition, Seq[LeaderOffset]]] = {
    findLeaders(topicAndPartitions).right.flatMap { tpToLeader =>
      val leaderToTp: Map[(String, Int), Seq[TopicAndPartition]] = flip(tpToLeader)
      val leaders = leaderToTp.keys
      var result = Map[TopicAndPartition, Seq[LeaderOffset]]()
      val errs = new Err
      withBrokers(leaders, errs) { consumer =>
        val partitionsToGetOffsets: Seq[TopicAndPartition] =
          leaderToTp((consumer.host, consumer.port))
        val reqMap = partitionsToGetOffsets.map { tp: TopicAndPartition =>
          tp -> PartitionOffsetRequestInfo(before, maxNumOffsets)
        }.toMap
        val req = OffsetRequest(reqMap)
        val resp = consumer.getOffsetsBefore(req)
        val respMap = resp.partitionErrorAndOffsets
        partitionsToGetOffsets.foreach { tp: TopicAndPartition =>
          respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
            if (por.error == ErrorMapping.NoError) {
              if (por.offsets.nonEmpty) {
                result += tp -> por.offsets.map { off =>
                  LeaderOffset(consumer.host, consumer.port, off)
                }
              } else {
                errs.append(new IllegalArgumentException(
                  s"Empty offsets for $tp, is $before before log beginning?"
                ))
              }
            } else {
              errs.append(ErrorMapping.exceptionFor(por.error))
            }
          }
        }
        if (result.keys.size == topicAndPartitions.size) {
          return Right(result)
        }
      }
      val missing = topicAndPartitions.diff(result.keySet)
      errs.append(new IllegalArgumentException(s"Couldn't find leader offsets for $missing"))
      Left(errs)
    }
  }

  // Consumer offset api
  // scalastyle:off
  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
  // scalastyle:on
  // Offset commit/fetch API version: 0 is the original ZooKeeper-backed storage,
  // 1 stores offsets in Kafka itself
  private def defaultConsumerApiVersion: Short = kafkaParams.getOrElse("offsets.storage", "kafka") match {
    case "zookeeper" => 0
    case "kafka" => 1
  }
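  // e.g. Map("offsets.storage" -> "zookeeper") in kafkaParams selects the version-0 (ZK) API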
  /** Requires Kafka >= 0.8.1.1 */
  def getConsumerOffsets(
    groupId: String,
    topicAndPartitions: Set[TopicAndPartition]
  ): Either[Err, Map[TopicAndPartition, Long]] =
    getConsumerOffsets(groupId, topicAndPartitions, defaultConsumerApiVersion)

  def getConsumerOffsets(
    groupId: String,
    topicAndPartitions: Set[TopicAndPartition],
    consumerApiVersion: Short
  ): Either[Err, Map[TopicAndPartition, Long]] = {
    getConsumerOffsetMetadata(groupId, topicAndPartitions, consumerApiVersion).right.map { r =>
      r.map { kv =>
        kv._1 -> kv._2.offset
      }
    }
  }

  /** Requires Kafka >= 0.8.1.1 */
  def getConsumerOffsetMetadata(
    groupId: String,
    topicAndPartitions: Set[TopicAndPartition]
  ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] =
    getConsumerOffsetMetadata(groupId, topicAndPartitions, defaultConsumerApiVersion)

  def getConsumerOffsetMetadata(
    groupId: String,
    topicAndPartitions: Set[TopicAndPartition],
    consumerApiVersion: Short
  ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = {
    var result = Map[TopicAndPartition, OffsetMetadataAndError]()
    val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, consumerApiVersion)
    val errs = new Err
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp = consumer.fetchOffsets(req)
      val respMap = resp.requestInfo
      val needed = topicAndPartitions.diff(result.keySet)
      needed.foreach { tp: TopicAndPartition =>
        respMap.get(tp).foreach { ome: OffsetMetadataAndError =>
          if (ome.error == ErrorMapping.NoError) {
            result += tp -> ome
          } else {
            errs.append(ErrorMapping.exceptionFor(ome.error))
          }
        }
      }
      if (result.keys.size == topicAndPartitions.size) {
        return Right(result)
      }
    }
    val missing = topicAndPartitions.diff(result.keySet)
    errs.append(new IllegalArgumentException(s"Couldn't find consumer offsets for $missing"))
    Left(errs)
  }
  /**
   * Sends a ConsumerMetadataRequest; as a side effect this creates the __consumer_offsets
   * system topic if it does not yet exist. Returns the response error code.
   * Note: topicAndPartitions is currently unused by the implementation.
   * Example: kafkaCluster.getConsumerMetadata("test_group", Set(TopicAndPartition(topic, 0), TopicAndPartition(topic, 1)))
   */
  def getConsumerMetadata(
    groupId: String,
    topicAndPartitions: Set[TopicAndPartition]
  ): Int = {
    val req = ConsumerMetadataRequest(groupId, ConsumerMetadataRequest.CurrentVersion, 0, "demoClientId")
    val errs = new Err
    var response: Int = 0
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp = consumer.send(req)
      response = resp.errorCode.toInt
    }
    response
  }
  /** Requires Kafka >= 0.8.1.1 */
  def setConsumerOffsets(
    groupId: String,
    offsets: Map[TopicAndPartition, Long]
  ): Either[Err, Map[TopicAndPartition, Short]] =
    setConsumerOffsets(groupId, offsets, defaultConsumerApiVersion)

  def setConsumerOffsets(
    groupId: String,
    offsets: Map[TopicAndPartition, Long],
    consumerApiVersion: Short
  ): Either[Err, Map[TopicAndPartition, Short]] = {
    val meta = offsets.map { kv =>
      kv._1 -> OffsetAndMetadata(kv._2)
    }
    setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
  }

  /** Requires Kafka >= 0.8.1.1 */
  def setConsumerOffsetMetadata(
    groupId: String,
    metadata: Map[TopicAndPartition, OffsetAndMetadata]
  ): Either[Err, Map[TopicAndPartition, Short]] =
    setConsumerOffsetMetadata(groupId, metadata, defaultConsumerApiVersion)

  def setConsumerOffsetMetadata(
    groupId: String,
    metadata: Map[TopicAndPartition, OffsetAndMetadata],
    consumerApiVersion: Short
  ): Either[Err, Map[TopicAndPartition, Short]] = {
    var result = Map[TopicAndPartition, Short]()
    val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion)
    val errs = new Err
    val topicAndPartitions = metadata.keySet
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp = consumer.commitOffsets(req)
      val respMap = resp.commitStatus
      val needed = topicAndPartitions.diff(result.keySet)
      needed.foreach { tp: TopicAndPartition =>
        respMap.get(tp).foreach { err: Short =>
          if (err == ErrorMapping.NoError) {
            result += tp -> err
          } else {
            errs.append(ErrorMapping.exceptionFor(err))
          }
        }
      }
      if (result.keys.size == topicAndPartitions.size) {
        return Right(result)
      }
    }
    val missing = topicAndPartitions.diff(result.keySet)
    errs.append(new IllegalArgumentException(s"Couldn't set offsets for $missing"))
    Left(errs)
  }

  // Try a call against potentially multiple brokers, accumulating errors
  private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)(fn: SimpleConsumer => Any): Unit = {
    brokers.foreach { hp =>
      var consumer: SimpleConsumer = null
      try {
        consumer = connect(hp._1, hp._2)
        fn(consumer)
      } catch {
        case NonFatal(e) =>
          errs.append(e)
      } finally {
        if (consumer != null) {
          consumer.close()
        }
      }
    }
  }
}

object KafkaCluster {

  type Err = ArrayBuffer[Throwable]
  /** If the result is Right, return its value; otherwise throw an IllegalArgumentException aggregating the accumulated errors */
  def checkErrors[T](result: Either[Err, T]): T = {
    result.fold(
      errs => throw new IllegalArgumentException(errs.mkString("\n")),
      ok => ok
    )
  }
  case class LeaderOffset(host: String, port: Int, offset: Long)

  /**
   * High-level kafka consumers connect to ZK. ConsumerConfig assumes this use case.
   * Simple consumers connect directly to brokers, but need many of the same configs.
   * This subclass won't warn about missing ZK params, or presence of broker params.
   */
  class SimpleConsumerConfig private (brokers: String, originalProps: Properties)
      extends ConsumerConfig(originalProps) {
    val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
      val hpa = hp.split(":")
      if (hpa.size == 1) {
throw new IllegalArgumentException(s"Broker not the in correct format of <host>:<port> [$brokers]") | |
      }
      (hpa(0), hpa(1).toInt)
    }
  }

  object SimpleConsumerConfig {
    /**
     * Make a consumer config without requiring group.id or zookeeper.connect,
     * since communicating with brokers also needs common settings such as timeout
     */
    def apply(kafkaParams: Map[String, String]): SimpleConsumerConfig = {
      // These keys are from other pre-existing kafka configs for specifying brokers; accept either
      val brokers = kafkaParams.get("metadata.broker.list")
        .orElse(kafkaParams.get("bootstrap.servers"))
        .getOrElse(throw new IllegalArgumentException(
          "Must specify metadata.broker.list or bootstrap.servers"
        ))
      val props = new Properties()
      kafkaParams.foreach {
        case (key, value) =>
          // prevent warnings on parameters ConsumerConfig doesn't know about
          if (key != "metadata.broker.list" && key != "bootstrap.servers") {
            props.put(key, value)
          }
      }
      Seq("zookeeper.connect", "group.id").foreach { s =>
        if (!props.containsKey(s)) {
          props.setProperty(s, "")
        }
      }
      new SimpleConsumerConfig(brokers, props)
    }
  }
}
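
A minimal end-to-end usage sketch (the broker addresses, topic name, and group id below are illustrative placeholders, not part of the gist):

import kafka.common.TopicAndPartition
import falcon.streaming.kafka.KafkaCluster

object KafkaClusterExample {
  def main(args: Array[String]): Unit = {
    // Brokers, not ZooKeeper, in host:port form
    val kc = new KafkaCluster(Map("metadata.broker.list" -> "broker1:9092,broker2:9092"))
    // Discover the partitions of a topic
    val partitions: Set[TopicAndPartition] =
      KafkaCluster.checkErrors(kc.getPartitions(Set("events")))
    // Ask each partition leader for its earliest and latest offsets
    val earliest = KafkaCluster.checkErrors(kc.getEarliestLeaderOffsets(partitions))
    val latest = KafkaCluster.checkErrors(kc.getLatestLeaderOffsets(partitions))
    // Commit the latest offsets for a consumer group, then read them back
    KafkaCluster.checkErrors(
      kc.setConsumerOffsets("example-group", latest.map { case (tp, lo) => tp -> lo.offset }))
    val committed = KafkaCluster.checkErrors(kc.getConsumerOffsets("example-group", partitions))
    println(s"earliest=$earliest latest=$latest committed=$committed")
  }
}

If offsets are stored in Kafka rather than ZooKeeper, calling getConsumerMetadata first ensures the __consumer_offsets topic exists before the commit.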
https://gist.github.com/ssemichev/fa3605c7b10cb6c7b9c8ab54ffbc5865#file-kafkacluster-scala-L281
This line causes a compile error:
error: not found: value ConsumerMetadataRequest
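kafka.api.ConsumerMetadataRequest (pulled in by the wildcard import) exists under that name only in the 0.8.2.x client library; Kafka 0.9 renamed the request/response pair to GroupCoordinatorRequest/GroupCoordinatorResponse, so the gist as written does not compile against newer clients. A hedged sketch of the substitution for a 0.9.x classpath (the rest of getConsumerMetadata should stay unchanged, since the response still exposes errorCode):

// Kafka 0.9+: ConsumerMetadataRequest was renamed to GroupCoordinatorRequest
val req = GroupCoordinatorRequest(groupId, GroupCoordinatorRequest.CurrentVersion, 0, "demoClientId")
val resp = consumer.send(req) // GroupCoordinatorResponse; resp.errorCode as before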