Last active
August 29, 2015 14:09
-
-
Save jjkoshy/a3f64d67fe494da3c3a6 to your computer and use it in GitHub Desktop.
Tool to fix offset manager movement caused by KAFKA-1469
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package kafka.tools | |
import joptsimple.OptionParser | |
import kafka.api._ | |
import kafka.cluster.Broker | |
import kafka.consumer.SimpleConsumer | |
import kafka.consumer.ConsumerConfig | |
import kafka.network.BlockingChannel | |
import kafka.utils.{Logging, CommandLineUtils} | |
import kafka.client.ClientUtils | |
import kafka.server.OffsetManager | |
import java.util.concurrent.atomic.AtomicInteger | |
import java.util.Properties | |
import org.apache.kafka.clients.producer._ | |
import scala.Some | |
import kafka.common.TopicAndPartition | |
import kafka.api.PartitionOffsetRequestInfo | |
import kafka.server.GroupTopicPartition | |
/**
 * Command-line entry point of the offset clean-up tool.
 *
 * Parses the command line, builds an [[OffsetCleaner]] from the supplied options and runs it.
 *
 * Options:
 *  - `broker-list` (required): brokers used to bootstrap the metadata request.
 *  - `partition`: restrict the run to a single partition of the offsets topic.
 *  - `socket-timeout-ms`: socket timeout for all requests (default 10000).
 *  - `pad-count`: number of padding messages written after the tombstones (default 5000000).
 *  - `dry-run`: only find and report invalid offsets.
 *  - `help`: print usage and exit.
 */
object DeleteInvalidOffsets {

  /** Client id used by this tool when talking to the brokers. */
  val ClientId = "KAFKA_ADMIN"

  def main(args: Array[String]): Unit = {
    val parser = new OptionParser()

    val brokerListOpt = parser
      .accepts("broker-list", "Kafka cluster broker-list or VIP")
      .withRequiredArg()
      .ofType(classOf[String])
    val partitionOpt = parser
      .accepts("partition", "Specific partition of the offsets topic to consume from")
      .withRequiredArg()
      .ofType(classOf[java.lang.Integer])
    val socketTimeoutOpt = parser
      .accepts("socket-timeout-ms", "Socket timeout to use for all requests")
      .withRequiredArg()
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(10000)
    val padCountOpt = parser
      .accepts("pad-count", "Number of padding messages to trigger log compaction")
      .withRequiredArg()
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(5000000)
    parser.accepts("dry-run", "Only find invalid offsets and report")
    parser.accepts("help", "Print this message.")

    val options = parser.parse(args : _*)

    // Usage requests short-circuit everything else, including the required-argument check.
    if (options.has("help")) {
      parser.printHelpOn(System.out)
      System.exit(0)
    }

    CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt)

    val bootstrapBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
    // valueOf yields null when the option (which has no default) was not supplied.
    val requestedPartition: Option[Int] = Option(options.valueOf(partitionOpt)).map(_.intValue)
    val socketTimeoutMs: Int = options.valueOf(socketTimeoutOpt)
    val padCount: Int = options.valueOf(padCountOpt)
    val reportOnly = options.has("dry-run")

    new OffsetCleaner(bootstrapBrokers, requestedPartition, socketTimeoutMs, padCount, reportOnly).clean()
  }
}
/**
 * Finds and deletes offset entries that sit in the wrong partition of the offsets topic.
 *
 * An entry is considered invalid for a partition when the old group-to-partition mapping
 * (`oldAbs(hash) % partitionCount`) placed it in that partition but the new mapping
 * (`newAbs(hash) % partitionCount`) does not. For every such entry a tombstone (null payload)
 * is written, followed by `padCount` padding messages intended to get log compaction to run.
 *
 * @param metadataBrokerList  brokers used to fetch the metadata of the offsets topic
 * @param givenPartitionIdOpt if defined, only this partition of the offsets topic is processed
 * @param socketTimeoutMs     socket timeout of the consumer used to scan a partition
 * @param padCount            number of padding messages written after the tombstones
 * @param dryRun              if true, invalid entries are only counted and reported
 */
