@jjkoshy
Last active August 29, 2015 14:09
Tool to fix offset manager movement caused by KAFKA-1469
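Background: KAFKA-1469 fixed Utils.abs, which the broker uses to map a consumer group's name hash to a partition of the offsets topic (and thereby to the group's offset manager). A group whose name has a negative hashCode can land on a different partition after the fix, so offsets committed before the upgrade are stranded in a partition that is no longer consulted for that group. This tool scans each offsets partition for entries that belong to it only under the old abs, writes tombstones for them, and pads the log so compaction can clean them out. A minimal sketch of the movement (the hash value and the 50-partition figure are illustrative; 50 is the default partition count for the offsets topic):

  def oldAbs(n: Int) = n & 0x7fffffff                            // pre-fix Utils.abs
  def newAbs(n: Int) = if (n == Int.MinValue) 0 else math.abs(n) // post-fix Utils.abs
  val hash = -5                    // hashCode of some group name
  println(oldAbs(hash) % 50)       // 43: partition the offsets were written to pre-fix
  println(newAbs(hash) % 50)       // 5: partition consulted post-fix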
package kafka.tools

import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger
import joptsimple.OptionParser
import kafka.api._
import kafka.client.ClientUtils
import kafka.cluster.Broker
import kafka.common.TopicAndPartition
import kafka.consumer.{ConsumerConfig, SimpleConsumer}
import kafka.network.BlockingChannel
import kafka.server.{GroupTopicPartition, OffsetManager}
import kafka.utils.{CommandLineUtils, Logging}
import org.apache.kafka.clients.producer._
object DeleteInvalidOffsets {
  val ClientId = "KAFKA_ADMIN"

  def main(args: Array[String]) {
    val parser = new OptionParser()
    val brokerListOpt = parser.accepts("broker-list", "Kafka cluster broker-list or VIP").
      withRequiredArg().ofType(classOf[String])
    val partitionOpt = parser.accepts("partition", "Specific partition of the offsets topic to consume from").
      withRequiredArg().ofType(classOf[java.lang.Integer])
    val socketTimeoutOpt = parser.accepts("socket-timeout-ms", "Socket timeout to use for all requests").
      withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(10000)
    val padCountOpt = parser.accepts("pad-count", "Number of padding messages to trigger log compaction").
      withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(5000000)
    parser.accepts("dry-run", "Only find invalid offsets and report")
    parser.accepts("help", "Print this message.")
    val options = parser.parse(args: _*)
    if (options.has("help")) {
      parser.printHelpOn(System.out)
      System.exit(0)
    }
    CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt)
    val metadataBrokerList = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
    val partitionIdOpt = if (options.has(partitionOpt)) Some(options.valueOf(partitionOpt).toInt) else None
    val offsetCleaner = new OffsetCleaner(metadataBrokerList,
                                          partitionIdOpt,
                                          options.valueOf(socketTimeoutOpt),
                                          options.valueOf(padCountOpt),
                                          options.has("dry-run"))
    offsetCleaner.clean()
  }
}
class OffsetCleaner(metadataBrokerList: Seq[Broker],
                    givenPartitionIdOpt: Option[Int],
                    socketTimeoutMs: Int,
                    padCount: Int,
                    dryRun: Boolean) extends Logging {
  private val correlationId = new AtomicInteger(0)
  private val FetchSize = 1024 * 1024

  // Utils.abs before the KAFKA-1469 fix: clears the sign bit, which is not a
  // true absolute value for negative hash codes.
  private def oldAbs(n: Int) = n & 0x7fffffff

  // Utils.abs after the fix: a true absolute value, with Integer.MIN_VALUE
  // mapped to 0 since math.abs(Integer.MIN_VALUE) is still negative.
  private def newAbs(n: Int) = if (n == Integer.MIN_VALUE) 0 else math.abs(n)
  // Scan every entry in this partition of the offsets topic and collect the
  // keys whose group was routed here only by the old (broken) abs.
  private def findInvalidOffsetEntries(partitionId: Int, broker: Broker, partitionCount: Int) = {
    info("Looking for invalid offsets in partition " + partitionId)
    val simpleConsumer = new SimpleConsumer(broker.host, broker.port, socketTimeoutMs,
      BlockingChannel.UseDefaultBufferSize, DeleteInvalidOffsets.ClientId)
    val invalidEntries = collection.mutable.Set[GroupTopicPartition]()
    try {
      // Start from the earliest available offset of this offsets-topic partition.
      val earliestOffsetRequest = new OffsetRequest(
        Map(TopicAndPartition(OffsetManager.OffsetsTopicName, partitionId) -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)),
        correlationId.getAndIncrement(), Request.OrdinaryConsumerId)
      val earliestOffsetResponse = simpleConsumer.getOffsetsBefore(earliestOffsetRequest)
      val earliestOffset = earliestOffsetResponse.partitionErrorAndOffsets.head._2.offsets.head
      debug("Earliest offset for partition %d is %d".format(partitionId, earliestOffset))
      var fetchOffset = earliestOffset
      var validBytes = 0
      val fetchRequestBuilder = new FetchRequestBuilder()
        .clientId(DeleteInvalidOffsets.ClientId)
        .replicaId(Request.OrdinaryConsumerId)
        .maxWait(5000)
        .minBytes(ConsumerConfig.MinFetchBytes)
      var totalEntries = 0
      do {
        val fetchRequest = fetchRequestBuilder.addFetch(OffsetManager.OffsetsTopicName, partitionId, fetchOffset, FetchSize).build()
        val fetchResponse = simpleConsumer.fetch(fetchRequest)
        val messageSet = fetchResponse.messageSet(OffsetManager.OffsetsTopicName, partitionId)
        validBytes = messageSet.validBytes
        if (validBytes > 0) {
          messageSet.foreach(messageAndOffset => {
            require(messageAndOffset.message.hasKey, "Found a message without a key")
            totalEntries += 1
            val key = OffsetManager.readMessageKey(messageAndOffset.message.key)
            val theHash = key.group.hashCode
            val oldPartitionForKey = oldAbs(theHash) % partitionCount
            val newPartitionForKey = newAbs(theHash) % partitionCount
            if (oldPartitionForKey == partitionId && // the old hash was used to assign the offset manager
                oldPartitionForKey != newPartitionForKey && // the old hash does not happen to be the same as the new (correct) hash
                messageAndOffset.message.payload != null) { // this should not be a tombstone entry
              invalidEntries.add(key)
            }
            fetchOffset = messageAndOffset.nextOffset
          })
        }
      } while (validBytes > 0)
      info("Found %d invalid entries out of %d total offset entries.".format(invalidEntries.size, totalEntries))
      debug("Full list: " + invalidEntries)
    } finally {
      simpleConsumer.close()
    }
    invalidEntries
  }
  private def deleteInvalidOffsetEntries(invalidEntries: Seq[GroupTopicPartition], partitionId: Int, producer: Producer) {
    info("Sending tombstones.")
    invalidEntries.foreach(groupTopicPartition => {
      val key = OffsetManager.offsetCommitKey(groupTopicPartition.group,
                                              groupTopicPartition.topicPartition.topic,
                                              groupTopicPartition.topicPartition.partition)
      val record = new ProducerRecord(OffsetManager.OffsetsTopicName, partitionId, key, null /* tombstone payload */)
      producer.send(record)
    })
  }
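  // Note on the padding below: the broker's log cleaner does not compact the
  // active log segment, so the tombstones written above only become eligible
  // for compaction after enough subsequent appends roll the segment. The pad
  // messages supply those appends; the 5000000 default is this tool's choice,
  // not a broker requirement.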
  private def padupLog(partitionId: Int, padGroup: String, producer: Producer) {
    info("Padding up log")
    (1 to padCount).foreach(_ => {
      val key = OffsetManager.offsetCommitKey(group = padGroup, topic = padGroup, partition = 0)
      val record = new ProducerRecord(OffsetManager.OffsetsTopicName, partitionId, key, null)
      producer.send(record)
    })
  }
  def clean() {
    // 1) Get metadata for the offsets topic (__consumer_offsets).
    val metadataResponse =
      ClientUtils.fetchTopicMetadata(Set(OffsetManager.OffsetsTopicName), metadataBrokerList,
        clientId = DeleteInvalidOffsets.ClientId, timeoutMs = 5000)
    val consumerOffsetsMetadataOpt = metadataResponse.topicsMetadata.headOption
    require(consumerOffsetsMetadataOpt.isDefined, "No metadata available for " + OffsetManager.OffsetsTopicName)
    debug("Received metadata for %s: %s".format(OffsetManager.OffsetsTopicName, consumerOffsetsMetadataOpt.get))
    val partitionsMetadata = consumerOffsetsMetadataOpt.get.partitionsMetadata
    debug("%d offsets partitions".format(partitionsMetadata.size))
    var numInvalidOffsets = 0
    // 2) For each partition of the offsets topic (or just the one given on the command line):
    partitionsMetadata.foreach(partitionMetadata => {
      val thisPartition = partitionMetadata.partitionId
      if (givenPartitionIdOpt.forall(_ == thisPartition)) {
        require(partitionMetadata.leader.isDefined, "Leader for partition %d unavailable.".format(thisPartition))
        // 2a) Find invalid offset entries.
        val invalidEntries = findInvalidOffsetEntries(thisPartition, partitionMetadata.leader.get, partitionsMetadata.size)
        numInvalidOffsets += invalidEntries.size
        if (invalidEntries.nonEmpty && !dryRun) {
          val producerProps = new Properties()
          producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, partitionMetadata.leader.get.connectionString)
          producerProps.put(ProducerConfig.ACKS_CONFIG, "-1")
          producerProps.put(ProducerConfig.RETRIES_CONFIG, "10")
          val producer = new KafkaProducer(producerProps)
          try {
            // 2b) Write tombstones for those offset entries.
            deleteInvalidOffsetEntries(invalidEntries.toSeq, thisPartition, producer)
            // 2c) Fill the log to try and force compaction to run. (First find a
            //     dummy group name that hashes to this partition under the
            //     corrected abs, so the pad messages land here.)
            val padGroupPrefix = "__KAFKA_ADMIN_"
            var padGroupSuffix = 0L
            while (newAbs((padGroupPrefix + padGroupSuffix).hashCode) % partitionsMetadata.size != thisPartition)
              padGroupSuffix += 1
            val padGroup = "%s%d".format(padGroupPrefix, padGroupSuffix)
            padupLog(thisPartition, padGroup, producer)
          } finally {
            producer.close()
          }
        }
      }
    })
    info("Found %d invalid offsets in total.".format(numInvalidOffsets))
  }
}
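For reference, a dry run against a single offsets partition might look like the following (invoking the class through kafka-run-class.sh is an assumption, and the broker addresses and partition number are placeholders; the flags themselves come from the option parser above):

  bin/kafka-run-class.sh kafka.tools.DeleteInvalidOffsets \
    --broker-list broker1:9092,broker2:9092 \
    --partition 17 \
    --dry-run

Drop --dry-run to actually write the tombstones and the padding messages.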