Skip to content

Instantly share code, notes, and snippets.

@nunosilva800
Created January 14, 2022 15:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nunosilva800/f3b8caa428683cf36e37ef8e9776adaa to your computer and use it in GitHub Desktop.
Save nunosilva800/f3b8caa428683cf36e37ef8e9776adaa to your computer and use it in GitHub Desktop.
Plan Kafka cluster expansion by increased replication factor. Assumes new broker ids are shifted by 5.
require 'optparse'
require 'json'
options = {}
op = OptionParser.new do |opts|
opts.banner = "Usage: example.rb [options]"
opts.on("-c", "--context REQUIRED", String, "k8s context") do |v|
options[:context] = v
end
opts.on("-n", "--namespace REQUIRED", String, "k8s namepace") do |v|
options[:namespace] = v
end
opts.on("-t", "--topics REQUIRED", Array, "kafka topics") do |v|
options[:topics] = v
end
opts.on("-o", "--output REQUIRED", String, "output file") do |v|
options[:output] = v
end
end
op.parse!
if options[:context].nil? || options[:namespace].nil? || options[:topics].nil?
puts op.help
exit 1
end
file = {
version: 1,
partitions: []
}
# Parse this output to extract partition number and replicas:
#
# "Picked up JAVA_TOOL_OPTIONS: -XX:+UseContainerSupport
# Topic:orders\tPartitionCount:15\tReplicationFactor:3\tConfigs:
# \tTopic: orders\tPartition: 0\tLeader: 9\tReplicas: 9,6,8\tIsr: 6,8,9
# \tTopic: orders\tPartition: 1\tLeader: 6\tReplicas: 6,8,5\tIsr: 5,6,8
# \tTopic: orders\tPartition: 2\tLeader: 8\tReplicas: 8,5,7\tIsr: 7,5,8
# \tTopic: orders\tPartition: 3\tLeader: 5\tReplicas: 5,7,9\tIsr: 7,5,9
# \tTopic: orders\tPartition: 4\tLeader: 7\tReplicas: 7,9,6\tIsr: 9,6,7
# \tTopic: orders\tPartition: 5\tLeader: 9\tReplicas: 9,8,5\tIsr: 5,9,8
# \tTopic: orders\tPartition: 6\tLeader: 6\tReplicas: 6,5,7\tIsr: 7,5,6
# \tTopic: orders\tPartition: 7\tLeader: 8\tReplicas: 8,7,9\tIsr: 7,8,9
# \tTopic: orders\tPartition: 8\tLeader: 5\tReplicas: 5,9,6\tIsr: 5,6,9
# \tTopic: orders\tPartition: 9\tLeader: 7\tReplicas: 7,6,8\tIsr: 8,7,6
# \tTopic: orders\tPartition: 10\tLeader: 9\tReplicas: 9,5,7\tIsr: 7,5,9
# \tTopic: orders\tPartition: 11\tLeader: 6\tReplicas: 6,7,9\tIsr: 7,6,9
# \tTopic: orders\tPartition: 12\tLeader: 8\tReplicas: 8,9,6\tIsr: 6,8,9
# \tTopic: orders\tPartition: 13\tLeader: 5\tReplicas: 5,6,8\tIsr: 5,8,6
# \tTopic: orders\tPartition: 14\tLeader: 7\tReplicas: 7,8,5\tIsr: 8,5,7
#
options[:topics].each do |topic|
cmd_describe_topic = "kubectl --context #{options[:context]} --namespace #{options[:namespace]} " +
"exec -ti kafka-0 -c broker -- /opt/kafka/bin/kafka-topics.sh --zookeeper=zetcd:2181/kafka " +
"--describe --topic #{topic}"
res = %x[#{cmd_describe_topic}]
res.split("\n").each do |line|
md_partition = /Partition: \d*/.match(line.strip)
md_replicas = /Replicas: [\d*,]{5}/.match(line.strip)
if md_replicas
current_replicas = md_replicas.to_s.split(' ').last.split(',').map(&:to_i)
new_replicas = current_replicas.map { |p| (p + 5) }
file[:partitions] << {
topic: topic,
partition: md_partition.to_s.split(' ').last.to_i,
replicas: current_replicas + new_replicas,
}
end
end
end
# Write a nice JSON to feed into kafka-reassign-partitions.sh --reassignment-json-file
#
# {
# "version": 1,
# "partitions": [
# {
# "topic": "__consumer_offsets",
# "partition": 0,
# "replicas": [1,3,4,6,8,9]
# },
# ...
# ]
# }
File.open(options[:output], 'w') do |f|
f << JSON.pretty_generate(file)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment