Skip to content

Instantly share code, notes, and snippets.

@kpheasey
Created February 11, 2016 19:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kpheasey/9c9255c4ce20beeabde0 to your computer and use it in GitHub Desktop.
Save kpheasey/9c9255c4ce20beeabde0 to your computer and use it in GitHub Desktop.
Sidekiq Pro Reliability & Unique Jobs - merge stale private queues and clear unique locks
namespace :sidekiq do
task merge_stale_queues: :environment do
ec2_private_ips ||= AWS::EC2.new(region: 'us-west-2').instances.map { |instance| instance.private_ip_address }
merged = false
puts "EC2 Private IPs: #{ec2_private_ips}"
Sidekiq.redis do |conn|
private_queues = conn.keys('queue:*_*_*')
puts "Private Queues: #{private_queues}"
private_queues.each do |private_queue|
puts "Checking queue: #{private_queue}"
name = private_queue.split('_')
public_queue = name[0]
ec2_private_ip = name[1].gsub('ip-', '').gsub('-', '.')
next if ec2_private_ips.include? ec2_private_ip
merged = true
puts "No EC2 instance with private_ip: #{ec2_private_ip} found"
puts "Merging #{private_queue} into #{public_queue}"
conn.llen(private_queue).times do
conn.rpoplpush(private_queue, public_queue)
end
end
# clear unique jobs if there was need to merge old queue.
# otherwise locks may never expire.
if merged
Sidekiq.redis do |conn|
puts 'Removing uniquejobs:* keys'
conn.keys('uniquejobs:*').each do |key|
conn.del(key)
end
puts 'Removing uniquejobs hash'
conn.namespace = nil
conn.del('uniquejobs')
end
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment