RMQ consumer
#!/usr/bin/env ruby
# frozen_string_literal: true
#
#
require 'bunny'
require 'pry'
require 'logger'
host = 'localhost'
vhost = '/'
queue = 'test1'
exchange = 'amq.topic'
user = 'user'
password = 'password'
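sleep_time = 0.0 # seconds to pause after each nack
nack_ratio = 0.0 # approximate fraction of deliveries to nack (0.0 = ack everything)
prefetch = 7000  # maximum number of unacknowledged deliveries on the channel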
logger = Logger.new($stdout)
conn = Bunny.new(:host => host, :vhost => vhost, :user => user, :password => password)
conn.start
ch = conn.create_channel
ch.prefetch(prefetch)
q = ch.queue(queue, :exclusive => false, :durable => true, :arguments => { 'x-queue-mode' => 'lazy' })
x = ch.topic(exchange, :durable => true)
ack_count = 0
nack_count = 0
q.bind(x)
puts "nack: ack:"
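begin
  # Block the main thread; every delivery is acked or nacked by hand.
  # Bunny yields (delivery_info, properties, payload); only the first is used here.
  q.subscribe(:manual_ack => true, :block => true) do |delivery_info, _properties, _payload|
    if rand > nack_ratio
      ch.acknowledge(delivery_info.delivery_tag, false)
      ack_count += 1
    else
      # Reject this single message and requeue it, then optionally pause.
      ch.basic_nack(delivery_info.delivery_tag, false, true)
      sleep sleep_time
      nack_count += 1
    end
    # Rewrite the counters in place on one terminal line.
    print "#{nack_count} #{ack_count}\r"
    $stdout.flush
  end
rescue SystemExit, Interrupt
  logger.info "shutting down"
  ch.maybe_kill_consumer_work_pool!
  conn.close
  conn.clean_up_on_shutdown
end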