Skip to content

Instantly share code, notes, and snippets.

@hivefans
Forked from killme2008/sync_offsets.rb
Last active March 17, 2020 02:03
Show Gist options
  • Save hivefans/2cfb76bd17a4337d94f9 to your computer and use it in GitHub Desktop.
Save hivefans/2cfb76bd17a4337d94f9 to your computer and use it in GitHub Desktop.
|-|{"files":{"sync_offsets.rb":{"env":"plain"}},"tag":"bigdata"}
####
# Description: a Ruby script that syncs MetaQ consumer offsets stored in ZooKeeper
# with the offsets reported by the brokers.
# Requirements: the zookeeper gem
#   sudo gem install zookeeper
#
#####
require 'rubygems'
require 'zookeeper'
require 'socket'
# One partition of a topic on a specific broker.
# Rendered as "<broker_id>-<partition>", the same string the syncer appends to
# ZooKeeper offset paths.
class Partition
  attr_accessor :broker_id, :partition

  # @param broker_id id of the broker hosting the partition
  # @param partition partition index on that broker
  def initialize(broker_id, partition)
    @broker_id = broker_id
    @partition = partition
  end

  # @return [String] "<broker_id>-<partition>"
  def to_s
    "#{broker_id}-#{partition}"
  end

  # Uses the compact to_s form so partition lists print readably.
  def inspect
    to_s
  end

  # Orders partitions by broker id first, then by partition index.
  def <=>(other)
    return broker_id <=> other.broker_id unless broker_id == other.broker_id

    partition <=> other.partition
  end
end
# Synchronizes MetaQ consumer-group offsets stored in ZooKeeper with the
# max offsets reported by a MetaQ broker's text "stats" command.
#
# For every partition of +topic+ registered for the broker, the consumer offset
# znode under +zk_root+ is overwritten with the broker's max_offset whenever the
# two values differ.
class MetaQOffsetsSyncer
  # Return code the ZooKeeper client reports on success (ZOK).
  ZK_OK = 0
  # Line that terminates a broker "stats" reply.
  STATS_END = "END\r\n".freeze
  # safe_zk_get gives up once this many reconnect-and-retry attempts have failed.
  ZK_GET_RETRIES = 3

  # @param zk_servers [String] ZooKeeper connect string, e.g. "zk:2181"
  # @param metaq_server [String] broker "host:port" queried with "stats"
  # @param zk_root [String] MetaQ root path in ZooKeeper
  # @param topic [String] topic whose partitions are synced
  # @param group [String] consumer group whose offsets are rewritten
  def initialize(zk_servers, metaq_server = "localhost:8123", zk_root = "/avos-fetch-meta",
                 topic = "avos-fetch-tasks", group = "avos-fetcher")
    # Remembered so safe_zk_get can open a fresh connection after a failure.
    @zk_servers = zk_servers
    @zk = Zookeeper.new(zk_servers)
    @metaq_server = metaq_server
    @topic = topic
    @zk_root = zk_root
    @group = group
    @broker_topic_path = "#{@zk_root}/brokers/topics"
    @consumer_offset_path = "#{@zk_root}/consumers/#{@group}/offsets/#{@topic}"
  end

  # Syncs every partition of the topic registered for this broker.
  #
  # @return [Integer] number of partitions whose ZooKeeper offset was rewritten
  def run
    broker_id = get_broker_id
    # One "stats offsets" round-trip serves every partition.
    offset_lines = stats_broker("offsets")
    count = 0
    get_parts(broker_id).each do |part|
      offset_in_zk = get_zk_offset(part)
      offset_in_broker = get_broker_offset(part, offset_lines)
      next if offset_in_zk == offset_in_broker

      set_zk_offset(part, offset_in_broker)
      puts "Synced #{part} offset"
      count += 1
    end
    count
  end

  # Writes +new_offset+ to the partition's consumer offset znode as "0-<offset>".
  # NOTE(review): the field before "-" is always written as 0, whatever it held
  # before; confirm consumers tolerate that reset.
  #
  # @return [Hash] the ZooKeeper client's result
  # @raise [RuntimeError] if ZooKeeper reports a non-success return code
  def set_zk_offset(part, new_offset)
    path = "#{@consumer_offset_path}/#{part}"
    result = @zk.set(:path => path, :data => "0-#{new_offset}")
    raise "Failed to set #{path}: ZooKeeper rc=#{result[:rc]}" unless result[:rc] == ZK_OK

    result
  end

  # Reads the consumer offset stored for +part+ (the number after the first "-").
  #
  # @return [Integer] stored offset
  # @raise [RuntimeError] if the offset znode has no data
  def get_zk_offset(part)
    path = "#{@consumer_offset_path}/#{part}"
    data = safe_zk_get(path)[:data]
    raise "No consumer offset stored at #{path}" if data.nil?

    _, offset = data.split("-")
    offset.to_i
  end

  # Looks up the broker's max_offset for +part+.
  # Expected line: "avos-fetch-tasks part 6 min_offset 0 max_offset 72492782\r\n"
  #
  # @param offset_lines [Array<String>, nil] pre-fetched "stats offsets" reply
  #   lines; fetched from the broker when nil
  # @return [Integer] max_offset
  # @raise [RuntimeError] if no line matches the topic/partition
  def get_broker_offset(part, offset_lines = nil)
    offset_lines ||= stats_broker("offsets")
    # Escape the topic and require it to start the line or follow whitespace, so
    # topic "t" does not match a line for "xt".
    pattern = /(?:\A|\s)#{Regexp.escape(@topic)} part #{part.partition} min_offset [0-9]+ max_offset ([0-9]+)\r\n/
    offset_lines.each do |line|
      match = pattern.match(line)
      return match[1].to_i if match
    end
    raise "Broker #{@metaq_server} reported no offsets for #{@topic} partition #{part.partition}"
  end

  # Fetches +path+ from ZooKeeper, reconnecting and retrying on client errors.
  #
  # @return [Hash] the ZooKeeper client's result (its :data is nil for a missing node)
  # @raise the last client error once ZK_GET_RETRIES retries have failed
  def safe_zk_get(path, count = 0)
    @zk.get(:path => path)
  rescue StandardError
    raise if count >= ZK_GET_RETRIES

    safe_zk_close
    @zk = Zookeeper.new(@zk_servers)
    safe_zk_get(path, count.succ)
  end

  # Builds the partitions of the topic for +broker_id+ from the partition count
  # stored in the broker's "<id>-m" znode.
  # A missing znode has nil data, which to_i turns into zero partitions.
  #
  # @return [Array<Partition>]
  def get_parts(broker_id)
    n_parts = safe_zk_get("#{@broker_topic_path}/#{@topic}/#{broker_id}-m")[:data].to_i
    (0...n_parts).map { |n| Partition.new(broker_id, n) }
  end

  # @return [Integer] the broker_id the broker reports in its "stats" reply
  # @raise [RuntimeError] if the reply contains no broker_id line
  def get_broker_id
    stats_broker.each do |line|
      match = /broker_id (\d+)\r\n/.match(line)
      return match[1].to_i if match
    end
    raise "Broker #{@metaq_server} did not report a broker_id"
  end

  # Sends "stats <item>" to the broker and collects the reply lines.
  # The first reply line is skipped. NOTE(review): it appears to be a response
  # header rather than a stat line; confirm against the MetaQ text protocol.
  #
  # @return [Array<String>] reply lines (with "\r\n"), excluding the END terminator
  def stats_broker(item = "")
    host, port = @metaq_server.split(":")
    socket = TCPSocket.open(host, port.to_i)
    begin
      socket.write("stats #{item}\r\n")
      socket.flush
      line = socket.readline
      lines = []
      until line == STATS_END
        line = socket.readline
        lines << line unless line == STATS_END
      end
      lines
    ensure
      # Close even when readline raises (e.g. EOFError), so sockets are not leaked.
      socket.close
    end
  end

  private

  # Best-effort close of the current ZooKeeper handle before reconnecting. A
  # broken handle may fail to close, and that failure is deliberately ignored
  # because a new handle replaces it immediately.
  def safe_zk_close
    @zk.close if @zk.respond_to?(:close)
  rescue StandardError
    nil
  end
end
if __FILE__ == $0
  # Broker "host:port" addresses to sync; defaults to the single local broker
  # when none are given on the command line.
  brokers = ARGV.empty? ? ["localhost:8123"] : ARGV
  begin
    brokers.each do |broker|
      puts "Begin to sync broker '#{broker}' offsets"
      # Positional args: zk_servers, metaq_server, zk_root, topic, group.
      syncer = MetaQOffsetsSyncer.new("zk:2181", broker, "/avos-fetch-meta", "avos-connector-tasks", "avos-importor")
      puts "Synced count:#{syncer.run}"
    end
  rescue => e
    # Double-quoted "\n" so each backtrace frame gets its own line; the single-quoted
    # '\n' joined them with a literal backslash-n.
    warn "#{e.message}\n#{Array(e.backtrace).join("\n")}"
    exit 1
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment