Skip to content

Instantly share code, notes, and snippets.

@huy
Created January 16, 2013 11:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save huy/4546608 to your computer and use it in GitHub Desktop.
Save huy/4546608 to your computer and use it in GitHub Desktop.
require './redis_client'
class RedisMonitor
CHECK_MASTER_PERIOD = 5
attr_accessor :verbose
def initialize(instances={})
@instances = instances
end
def start(instance_id)
my_host, my_port = @instances[instance_id].split(':')
out = exec_shell_cmd(server_prog, instance_id, 'status')
if out =~ /stopped/m
$stderr.puts "#{Time.now} Attempt to start '#{instance_id}' using port '#{my_port}'"
out = exec_shell_cmd(server_prog, instance_id, 'start')
keep_single_master
end
begin
my_redis = RedisClient.new(my_host, my_port)
info = my_redis.open.info_replication
my_redis.close
$stderr.puts "#{Time.now} '#{@instances[instance_id]}' is started as #{info[:role]}"
rescue => e
$stderr.puts "#{Time.now} error #{e} when accessing '#{url}'"
end
end
def promote(instance_id)
out = exec_shell_cmd(server_prog, instance_id, 'status')
if out =~ /running/m
reset_master(instance_id)
else
$stderr.puts "#{Time.now} '#{@instances[instance_id]}' is not running, start it before promoting"
end
end
def watch
while true
sleep(1 + rand(CHECK_MASTER_PERIOD)) # wait random interval to minimize race
keep_single_master
end
end
def keep_single_master
live_instances = get_live_instances
master_instances = live_instances.select {|z| z[:role] == 'master'}
if master_instances.size > 1 # there are more than one master
elegible_master = master_instances.max{|a,b| a[:priority] <=> b[:priority]}
$stderr.puts "#{Time.now} there are more than one master: '#{master_instances.map{|z| @instances[z[:id]]}.inspect}', select '#{@instances[elegible_master[:id]]}' and demote all others"
reset_master(elegible_master[:id])
end
if master_instances.size == 0 # there is no master
elegible_master = live_instances.max{|a,b| a[:priority] <=> b[:priority]}
$stderr.puts "#{Time.now} there is no master, promote '#{@instances[elegible_master[:id]]}'"
reset_master(elegible_master[:id])
end
end
private
def get_live_instances
result = []
@instances.each do |id, url|
begin
redis = RedisClient.new(*url.split(':')).open
info = redis.info_replication
result << {:id=>id, :role=>info[:role], :priority=>redis.config_get_slave_priority}
redis.close
rescue => e
$stderr.puts "#{Time.now} error #{e} when accessing '#{url}'"
end
end
return result
end
def reset_master(master_id)
master_host, master_port = @instances[master_id].split(':')
@instances.each do |id, url|
begin
redis = RedisClient.new(*url.split(':')).open
info = redis.info_replication
if id == master_id
if info[:role] == 'master'
$stderr.puts "#{Time.now} '#{@instances[master_id]}' is already master"
else
out = redis.slaveof_no_one
$stderr.puts "#{Time.now} promote '#{@instances[master_id]}' #{out}"
end
else
if info[:master_host] == master_host && info[:master_port] == master_port
$stderr.puts "#{Time.now} '#{@instances[id]}' is already slave of #{@instances[master_id]}"
else
out = redis.slaveof(master_host, master_port)
$stderr.puts "#{Time.now} demote '#{@instances[id]}' #{out}"
end
end
redis.close
rescue => e
$stderr.puts "#{Time.now} error #{e} when accessing '#{url}'"
end
end
end
def exec_shell_cmd(*cmd)
$stderr.puts "--- #{cmd.join(' ')}" if @verbose
out = open("|#{cmd.join(' ')}").read
$stderr.puts "---\n#{out}\n" if @verbose
return out
end
def server_prog
"/aplicaciones/scripts/redisServer.sh"
end
def client_prog
"/usr/local/bin/redis-cli"
end
#!/usr/bin/env ruby
#
require './redis_master_slave'
RedisMonitor.new('inst1'=>'host1:6380',
'inst2'=>'host1:6381').promote('inst1')
require 'socket'
class RedisClient
CRLF = "\r\n".freeze
MINUS = "-".freeze
PLUS = "+".freeze
COLON = ":".freeze
DOLLAR = "$".freeze
ASTERISK = "*".freeze
attr_accessor :verbose
attr_reader :host, :port
def initialize(host = 'localhost', port = 6379)
@host, @port = host, port
end
def cmd(*args)
buff = []
buff << "#{ASTERISK}#{args.size}#{CRLF}"
args.each do |a|
buff << "#{DOLLAR}#{a.bytesize}#{CRLF}"
buff << "#{a}#{CRLF}"
end
$stderr.puts "--- send\n#{buff.join}\n---" if @verbose
@socket.send(buff.join,0)
ret = read_reply
$stderr.puts "--- got\n#{ret.inspect}\n---" if @verbose
return ret
end
def open
@socket = TCPSocket.new(@host, @port)
return self
end
def close
if @socket and (not @socket.closed?)
@socket.close
end
return self
end
def slaveof_no_one
return cmd('slaveof', 'no', 'one')
end
def slaveof(host, port)
return cmd('slaveof', host, port.to_s)
end
def info_replication
out = cmd('info', 'replication')
return parse_replication_info(out)
end
def config_get_slave_priority
out = cmd('config','get','slave-priority')
return out.last.to_i
end
def config_get_repl_ping_slave_period
out = cmd('config','get','repl-ping-slave-period')
return out.last.to_i
end
private
def read_reply
line = @socket.gets
reply_type = line.slice!(0, 1)
case reply_type
when MINUS then read_error_reply(line)
when PLUS then read_status_reply(line)
when COLON then read_integer_reply(line)
when DOLLAR then read_bulk_reply(line)
when ASTERISK then read_multi_bulk_reply(line)
else raise "protocol error got #{reply_type} while expecting one of #{[MINUS,PLUS,COLON,DOLLAR,ASTERISK].join(',')}"
end
rescue => e
$stderr.puts e.backtrace
$stderr.puts "#{Time.now} Error '#{e}' when reading reply from #{@host}:#{@port}"
end
def read_error_reply(line)
line.strip
end
def read_status_reply(line)
line.strip
end
def read_integer_reply(line)
line.to_i
end
def read_bulk_reply(line)
bulklen = line.to_i
return if bulklen == -1
reply = encode(@socket.read(bulklen))
@socket.read(2) # Discard CRLF.
reply
end
def read_multi_bulk_reply(line)
n = line.to_i
return if n == -1
Array.new(n) { read_reply }
end
def parse_replication_info(out)
ret = {}
if out =~ /role:(master|slave)/m
ret[:role] = $~[1]
end
if out =~ /master_host:(\w+)/m
ret[:master_host] = $~[1]
end
if out =~ /master_port:(\w+)/m
ret[:master_port] = $~[1]
end
if out =~ /master_link_status:(\w+)/m
ret[:master_link_status] = $~[1]
end
return ret
end
if defined?(Encoding::default_external)
def encode(string)
string.force_encoding(Encoding::default_external)
end
else
def encode(string)
string
end
end
end
#!/usr/bin/env ruby
#
require './redis_master_slave'
RedisMonitor.new('inst1'=>'host1:6380',
'inst2'=>'host1:6381').start('inst1')
#!/usr/bin/env ruby
#
require './redis_master_slave'
RedisMonitor.new('inst1'=>'host1:6380',
'inst2'=>'host1:6381').watch
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment