Skip to content

Instantly share code, notes, and snippets.

@justinhennessy
Forked from stuart/queue_fix.rb
Last active December 19, 2015 22:28
Show Gist options
  • Save justinhennessy/6027120 to your computer and use it in GitHub Desktop.
Save justinhennessy/6027120 to your computer and use it in GitHub Desktop.
De dup RabbitMQ queue
#!/usr/bin/env ruby
require 'json'
require 'shellwords'
require 'uri'
# Removes repeated USER records from one of the AMQP queues.
#
# USAGE: ruby queue_fix.rb "queue_name" "email_address"
#
# This dumps the whole queue to a temp file (in case anything goes wrong),
# strips out the user messages containing the email provided, and
# writes the cleaned data to a second temp file.
# Each message in the cleaned file is then published again.
#
# TODO: use HTTP library instead of calls to curl.
# Removes every user message for one email address from a queue by fetching
# the queue contents over HTTP (via curl), filtering the dump on disk and
# publishing whatever is left.
#
# Connection settings are read from the AMQP_USER, AMQP_PASSWORD, AMQP_HOST
# and (optionally) AMQP_PORT environment variables.
class QueueCleaner
  attr_accessor :queue, :email

  # @param queue [String] name of the queue to clean
  # @param email [String] messages whose payload has user.email equal to
  #   this value are dropped
  def initialize(queue, email)
    @queue = queue
    @email = email
  end

  # @return [Integer] the "messages_ready" value reported for the queue
  def get_queue_length
    q = JSON.parse(`#{curl "/api/queues/%2f/#{queue}"}`)
    q["messages_ready"].to_i
  end

  # Fetches as many messages as the queue reports ready and writes the raw
  # response to #dump_file.
  #
  # The request asks for `requeue: false`, so (presumably) the fetched
  # messages are not put back by the server; the dump file is then the only
  # copy until #requeue has run.
  def dump
    options = { count: get_queue_length, requeue: false, encoding: "auto" }.to_json
    # Shell-escape the JSON and the target path instead of wrapping them in
    # hand-written quotes, so no character in them can break the command.
    command = curl "/api/queues/%2f/#{queue}/get", Shellwords.escape(options), "POST"
    `#{command} > #{Shellwords.escape(dump_file)}` # Let the shell do the thing that the shell does.
  end

  # Publishes every record found in #clean_file to the town_crier exchange,
  # one curl call per record.
  def requeue
    records = JSON.parse(File.read(clean_file))
    records.each do |record|
      # Payloads are arbitrary message data and may contain single quotes,
      # so the JSON must be escaped rather than naively quoted.
      command = curl "/api/exchanges/%2f/town_crier/publish", Shellwords.escape(record.to_json), "POST"
      `#{command}`
    end
  end

  # Reads #dump_file, drops the user messages that belong to #email and
  # writes the remaining records to #clean_file as JSON.
  def clean_data
    records = JSON.parse(File.read(dump_file))
    cleaned_data = records.reject { |record| user_message_for_email?(record) }
    File.write(clean_file, cleaned_data.to_json)
  end

  # Runs the whole procedure: dump, filter, publish again.
  def clean
    dump
    clean_data
    requeue
  end

  # @return [String] path of the raw queue dump
  def dump_file
    "/tmp/#{queue}.dump.json"
  end

  # @return [String] path of the filtered dump
  def clean_file
    "#{dump_file}.clean"
  end

  # Builds (and echoes to stdout) the curl command line for one API call.
  #
  # @param path [String] absolute request path
  # @param data [String] request body, already quoted/escaped for the shell
  # @param method [String] HTTP method
  # @param host [String] API host; defaults to #host
  # @param port [String, Integer] API port; defaults to #port
  # @return [String] the command line
  def curl(path, data = '{}', method = 'GET', host = self.host, port = self.port)
    # The defaults must go through `self.`: a bare `host = host` refers to
    # the parameter itself (nil), not to the #host method.
    uri = URI::HTTP.build(host: host, port: port, path: path)
    credentials = Shellwords.escape("#{user}:#{password}")
    c = "curl -u #{credentials} -X #{method} '#{uri}' -d #{data}"
    # NOTE(review): this echo includes the password; confirm that is acceptable
    # wherever the script's output ends up.
    puts c
    c
  end

  def user
    ENV["AMQP_USER"]
  end

  def password
    ENV["AMQP_PASSWORD"]
  end

  def host
    ENV["AMQP_HOST"]
  end

  def port
    ENV["AMQP_PORT"] || 15672
  end

  private

  # True when the record's payload is a JSON object whose "user" entry has
  # the email being removed. Payloads that are not JSON, or are not shaped
  # like a user message, never match, so those records are kept instead of
  # aborting the run half way through.
  def user_message_for_email?(record)
    payload = JSON.parse(record["payload"])
    return false unless payload.is_a?(Hash) && payload["user"].is_a?(Hash)

    payload["user"]["email"] == email
  rescue JSON::ParserError, TypeError
    false
  end
end
# Prints the invocation help to standard error and terminates the script.
#
# This is only called when the arguments or environment are incomplete, so
# it exits with a non-zero status to let calling shells and scripts detect
# the failure.
def usage
  warn "USAGE: ruby queue_fix.rb 'queue_name' 'email_address'"
  warn "Make sure the environment variables for AMQP_USER, AMQP_PASSWORD and AMQP_HOST are set."
  exit(1)
end
# Script entry point: needs a queue name and an email address on the command
# line, plus the AMQP_USER / AMQP_HOST / AMQP_PASSWORD environment variables.
usage unless ARGV.length >= 2
queue_name, email_address = ARGV
cleaner = QueueCleaner.new(queue_name, email_address)
usage if [cleaner.user, cleaner.host, cleaner.password].any?(&:nil?)
cleaner.clean
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment