@ariens
Last active February 22, 2017 04:40
Kafka Offset Range Provider/Storage Engine
package com.blackberry.bdp.korpse
import scala.collection.parallel.ParIterable
import scala.collection.mutable
import scala.collection.mutable.{ Map => MutableMap }
import org.apache.spark.streaming.kafka.OffsetRange
import kafka.api._
import kafka.api.OffsetResponse
import kafka.api.PartitionOffsetsResponse
import kafka.cluster.Broker
import kafka.network.BlockingChannel
import kafka.common.ErrorMapping
import kafka.common.{ TopicAndPartition, OffsetAndMetadata }
import org.slf4j.{ Logger, LoggerFactory }
import java.io.IOException
case class HostAndPort(host: String, port: Int)
case class AllBrokersFailed(str: String) extends Exception(str)
case class CorrelationMismatchException(str: String) extends Exception(str)
object Korpse {
def apply(seedBrokersStr: String,
topicsStr: String,
consumerGroupId: String,
kafkaClientId: String,
defaultOffset: Long = -1) = {
new Korpse(parseBrokerString(seedBrokersStr),
parseTopicsString(topicsStr),
consumerGroupId,
kafkaClientId,
defaultOffset)
}
def parseTopicsString(topicsStr: String): Seq[String] = {
topicsStr.split("""\s?,\s?""").map(_.trim).toSeq
}
def parseBrokerString(seedBrokerStr: String): Seq[HostAndPort] = {
val seedBrokerBuffer = new mutable.ListBuffer[HostAndPort]()
for (seedBroker <- seedBrokerStr
.split("""\s?,\s?""")
.map(_.trim)) {
val host :: port :: xs = seedBroker
.split("""\s?:\s?""").map(_.trim).toList
seedBrokerBuffer += HostAndPort(host, port.toInt)
}
seedBrokerBuffer.toSeq
}
}
class Korpse(
seedBrokers: Seq[HostAndPort],
topics: Seq[String],
consumerGroupId: String,
kafkaClientId: String,
defaultOffset: Long) {
val LOG: Logger = LoggerFactory.getLogger(this.getClass);
var correlationId = 0
val backOffMs = 500
val retries = 3
val backoffExponent = 2
val partitions = getPartitions()
var partitionDefaultStarts: Option[Map[TopicAndPartition, Long]] = None
var topicDefaultStarts: Option[Map[String, Long]] = None
var globalDefaultStarts: Long = -1
// Gets a channel to a specific broker
private def getChannel(broker: Broker): Option[BlockingChannel] = {
return getChannel(HostAndPort(broker.host, broker.port), true)
}
// Gets a channel from a list of seed brokers
private def getChannel(seedBrokers: Seq[HostAndPort]): Option[BlockingChannel] = {
for (seedBroker <- seedBrokers) {
val channel = getChannel(HostAndPort(seedBroker.host, seedBroker.port))
if (channel.isDefined) {
return channel
}
println(s"Could not establish channel to ${seedBroker.host}:${seedBroker.port}, trying next seed broker")
}
None
}
// Gets a channel from a host and port
private def getChannel(host: HostAndPort,
ignoreCoordinatorRedirect: Boolean = false):
Option[BlockingChannel] = {
try {
println(s"Establishing channel to $host")
var channel = new BlockingChannel(host.host,
host.port,
BlockingChannel.UseDefaultBufferSize,
BlockingChannel.UseDefaultBufferSize,
5000);
correlationId = correlationId + 1
channel.connect
channel.send(ConsumerMetadataRequest(consumerGroupId,
ConsumerMetadataRequest.CurrentVersion,
correlationId,
kafkaClientId));
val metadataResponse = ConsumerMetadataResponse.readFrom(
channel.receive.buffer);
if (correlationId != metadataResponse.correlationId)
throw new CorrelationMismatchException(
"ConsumerMetadataRequest: " + correlationId + ", " +
"ConsumerMetadataResponse: " + metadataResponse.correlationId)
if (metadataResponse.errorCode == ErrorMapping.NoError) {
if (ignoreCoordinatorRedirect) {
println("Connected to " + channel.host + ":" + channel.port
+ " and not redirecting to my coordinator for offset mgmt")
return Option(channel)
}
val offsetManager = metadataResponse.coordinatorOpt
if (offsetManager.isDefined) {
channel.disconnect
channel = new BlockingChannel(offsetManager.get.host,
offsetManager.get.port,
BlockingChannel.UseDefaultBufferSize,
BlockingChannel.UseDefaultBufferSize,
5000);
channel.connect
return Option(channel)
}
} else {
println("Error code: " + metadataResponse.errorCode)
return None
}
} catch {
case ioe: IOException =>
println(s"Blocking Channel "
+ "IOException on $host:$port")
return None
}
return None
}
/**
* This mapping of TopicAndPartition to Broker is needed because
* subsequent OffsetRequests to fetch the latest/earliest
* offset must be sent to the partition leader only
*/
private def getPartitions(): Map[TopicAndPartition, Broker] = {
val tempMap = MutableMap[TopicAndPartition, Broker]()
correlationId = correlationId + 1
val channel = getChannel(seedBrokers)
if (!channel.isDefined)
throw new Exception("Cannot get channel from seed brokers")
channel.get.send(new TopicMetadataRequest(topics, correlationId))
val response = TopicMetadataResponse.readFrom(channel.get.receive.buffer)
channel.get.disconnect()
if (correlationId != response.correlationId)
throw new CorrelationMismatchException(
"TopicMetadataRequest: " + correlationId + ", " +
"TopicMetadataResponse: " + response.correlationId)
for (topicMetadata <- response.topicsMetadata) {
for (partitionMetadata <- topicMetadata.partitionsMetadata) {
if (!partitionMetadata.leader.isDefined) {
println("ERROR: leader not defined for " + partitionMetadata)
} else {
println("discovered partition: " + partitionMetadata)
val partition = TopicAndPartition(topicMetadata.topic,
partitionMetadata.partitionId)
tempMap += (partition -> partitionMetadata.leader.get)
}
}
}
return tempMap.toMap
}
/**
* See code references 1 and 2: there's something odd about
* the OffsetFetchResponse. I would have expected a None in the
* optional when no offset has been committed, instead of always
* getting back a -1. There's a default Kafka parameter named
* auto.offset.reset that defaults to largest (-1), but it doesn't
* seem to be relevant when using a blocking channel to make API
* calls directly. Needs more investigation as this workaround
* seems hackish -- dariens
*/
def getFromOffsets(): Option[Map[TopicAndPartition, Long]] = {
var fromOffsets = MutableMap[TopicAndPartition, Long]()
var unknowns = MutableMap[TopicAndPartition, Broker]()
val channel = getChannel(seedBrokers)
if (!channel.isDefined)
throw new Exception("Cannot get channel from seed brokers")
correlationId = correlationId + 1
val fetchRequest = new OffsetFetchRequest(consumerGroupId,
partitions.keySet.toSeq, 1,
correlationId,
kafkaClientId);
channel.get.send(fetchRequest)
val fetchResponse = OffsetFetchResponse.readFrom(channel.get.receive.buffer)
channel.get.disconnect()
if (correlationId != fetchResponse.correlationId)
throw new CorrelationMismatchException(
"OffsetFetchRequest: " + correlationId + ", " +
"OffsetFetchResponse: " + fetchResponse.correlationId)
for ((partition, broker) <- partitions) {
val metadataAndError = fetchResponse.requestInfo.get(partition)
if (metadataAndError.isDefined) {
val errorCode = metadataAndError.get.error
if (errorCode == ErrorMapping.NotCoordinatorForConsumerCode) {
println("channel created on " + channel.get.host + ":" + channel.get.port
+ " is not coordinator for group " + consumerGroupId
+ " and client ID " + kafkaClientId + ", retrying")
Thread sleep backOffMs
return getFromOffsets
} else if (errorCode == ErrorMapping.OffsetsLoadInProgressCode) {
LOG.warn("offset load in processs for " + partition.topic
+ "-" + partition.partition + ", retrying")
Thread sleep backOffMs
return getFromOffsets
} else {
if (metadataAndError.get.offset == -1) {
println(s"$partition received -1 OffsetFetchResponse "
+ s"fetching default for time $defaultOffset")
// Code reference [1] -- why do we need to do this?!
val offset = getDefaultFromOffset(partition, broker)
if (offset.isDefined) {
println(s"$partition default offset for $defaultOffset was " + offset.get)
fromOffsets += (partition -> offset.get)
} else {
throw new Exception(s"couldn't find offset for partition: $partition")
}
} else {
fromOffsets += (partition -> metadataAndError.get.offset)
println("mapping from offset for partion " + partition
+ " to " + metadataAndError.get.offset)
}
}
} else {
// Code reference [2] -- does this ever execute, since it appears we get a -1
// back when no offsets have been committed?
println(partition + ": Committed offset not found, fetching default")
val offset = getDefaultFromOffset(partition, broker)
if (offset.isDefined) {
println(s"$partition default offset for $defaultOffset was " + offset.get)
fromOffsets += (partition -> offset.get)
} else {
throw new Exception("couldn't find any from offset for partition: " + partition)
}
}
}
return Option(fromOffsets.toMap)
}
def getDefaultFromOffset(partition: TopicAndPartition,
broker: Broker): Option[Long] = {
val channel = getChannel(broker)
if (!channel.isDefined)
throw new Exception(s"Cannot get channel for broker $broker")
correlationId = correlationId + 1
val reqInfo = Map(partition -> PartitionOffsetRequestInfo(defaultOffset, 1))
val request = OffsetRequest(reqInfo, 1, correlationId, kafkaClientId)
channel.get.send(request)
val response = OffsetResponse.readFrom(channel.get.receive.buffer)
channel.get.disconnect()
if (correlationId != response.correlationId)
throw new CorrelationMismatchException(
"OffsetRequest: " + correlationId + ", " +
"OffsetResponse: " + response.correlationId)
if (response.hasError) {
println(s"Trying to fetch offset request for time $defaultOffset failed")
return None
} else {
val offsetResponse = response.partitionErrorAndOffsets.get(partition).get
println(s"$partition fetched offset for time $defaultOffset is " + offsetResponse.offsets(0))
return Option(offsetResponse.offsets(0))
}
}
def storeOffsetRanges(offsetRanges: Array[OffsetRange]): Unit = {
val channel = getChannel(seedBrokers)
if (!channel.isDefined)
throw new Exception("Cannot get channel from seed brokers")
val now = System.currentTimeMillis()
val offsets = mutable.Map[TopicAndPartition, OffsetAndMetadata]()
for (osr <- offsetRanges) {
val partition = TopicAndPartition(osr.topic, osr.partition)
val oam = OffsetAndMetadata(osr.untilOffset, "metadata?", now)
offsets += (partition -> oam)
println("[" + osr.topic + "-" + osr.partition + "] offset: "
+ osr.untilOffset + " added to commit request")
}
correlationId = correlationId + 1
val commitRequest = new OffsetCommitRequest(consumerGroupId, offsets.toMap, 1,
correlationId, kafkaClientId);
try {
channel.get.send(commitRequest)
val commitResponse = OffsetCommitResponse.readFrom(channel.get.receive.buffer)
channel.get.disconnect()
if (correlationId != commitResponse.correlationId)
throw new CorrelationMismatchException(
"OffsetCommitRequest: " + correlationId + ", " +
"OffsetCommitResponse: " + commitResponse.correlationId)
if (commitResponse.hasError) {
for ((partition, broker) <- partitions) {
val status = commitResponse.commitStatus.get(partition)
if (!status.isDefined) {
println("WARN: commit status was not defined for " + partition)
} else {
val errorCode = status.get
if (errorCode == ErrorMapping.NotCoordinatorForConsumerCode
|| errorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode) {
println("[" + partition.topic + "-" + partition.partition
+ "] Consumer coordinator has moved, need to retry")
Thread sleep 500
storeOffsetRanges(offsetRanges)
} else if (errorCode == ErrorMapping.OffsetMetadataTooLargeCode) {
throw new Exception("WARN: partition " + partition
+ ": You must reduce the size of the metadata if you wish to retry")
}
}
}
} else {
println("offsets and metadata stored without errors")
}
} catch {
case ioe: IOException => println("an IOException occred: " + ioe)
}
}
}
package com.blackberry.bdp.kafkafilter
import kafka.common.TopicAndPartition
import kafka.serializer.StringDecoder
import kafka.message.MessageAndMetadata
import org.apache.spark.{ SparkConf, TaskContext }
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.kafka.{ OffsetRange, HasOffsetRanges }
import org.slf4j.{ Logger, LoggerFactory }
import scala.collection.mutable
import com.blackberry.bdp.korpse._
object Main {
val LOG: Logger = LoggerFactory.getLogger(this.getClass);
def main(args: Array[String]) {
if (args.length != 6) {
println("Required arguments:")
println("\t1 => metadata brokers")
println("\t2 => topic names (comma seperated)")
println("\t3 => batch length (seconds)")
println("\t4 => consumer group ID")
println("\t5 => Kafka client ID")
println("\t6 => default offset (-2 earliest, -1 latest)")
System.exit(1);
}
val seedBrokerList = args(0)
val topics = args(1)
val batchSeconds = args(2).toInt
val consumerGroup = args(3)
val kafkaClientId = args(4)
val defaultOffset = args(5)
val ssc = new StreamingContext(new SparkConf, Seconds(batchSeconds))
val kafkaParams = Map(
"metadata.broker.list" -> seedBrokerList)
val korpse = Korpse(seedBrokerList,
topics,
consumerGroup,
kafkaClientId,
defaultOffset.toLong)
val fromOffsets = korpse.getFromOffsets()
if (!fromOffsets.isDefined) {
LOG.error("Unable to determine starting offsets")
System.exit(1)
}
println("\n***\nfrom offsets: " + fromOffsets + "\n***\n")
fromOffsets.get.foreach((t2) => println(t2._1 + " starting offset: " + t2._2))
var offsetRanges = Array[OffsetRange]()
// Create a stream from our offset ranges that includes the Kafka message, partition and offset
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Int, Long)](ssc, kafkaParams, fromOffsets.get,
(mmd: MessageAndMetadata[String, String]) => (mmd.message(), mmd.partition, mmd.offset))
.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}
// Typical application specific Spark stream processing now follows
// (illustrative only: count non-empty messages per partition)
val results = stream
.filter { case (message, partition, offset) => message.nonEmpty } // you can filter the stream
.map { case (message, partition, offset) => (partition, 1L) } // ...and apply a mapping function
.reduceByKey(_ + _) // then reduce by key
results.foreachRDD(rdd => rdd.foreach(println))
// Now we want to persist the offset ranges that we're consuming so we
// start from where we left off if/when the job is stopped or crashes and is
// restarted
stream.foreachRDD { rdd =>
korpse.storeOffsetRanges(offsetRanges)
}
ssc.start();
ssc.awaitTermination();
}
}
@ariens (Author) commented Nov 23, 2015

This is an example Spark job that uses the Korpse library to provide and store offsets in Kafka. The direct stream in Spark 1.3.1-1.5.1 doesn't use the standard high-level consumer and pushes the responsibility of offset management onto the application. Korpse simplifies this metadata persistence by using the Kafka blocking channel API to store and retrieve the offsets for all partitions of one or more topics.

The Korpse library and example Spark job are included in this gist.
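
For quick reference, here is a minimal sketch of the three Korpse calls the example job above relies on. The broker list, topic, consumer group, and client ID strings below are placeholders, and the OffsetRange array would normally be captured from the stream's RDDs rather than built by hand:

import org.apache.spark.streaming.kafka.OffsetRange
import com.blackberry.bdp.korpse.Korpse

// Placeholder connection details -- substitute your own
val korpse = Korpse(
"broker1:9092,broker2:9092", // seed brokers
"my-topic", // topics (comma separated)
"my-consumer-group", // consumer group ID
"my-client-id", // Kafka client ID
-1) // default offset (-2 earliest, -1 latest)

// Last committed offset (or the default) for every partition of the topics
val fromOffsets = korpse.getFromOffsets()
.getOrElse(sys.error("Unable to determine starting offsets"))

// ...consume via KafkaUtils.createDirectStream(ssc, kafkaParams, fromOffsets, ...)...

// After a batch has been processed, commit the consumed ranges back to Kafka
val consumedRanges: Array[OffsetRange] = Array() // normally captured via HasOffsetRanges
korpse.storeOffsetRanges(consumedRanges)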

@koeninger commented

The direct stream has code for interacting with Kafka's offset API; it's just not public.

There's a JIRA for exposing it as a public API:

https://issues.apache.org/jira/browse/SPARK-10963

You can see the PR if you want to apply that patch.
