Skip to content

Instantly share code, notes, and snippets.

@manoj2411
Forked from kcore/sidekiq_manager.rb
Created November 16, 2017 01:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save manoj2411/df9a0479cac9c500bac65fbd07d8805a to your computer and use it in GitHub Desktop.
Save manoj2411/df9a0479cac9c500bac65fbd07d8805a to your computer and use it in GitHub Desktop.
A load balancer for Sidekiq jobs!
module Amura
  # Keeps the number of Sidekiq processes serving each configured queue on this
  # host between a minimum and a maximum: spawns extra processes when a queue
  # backs up and quiets the extras once it has drained.
  class SidekiqManager
    # Per-host queue configuration, shaped as
    #   {host_name => {queue_name => {min:, max:, conc:, latency:, queue_size:, kill_idle:, tags:,
    #                                 total_checks:, current_check:}}}
    #
    # host_name     - machine name as returned by Socket.gethostname; lets one config serve an app
    #                 running on multiple instances.
    # queue_name    - Sidekiq queue name (as given in a worker's `sidekiq_options queue: :mailer`).
    # min           - minimum number of processes that must serve this queue on this machine.
    # max           - maximum number of processes allowed for this queue on this machine; an upper
    #                 bound that prevents runaway spawning and memory exhaustion.
    # conc          - concurrency (worker threads) of each process; passed to sidekiq as -c.
    # latency       - intended acceptable latency. NOTE(review): not read by this class; only the live
    #                 queue latency is logged.
    # queue_size    - acceptable queue size. A size more than 30% above it (size > queue_size * 1.3)
    #                 counts as a backlog; at or below it the queue is healthy and processes beyond `min`
    #                 are quieted.
    # kill_idle     - not read by this class.
    # tags          - not read by this class; spawned processes are tagged "<queue_name><n>" via -g.
    # total_checks  - number of backlog checks that must be recorded before another process is spawned,
    #                 so short spikes that drain quickly do not trigger a spawn.
    # current_check - in-memory mirror of the check counter; decisions read the counter stored in Redis.
    @@healthy_set = {
      "node-1" => {
        default: { min: 2, max: 4, conc: 15, latency: 30, queue_size: 1000, kill_idle: -1, tags: ["default"], total_checks: 2, current_check: 0 }
      },
      "node-2" => {
        default: { min: 1, max: 2, conc: 10, latency: 10, queue_size: 1000, kill_idle: 1000, tags: ["default"], total_checks: 2, current_check: 0 },
        mailer: { min: 1, max: 3, conc: 20, latency: 10, queue_size: 200, kill_idle: 1000, tags: ["mixpanel"], total_checks: 2, current_check: 0 }
      }
    }
    @@redis = Redis.new

    # Redis hash holding the per-queue backlog check counters.
    # NOTE(review): fields are keyed by queue name only, so every host serving the same queue shares
    # one counter — confirm this is intended in a multi-host setup.
    CHECKS_KEY = "queue_management".freeze

    # Quiets every process serving a configured queue on this host and spawns a fresh set of `min`
    # processes (tags 1..min) for it. Useful after a deployment, e.g. from a Capistrano task.
    # The quieted processes are stopped later by kill_quiet_ones once they have no busy jobs.
    def self.restart
      host_name = Socket.gethostname
      queues_for(host_name).each do |queue_name, queue_meta_data|
        processes_for(host_name, queue_name).each(&:quiet!)
        1.upto(queue_meta_data[:min]) { |tag| spawn_process(queue_name, queue_meta_data, tag) }
      end
    end

    # Main entry point, meant to be invoked periodically (e.g. from crontab).
    #
    # First stops quiet processes that have no busy jobs, then for each queue configured for this host:
    # * fewer than `min` processes: spawns the missing tags in 1..min right away;
    # * size > queue_size * 1.3: records a backlog check in Redis; once `total_checks` checks are
    #   recorded, spawns one more process (unless `max` is reached) and resets the counter;
    # * size <= queue_size: resets the counter and quiets processes tagged outside 1..min.
    # Sizes between queue_size and queue_size * 1.3 leave everything unchanged.
    def self.run
      host_name = Socket.gethostname
      kill_quiet_ones
      queues_for(host_name).each do |queue_name, queue_meta_data|
        queue = Sidekiq::Queue.new(queue_name.to_s)
        processes = processes_for(host_name, queue_name)
        if processes.count < queue_meta_data[:min]
          replenish(host_name, queue_name, queue_meta_data, processes)
        elsif queue.size > queue_meta_data[:queue_size] * 1.3
          scale_up(host_name, queue_name, queue_meta_data, queue, processes)
        elsif queue.size <= queue_meta_data[:queue_size]
          scale_down(host_name, queue_name, queue_meta_data, queue, processes)
        end
      end
    end

    # Stops every process on this host that is quiet and has no busy jobs.
    def self.kill_quiet_ones
      host_name = Socket.gethostname
      Sidekiq::ProcessSet.new
                         .select { |process| process["quiet"] == "true" && process["busy"] == 0 && process["hostname"] == host_name }
                         .each(&:stop!)
    end

    # Configured queues for `host_name`; logs a warning and returns an empty hash when the host has
    # no entry, instead of crashing on nil.
    def self.queues_for(host_name)
      @@healthy_set.fetch(host_name) do
        Rails.logger.warn("Queue Alert: no queue configuration for #{host_name}; nothing to manage.")
        {}
      end
    end

    # Live Sidekiq processes on `host_name` whose queue list includes `queue_name`.
    def self.processes_for(host_name, queue_name)
      Sidekiq::ProcessSet.new.select do |process|
        process["hostname"] == host_name && process["queues"].include?(queue_name.to_s)
      end
    end

    # Numeric suffix of a process's "<queue_name><n>" tag; 0 when the tag is missing or has no such suffix.
    def self.tag_number(queue_name, process)
      process["tag"].to_s.gsub(queue_name.to_s, "").to_i
    end

    # Tag numbers of all `processes` for `queue_name`.
    def self.tag_numbers(queue_name, processes)
      processes.map { |process| tag_number(queue_name, process) }
    end

    # Spawns a process for every tag in 1..min that no current process carries.
    def self.replenish(host_name, queue_name, queue_meta_data, processes)
      Rails.logger.info("Queue Alert: Adding more on #{host_name}. #{queue_name.to_s} should have #{queue_meta_data[:min]} but has #{processes.count}. Starting new process.")
      missing_tags = (1..queue_meta_data[:min]).to_a - tag_numbers(queue_name, processes)
      missing_tags.each { |tag| spawn_process(queue_name, queue_meta_data, tag) }
    end

    # Handles a backlogged queue: records a check until `total_checks` is reached, then spawns one
    # more process if below `max`.
    def self.scale_up(host_name, queue_name, queue_meta_data, queue, processes)
      check = @@redis.hget(CHECKS_KEY, queue_name).to_i
      if check < queue_meta_data[:total_checks]
        queue_meta_data[:current_check] += 1
        @@redis.hincrby(CHECKS_KEY, queue_name, 1)
      elsif processes.count >= queue_meta_data[:max]
        Rails.logger.info("Queue Alert: Adding more on #{host_name}. #{queue_name.to_s} has max processes(#{processes.count}). Current Stats - Latency : #{queue.latency}, Size : #{queue.size}")
      else
        # Next tag after the highest in use; `|| 0` covers the case where no process exists (min 0).
        tag = (tag_numbers(queue_name, processes).max || 0) + 1
        spawn_process(queue_name, queue_meta_data, tag)
        reset_checks(queue_name, queue_meta_data)
      end
    end

    # Handles a healthy queue: clears the backlog counter (a drained spike must not count towards the
    # next one) and quiets processes whose tag lies outside 1..min, then stops the idle quiet ones.
    def self.scale_down(host_name, queue_name, queue_meta_data, queue, processes)
      reset_checks(queue_name, queue_meta_data)
      set_to_kill = tag_numbers(queue_name, processes) - (1..queue_meta_data[:min]).to_a
      return if set_to_kill.empty?

      Rails.logger.info("Queue Alert: Quieting few from #{host_name}:#{queue_name.to_s}. Killing off #{set_to_kill.count} processes. Current Stats - Existing Processes: #{processes.count}, Latency : #{queue.latency}, Size : #{queue.size}")
      processes.select { |process| set_to_kill.include?(tag_number(queue_name, process)) }.each(&:quiet!)
      kill_quiet_ones
    end

    # Zeroes both the in-memory and the Redis backlog counter for `queue_name`.
    def self.reset_checks(queue_name, queue_meta_data)
      queue_meta_data[:current_check] = 0
      @@redis.hset(CHECKS_KEY, queue_name, 0)
    end

    # Starts a daemonized sidekiq process for `queue_name` tagged "<queue_name><tag>", writing its pid
    # to tmp/pids/sidekiq_<queue_name><tag>.pid. The spawned child is detached so it is reaped in the
    # background rather than left as a zombie.
    def self.spawn_process(queue_name, queue_meta_data, tag)
      root = Rails.root.to_s
      name = "#{queue_name}#{tag}"
      command = "cd #{root} && ( RVM_BIN_PATH=~/.rvm/bin rvm default do bundle exec sidekiq " \
                "-c #{queue_meta_data[:conc]} -d -i 0 -P #{root}/tmp/pids/sidekiq_#{name}.pid " \
                "-e #{Rails.env} -L #{root}/log/sidekiq.log -q #{queue_name} -g #{name})"
      Process.detach(Process.spawn(command))
    end

    private_class_method :queues_for, :processes_for, :tag_number, :tag_numbers, :replenish,
                         :scale_up, :scale_down, :reset_checks, :spawn_process
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment