@lijonnas, forked from ariens/Korpse.scala, created February 22, 2017
Kafka Offset Range Provider/Storage Engine
package com.blackberry.bdp.korpse
import scala.collection.parallel.ParIterable
import scala.collection.mutable
import scala.collection.mutable.{ Map => MutableMap }
import org.apache.spark.streaming.kafka.OffsetRange
import kafka.api._
import kafka.api.OffsetResponse
import kafka.api.PartitionOffsetsResponse
import kafka.cluster.Broker
import kafka.network.BlockingChannel
import kafka.common.ErrorMapping
import kafka.common.{ TopicAndPartition, OffsetAndMetadata }
import org.slf4j.{ Logger, LoggerFactory }
import java.io.IOException
case class HostAndPort(host: String, port: Int)
case class AllBrokersFailed(str: String) extends Exception(str)
case class CorrelationMismatchException(str: String) extends Exception(str)
object Korpse {
def apply(seedBrokersStr: String,
topicsStr: String,
consumerGroupId: String,
kafkaClientId: String,
defaultOffset: Long = -1) = {
new Korpse(parseBrokerString(seedBrokersStr),
parseTopicsString(topicsStr),
consumerGroupId,
kafkaClientId,
defaultOffset)
}
def parseTopicsString(topicsStr: String): Seq[String] = {
topicsStr.split("""\s?,\s?""").map(_.trim).toSeq
}
def parseBrokerString(seedBrokerStr: String): Seq[HostAndPort] = {
val seedBrokerBuffer = new mutable.ListBuffer[HostAndPort]()
for (seedBroker <- seedBrokerStr
.split("""\s?,\s?""")
.map(_.trim)) {
val host :: port :: xs = seedBroker
.split("""\s?:\s?""").map(_.trim).toList
seedBrokerBuffer += HostAndPort(host, port.toInt)
}
seedBrokerBuffer.toSeq
}
}
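// Illustrative only (not part of the original gist): a tiny demo driver showing
// what the parsing helpers above are expected to produce. The object name,
// broker addresses and topic names are made-up placeholders.
object KorpseParsingDemo {
def main(args: Array[String]): Unit = {
// "kafka01:9092, kafka02:9092" -> Seq(HostAndPort("kafka01", 9092), HostAndPort("kafka02", 9092))
println(Korpse.parseBrokerString("kafka01:9092, kafka02:9092"))
// "topic-a, topic-b" -> Seq("topic-a", "topic-b")
println(Korpse.parseTopicsString("topic-a, topic-b"))
}
}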
class Korpse(
seedBrokers: Seq[HostAndPort],
topics: Seq[String],
consumerGroupId: String,
kafkaClientId: String,
defaultOffset: Long) {
val LOG: Logger = LoggerFactory.getLogger(this.getClass);
var correlationId = 0
val backOffMs = 500
val retries = 3
val backoffExponent = 2
val partitions = getPartitions()
var partitionDefaultStarts: Option[Map[TopicAndPartition, Long]] = None
var topicDefaultStarts: Option[Map[String, Long]] = None
var globalDefaultStarts: Long = -1
// Gets a channel to a specific broker
private def getChannel(broker: Broker): Option[BlockingChannel] = {
return getChannel(HostAndPort(broker.host, broker.port), true)
}
// Gets a channel from a list of seed brokers
private def getChannel(seedBrokers: Seq[HostAndPort]): Option[BlockingChannel] = {
for (seedBroker <- seedBrokers) {
try {
return getChannel(new HostAndPort(seedBroker.host, seedBroker.port))
} catch {
case ioe: IOException => println(s"Blocking Channel "
+ "IOException on $host:$port")
}
}
return None
}
// Gets a channel from a host and port
private def getChannel(host: HostAndPort,
ignoreCoordinatorRedirect: Boolean = false):
Option[BlockingChannel] = {
try {
println(s"Establishing channel to $host")
var channel = new BlockingChannel(host.host,
host.port,
BlockingChannel.UseDefaultBufferSize,
BlockingChannel.UseDefaultBufferSize,
5000);
correlationId = correlationId + 1
channel.connect
channel.send(ConsumerMetadataRequest(consumerGroupId,
ConsumerMetadataRequest.CurrentVersion,
correlationId,
kafkaClientId));
val metadataResponse = ConsumerMetadataResponse.readFrom(
channel.receive.buffer);
if (correlationId != metadataResponse.correlationId)
throw new CorrelationMismatchException(
"ConsumerMetadataRequest: " + correlationId + ", " +
"ConsumerMetadataResponse: " + metadataResponse.correlationId)
if (metadataResponse.errorCode == ErrorMapping.NoError) {
if (ignoreCoordinatorRedirect) {
println("Connected to " + channel.host + ":" + channel.port
+ " and not redirecting to my coordinator for offset mgmt")
return Option(channel)
}
val offsetManager = metadataResponse.coordinatorOpt
if (offsetManager.isDefined) {
channel.disconnect
channel = new BlockingChannel(offsetManager.get.host,
offsetManager.get.port,
BlockingChannel.UseDefaultBufferSize,
BlockingChannel.UseDefaultBufferSize,
5000);
channel.connect
return Option(channel)
}
} else {
println("Error code: " + metadataResponse.errorCode)
return None
}
} catch {
case ioe: IOException =>
println(s"Blocking Channel "
+ "IOException on $host:$port")
return None
}
return None
}
/**
* This mapping of TopicAndPartition to Broker is needed because
* subsequent OffsetRequests to fetch the latest/earliest
* offset must be sent to the partition leader only
*/
private def getPartitions(): Map[TopicAndPartition, Broker] = {
val tempMap = MutableMap[TopicAndPartition, Broker]()
correlationId = correlationId + 1
val channel = getChannel(seedBrokers)
if (!channel.isDefined)
throw new Exception("Cannot get channel from seed brokers")
channel.get.send(new TopicMetadataRequest(topics, correlationId))
val response = TopicMetadataResponse.readFrom(channel.get.receive.buffer)
channel.get.disconnect()
if (correlationId != response.correlationId)
throw new CorrelationMismatchException(
"TopicMetadataRequest: " + correlationId + ", " +
"TopicMetadataResponse: " + response.correlationId)
for (topicMetadata <- response.topicsMetadata) {
for (partitionMetadata <- topicMetadata.partitionsMetadata) {
if (!partitionMetadata.leader.isDefined) {
println("ERROR: leader not defined for " + partitionMetadata)
} else {
println("discovered partition: " + partitionMetadata)
val partition = TopicAndPartition(topicMetadata.topic,
partitionMetadata.partitionId)
tempMap += (partition -> partitionMetadata.leader.get)
}
}
}
return tempMap.toMap
}
/**
* See code references [1] and [2]: there's something weird about
* the OffsetRequest; I would have expected a None for the
* optional instead of always getting back a -1. The Kafka
* parameter auto.offset.reset defaults to largest (-1), but it
* doesn't seem to be relevant when using a blocking channel to
* make API calls directly. Needs more investigation, as this
* workaround seems hackish -- dariens
*/
def getFromOffsets(): Option[Map[TopicAndPartition, Long]] = {
var fromOffsets = MutableMap[TopicAndPartition, Long]()
var unknowns = MutableMap[TopicAndPartition, Broker]()
val channel = getChannel(seedBrokers)
if (!channel.isDefined)
throw new Exception("Cannot get channel from seed brokers")
correlationId = correlationId + 1
val fetchRequest = new OffsetFetchRequest(consumerGroupId,
partitions.keySet.toSeq, 1,
correlationId,
kafkaClientId);
channel.get.send(fetchRequest)
val fetchResponse = OffsetFetchResponse.readFrom(channel.get.receive.buffer)
channel.get.disconnect()
if (correlationId != fetchResponse.correlationId)
throw new CorrelationMismatchException(
"OffsetFetchRequest: " + correlationId + ", " +
"OffsetFetchResponse: " + fetchResponse.correlationId)
for ((partition, broker) <- partitions) {
val metadataAndError = fetchResponse.requestInfo.get(partition)
if (metadataAndError.isDefined) {
val errorCode = metadataAndError.get.error
if (errorCode == ErrorMapping.NotCoordinatorForConsumerCode) {
println("channel created on " + channel.get.host + ":" + channel.get.port
+ " is not coordinator for group " + consumerGroupId
+ " and client ID " + kafkaClientId + ", retrying")
Thread sleep backOffMs
return getFromOffsets
} else if (errorCode == ErrorMapping.OffsetsLoadInProgressCode) {
LOG.warn("offset load in processs for " + partition.topic
+ "-" + partition.partition + ", retrying")
Thread sleep backOffMs
return getFromOffsets
} else {
if (metadataAndError.get.offset == -1) {
println(s"$partition received -1 OffsetFetchResponse "
+ s"fetching default for time $defaultOffset")
// Code reference [1] -- why do we need to do this?!
val offset = getDefaultFromOffset(partition, broker)
if (offset.isDefined) {
println(s"$partition default offset for $defaultOffset was " + offset.get)
fromOffsets += (partition -> offset.get)
} else {
throw new Exception(s"couldn't find offset for partition: $partition")
}
} else {
fromOffsets += (partition -> metadataAndError.get.offset)
println("mapping from offset for partion " + partition
+ " to " + metadataAndError.get.offset)
}
}
} else {
// Code reference [2] -- Does this ever execute--since it appears we get a -1 when
// There are no offsets committed
println(partition + ": Committed offset not found, fetching default")
val offset = getDefaultFromOffset(partition, broker)
if (offset.isDefined) {
println(s"$partition default offset for $defaultOffset was " + offset.get)
fromOffsets += (partition -> offset.get)
} else {
throw new Exception("couldn't find any from offset for partition: " + partition)
}
}
}
return Option(fromOffsets.toMap)
}
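// Note: defaultOffset is passed below as the "time" in PartitionOffsetRequestInfo;
// -2 corresponds to kafka.api.OffsetRequest.EarliestTime (earliest available
// offset) and -1 to kafka.api.OffsetRequest.LatestTime (latest), matching the
// driver's "-2 earliest, -1 latest" argument description.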
def getDefaultFromOffset(partition: TopicAndPartition,
broker: Broker): Option[Long] = {
val channel = getChannel(broker)
if (!channel.isDefined)
throw new Exception(s"Cannot get channel for broker $broker")
correlationId = correlationId + 1
val reqInfo = Map(partition -> PartitionOffsetRequestInfo(defaultOffset, 1))
val request = OffsetRequest(reqInfo, 1, correlationId, kafkaClientId)
channel.get.send(request)
val response = OffsetResponse.readFrom(channel.get.receive.buffer)
channel.get.disconnect()
if (correlationId != response.correlationId)
throw new CorrelationMismatchException(
"OffsetRequest: " + correlationId + ", " +
"OffsetResponse: " + response.correlationId)
if (response.hasError) {
println(s"Trying to fetch offset request for time $defaultOffset failed")
return None
} else {
val offsetResponse = response.partitionErrorAndOffsets.get(partition).get
println(s"$partition fetched offset for time $defaultOffset is " + offsetResponse.offsets(0))
return Option(offsetResponse.offsets(0))
}
}
def storeOffsetRanges(offsetRanges: Array[OffsetRange]): Unit = {
val channel = getChannel(seedBrokers)
if (!channel.isDefined)
throw new Exception("Cannot get channel from seed brokers")
val now = System.currentTimeMillis()
val offsets = mutable.Map[TopicAndPartition, OffsetAndMetadata]()
for (osr <- offsetRanges) {
val partition = TopicAndPartition(osr.topic, osr.partition)
val oam = OffsetAndMetadata(osr.untilOffset, "metadata?", now)
offsets += (partition -> oam)
println("[" + osr.topic + "-" + osr.partition + "] offset: "
+ osr.untilOffset + " added to commit request")
}
correlationId = correlationId + 1
val commitRequest = new OffsetCommitRequest(consumerGroupId, offsets.toMap, 1,
correlationId, kafkaClientId);
try {
channel.get.send(commitRequest)
val commitResponse = OffsetCommitResponse.readFrom(channel.get.receive.buffer)
channel.get.disconnect()
if (correlationId != commitResponse.correlationId)
throw new CorrelationMismatchException(
"OffsetCommitRequest: " + correlationId + ", " +
"OffsetCommitResponse: " + commitResponse.correlationId)
if (commitResponse.hasError) {
for ((partition, broker) <- partitions) {
val status = commitResponse.commitStatus.get(partition)
if (!status.isDefined) {
println("WARN: commit status was not defined for " + partition)
} else {
val errorCode = status.get
if (errorCode == ErrorMapping.NotCoordinatorForConsumerCode
|| errorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode) {
println("[" + partition.topic + "-" + partition.partition
+ "] Consumer coordinator has moved, need to retry")
Thread sleep 500
storeOffsetRanges(offsetRanges)
} else if (errorCode == ErrorMapping.OffsetMetadataTooLargeCode) {
throw new Exception("WARN: partition " + partition
+ ": You must reduce the size of the metadata if you wish to retry")
}
}
}
} else {
println("offsets and metadata stored without errors")
}
} catch {
case ioe: IOException => println("an IOException occred: " + ioe)
}
}
}
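// Typical usage sketch (comments only; broker, topic, group and client values
// are placeholders). The Spark Streaming driver below, a separate file in the
// gist, shows the complete flow:
//   val korpse = Korpse("kafka01:9092", "topic-a", "my-group", "my-client", -1)
//   val fromOffsets = korpse.getFromOffsets()   // committed or default start offsets
//   // ... build a direct stream from fromOffsets, capture its OffsetRanges ...
//   korpse.storeOffsetRanges(offsetRanges)      // commit the consumed ranges per batch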
package com.blackberry.bdp.kafkafilter
import kafka.common.TopicAndPartition
import kafka.serializer.StringDecoder
import kafka.message.MessageAndMetadata
import org.apache.spark.{ SparkConf, TaskContext }
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.kafka.{ OffsetRange, HasOffsetRanges }
import org.slf4j.{ Logger, LoggerFactory }
import scala.collection.mutable
import com.blackberry.bdp.korpse._
object Main {
val LOG: Logger = LoggerFactory.getLogger(this.getClass);
def main(args: Array[String]) {
if (args.length != 6) {
println("Required arguments:")
println("\t1 => metadata brokers")
println("\t2 => topic names (comma separated)")
println("\t3 => batch length (seconds)")
println("\t4 => consumer group ID")
println("\t5 => Kafka client ID")
println("\t6 => default offset (-2 earliest, -1 latest)")
System.exit(1);
}
val seedBrokerList = args(0)
val topics = args(1)
val batchSeconds = args(2).toInt
val consumerGroup = args(3)
val kafkaClientId = args(4)
val defaultOffset = args(5)
val ssc = new StreamingContext(new SparkConf, Seconds(batchSeconds))
val kafkaParams = Map(
"metadata.broker.list" -> seedBrokerList)
val korpse = Korpse(seedBrokerList,
topics,
consumerGroup,
kafkaClientId,
defaultOffset.toLong)
val fromOffsets = korpse.getFromOffsets()
if (!fromOffsets.isDefined) {
LOG.error("Unable to determine starting offsets")
System.exit(1)
}
println("\n***\nfrom offsets: " + fromOffsets + "\n***\n")
fromOffsets.get.foreach((t2) => println(t2._1 + " starting offset: " + t2._2))
var offsetRanges = Array[OffsetRange]()
// Create a stream from our offset ranges that includes the Kafka message, partition and offset
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Int, Long)](ssc, kafkaParams, fromOffsets.get,
(mmd: MessageAndMetadata[String, String]) => (mmd.message(), mmd.partition, mmd.offset))
.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}
// Typical application-specific Spark stream processing now follows; the
// transformations below are only illustrative placeholders that count the
// consumed messages per partition for each batch.
val results = stream
.filter { case (message, partition, offset) => message.nonEmpty } // you can filter the stream
.map { case (message, partition, offset) => (partition, 1L) } // ...and apply a mapping function
.reduceByKey(_ + _) // then reduce by key
results.foreachRDD(rdd => {
rdd.foreach(println)
})
// Now we want to persist the offset ranges that we're consuming so we
// start from where we left off if/when the job is stopped or crashes and is
// restarted
stream.foreachRDD { rdd =>
korpse.storeOffsetRanges(offsetRanges)
}
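// storeOffsetRanges executes on the driver once per batch, committing the
// ranges captured in the transform above so a restarted job can resume from
// the last stored offsets via getFromOffsets.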
ssc.start();
ssc.awaitTermination();
}
}
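// Example invocation (jar name and all argument values are illustrative
// placeholders):
//   spark-submit --class com.blackberry.bdp.kafkafilter.Main korpse-example.jar \
//     kafka01:9092,kafka02:9092 topic-a,topic-b 10 my-group my-client -1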