class OffsetCleaner(metadataBrokerList: Seq[Broker],
                    givenPartitionIdOpt: Option[Int],
                    socketTimeoutMs: Int,
                    padCount: Int,
                    dryRun: Boolean) extends Logging {

  private val correlationId = new AtomicInteger(0)
  private val FetchSize = 1024 * 1024

  /** Old mapping: clears the sign bit. */
  private def oldAbs(n: Int) = n & 0x7fffffff

  /** New mapping: absolute value, with Integer.MIN_VALUE mapped to 0. Can differ from oldAbs for negative n. */
  private def newAbs(n: Int) = if (n == Integer.MIN_VALUE) 0 else math.abs(n)

  /**
   * Scans one partition of the offsets topic from its earliest offset and collects the keys
   * whose latest record is a live (non-tombstone) entry that the old hash placed here while
   * the new hash places it elsewhere.
   *
   * @param partitionId    partition of the offsets topic to scan
   * @param broker         broker to read the partition from (the caller passes the leader)
   * @param partitionCount number of partitions of the offsets topic
   * @return the keys that need a tombstone in this partition
   * @throws IllegalArgumentException if a fetch returns an error or a message has no key
   * @throws IllegalStateException    if the earliest offset cannot be determined
   */
  private def findInvalidOffsetEntries(partitionId: Int,
                                       broker: Broker,
                                       partitionCount: Int): collection.mutable.Set[GroupTopicPartition] = {
    info("Looking for invalid offsets in partition " + partitionId)
    val offsetsPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, partitionId)
    val simpleConsumer = new SimpleConsumer(broker.host, broker.port, socketTimeoutMs,
                                            BlockingChannel.UseDefaultBufferSize, DeleteInvalidOffsets.ClientId)
    val invalidEntries = collection.mutable.Set[GroupTopicPartition]()
    try {
      // Time -2L with at most one offset: used here to obtain the earliest available offset.
      val earliestOffsetRequest = new OffsetRequest(
        Map(offsetsPartition -> PartitionOffsetRequestInfo(-2L, 1)),
        correlationId.getAndIncrement(), Request.OrdinaryConsumerId)
      val earliestOffsetResponse = simpleConsumer.getOffsetsBefore(earliestOffsetRequest)
      // An error response carries no offsets; fail with a descriptive message instead of a
      // bare NoSuchElementException from head.
      val earliestOffset = earliestOffsetResponse.partitionErrorAndOffsets.get(offsetsPartition)
        .flatMap(_.offsets.headOption)
        .getOrElse(throw new IllegalStateException(
          "Could not determine the earliest offset for partition %d: %s".format(partitionId, earliestOffsetResponse)))
      debug("Earliest offset for partition %d is %d".format(partitionId, earliestOffset))

      val fetchRequestBuilder = new FetchRequestBuilder()
        .clientId(DeleteInvalidOffsets.ClientId)
        .replicaId(Request.OrdinaryConsumerId)
        .maxWait(5000)
        .minBytes(ConsumerConfig.MinFetchBytes)

      var fetchOffset = earliestOffset
      var validBytes = 0
      var totalEntries = 0
      do {
        val fetchRequest = fetchRequestBuilder
          .addFetch(OffsetManager.OffsetsTopicName, partitionId, fetchOffset, FetchSize)
          .build()
        val fetchResponse = simpleConsumer.fetch(fetchRequest)
        // A failed fetch yields an empty message set, which would otherwise be mistaken for
        // the end of the log and silently produce an incomplete result.
        require(!fetchResponse.hasError,
                "Fetch from partition %d at offset %d failed with error code %d".format(
                  partitionId, fetchOffset, fetchResponse.errorCode(OffsetManager.OffsetsTopicName, partitionId)))
        val messageSet = fetchResponse.messageSet(OffsetManager.OffsetsTopicName, partitionId)
        validBytes = messageSet.validBytes
        if (validBytes > 0) {
          messageSet.foreach { messageAndOffset =>
            val message = messageAndOffset.message
            require(message.hasKey, "Found a message without a key")
            totalEntries += 1
            val key = OffsetManager.readMessageKey(message.key)
            if (message.payload == null) {
              // Tombstone: it supersedes any earlier entry for the same key, so the key no
              // longer needs to be deleted (e.g. when the tool is re-run before compaction).
              invalidEntries.remove(key)
            } else {
              val groupHash = key.group.hashCode
              val oldPartitionForKey = oldAbs(groupHash) % partitionCount
              val newPartitionForKey = newAbs(groupHash) % partitionCount
              if (oldPartitionForKey == partitionId &&        // the old hash was used to assign the offset manager
                  oldPartitionForKey != newPartitionForKey) { // and it differs from the new (correct) hash
                invalidEntries.add(key)
              }
            }
            fetchOffset = messageAndOffset.nextOffset
          }
        }
      } while (validBytes > 0)

      info("Found %d invalid entries out of %d total offset entries.".format(invalidEntries.size, totalEntries))
      debug("Full list: " + invalidEntries)
    }
    finally {
      simpleConsumer.close()
    }
    invalidEntries
  }

  /**
   * Sends a tombstone (null payload) for every given key to the given partition of the
   * offsets topic. Sends are asynchronous; a failed send is logged as an error.
   */
  private def deleteInvalidOffsetEntries(invalidEntries: Seq[GroupTopicPartition],
                                         partitionId: Int,
                                         producer: Producer): Unit = {
    info("Sending tombstones.")
    invalidEntries.foreach { groupTopicPartition =>
      val key = OffsetManager.offsetCommitKey(groupTopicPartition.group,
                                              groupTopicPartition.topicPartition.topic,
                                              groupTopicPartition.topicPartition.partition)
      val record = new ProducerRecord(OffsetManager.OffsetsTopicName, partitionId, key, null /* tombstone payload */)
      producer.send(record, new Callback {
        override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
          // Without this the failure would go unnoticed and the invalid entry would survive.
          if (exception != null)
            error("Failed to write tombstone for " + groupTopicPartition, exception)
        }
      })
    }
  }

  /**
   * Writes `padCount` null-payload messages keyed by `padGroup` to the given partition.
   * Padding is best-effort: send results are not inspected.
   */
  private def padupLog(partitionId: Int, padGroup: String, producer: Producer): Unit = {
    info("Padding up log")
    // The key is the same for every padding message, so build it once.
    val key = OffsetManager.offsetCommitKey(group = padGroup, topic = padGroup, partition = 0)
    (1 to padCount).foreach { _ =>
      val record = new ProducerRecord(OffsetManager.OffsetsTopicName, partitionId, key, null)
      producer.send(record)
    }
  }

  /** Returns the first group name "__KAFKA_ADMIN_<n>" (n = 0, 1, ...) that the new hash maps to `partitionId`. */
  private def findPadGroup(partitionId: Int, partitionCount: Int): String = {
    val padGroupPrefix = "__KAFKA_ADMIN_"
    Iterator.from(0)
      .map(suffix => padGroupPrefix + suffix)
      .dropWhile(group => newAbs(group.hashCode) % partitionCount != partitionId)
      .next()
  }

  /**
   * Processes a single partition of the offsets topic: finds its invalid entries and, unless
   * this is a dry run, writes tombstones for them and pads the log.
   *
   * @return the number of invalid entries found in the partition
   * @throws IllegalArgumentException if the partition has no leader
   */
  private def cleanPartition(partitionMetadata: PartitionMetadata, partitionCount: Int): Int = {
    val thisPartition = partitionMetadata.partitionId
    require(partitionMetadata.leader.isDefined, "Leader for partition %d unavailable.".format(thisPartition))
    val leader = partitionMetadata.leader.get

    // 2a) Find invalid offset entries
    val invalidEntries = findInvalidOffsetEntries(thisPartition, leader, partitionCount)

    if (invalidEntries.nonEmpty && !dryRun) {
      val producerProps = new Properties()
      producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, leader.connectionString)
      producerProps.put(ProducerConfig.ACKS_CONFIG, "-1")
      producerProps.put(ProducerConfig.RETRIES_CONFIG, "10")
      val producer = new KafkaProducer(producerProps)
      try {
        // 2b) Write tombstones for those offset entries
        deleteInvalidOffsetEntries(invalidEntries.toSeq, thisPartition, producer)
        // 2c) Fill the log to try and force compaction to run. (First find a valid "dummy" key to use.)
        padupLog(thisPartition, findPadGroup(thisPartition, partitionCount), producer)
      }
      finally {
        producer.close()
      }
    }
    invalidEntries.size
  }

  /**
   * Runs the clean-up: fetches the metadata of the offsets topic and processes each of its
   * partitions (or only the requested one), then logs the total number of invalid offsets.
   *
   * @throws IllegalArgumentException if no topic metadata is returned or a processed
   *                                  partition has no leader
   */
  def clean(): Unit = {
    // 1) get metadata for __consumer_offsets
    val metadataResponse = ClientUtils.fetchTopicMetadata(Set(OffsetManager.OffsetsTopicName),
                                                          metadataBrokerList,
                                                          clientId = DeleteInvalidOffsets.ClientId,
                                                          timeoutMs = 5000)
    val consumerOffsetsMetadataOpt = metadataResponse.topicsMetadata.headOption
    require(consumerOffsetsMetadataOpt.isDefined, "No metadata available for " + OffsetManager.OffsetsTopicName)
    val consumerOffsetsMetadata = consumerOffsetsMetadataOpt.get
    debug("Received metadata for %s: %s".format(OffsetManager.OffsetsTopicName, consumerOffsetsMetadata))

    val partitionsMetadata = consumerOffsetsMetadata.partitionsMetadata
    val partitionCount = partitionsMetadata.size
    debug("%d offsets partitions".format(partitionCount))

    // A requested partition that does not exist would otherwise silently result in a no-op.
    givenPartitionIdOpt.foreach { requested =>
      if (!partitionsMetadata.exists(_.partitionId == requested))
        warn("Partition %d of %s was not found in the topic metadata.".format(requested, OffsetManager.OffsetsTopicName))
    }

    // 2) For each (selected) partition of the offsets topic
    val numInvalidOffsets = partitionsMetadata
      .filter(pm => givenPartitionIdOpt.isEmpty || givenPartitionIdOpt.exists(_ == pm.partitionId))
      .map(pm => cleanPartition(pm, partitionCount))
      .sum

    info("Found %d invalid offsets in total.".format(numInvalidOffsets))
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment