Skip to content

Instantly share code, notes, and snippets.

@thijsc
Last active August 8, 2020 09:07
Show Gist options
  • Save thijsc/50bd69d4f00ae4176d2694b3f9c2a367 to your computer and use it in GitHub Desktop.
Save thijsc/50bd69d4f00ae4176d2694b3f9c2a367 to your computer and use it in GitHub Desktop.
Little wrapper script around Kafka command line tools to assign topics.
#!/usr/bin/env ruby
require "fileutils"
require "json"
ZOOKEEPER = raise "Put your zookeeper list here"
DEFAULT_BROKER_LIST = raise "put your broker list here (1,2,3)
FileUtils.rm_f "/tmp/topics.json"
FileUtils.rm_f "/tmp/assignment.json"
topic = ARGV[0]
raise "Supply a topic as an argument" unless topic
broker_list = if ARGV[1]
ARGV[1]
else
DEFAULT_BROKER_LIST
end
puts "Writing the input json for #{topic}"
topics = {
"topics" => [
{"topic" => topic}
],
"version" => 1
}
File.write("/tmp/topics.json", JSON.generate(topics))
puts "Generating assignment for brokers '#{broker_list}'"
puts "Is this brokers list correct? (yes/no):"
unless STDIN.gets.chomp == 'yes'
puts "You can provide the broker list as the second argument, exiting"
exit
end
assignment_output = `/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper=#{ZOOKEEPER} --topics-to-move-json-file=/tmp/topics.json --broker-list=#{broker_list} --generate`
current_assignment_json = JSON.parse(assignment_output.split("\n")[1])
new_assignment_json = JSON.parse(assignment_output.split("\n").last)
current_partitions = current_assignment_json['partitions']
new_partitions = new_assignment_json['partitions']
diff = []
current_partitions.sort_by{ |p| p['partition'] }.each do |current_p|
new_p = new_partitions.select { |p| p['partition'] == current_p['partition'] }.first
if current_p['replicas'].sort != new_p['replicas'].sort
diff << "#{current_p['partition']} changes from #{current_p['replicas'].sort} to #{new_p['replicas'].sort}"
end
end
if diff.length == 0
puts "No difference between assignments, exiting"
exit
else
puts "Diff between assignments:"
puts diff
end
puts "Please confirm (yes/no):"
unless STDIN.gets.chomp == 'yes'
puts "Exiting"
exit
end
File.write("/tmp/assignment.json", JSON.generate(new_assignment_json))
puts "Executing assignment"
system "/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper=#{ZOOKEEPER} --reassignment-json-file=/tmp/assignment.json --execute"
loop do
system "/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper=#{ZOOKEEPER} --reassignment-json-file=/tmp/assignment.json --verify"
sleep 1
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment