Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Consume Reactor Queues from Ruby
require "bunny"
require "json"
require "securerandom"
QUEUE = "appId:queue"
VHOST = 'shared'
def consume
consumer_conn = Bunny.new(
hostname: "us-east-1-a-queue.ably.io",
username: 'app123.key',
password: 'password',
vhost: VHOST
)
consumer_conn.start
# Specifying durable is important as this needs to match the config of the queue which is durable when manually set up
# But this does not work with this permission set as it tries to define a queu
# consumer_queue = consumer_conn.create_channel.queue(QUEUE, durable: true)
# consumer_queue.subscribe do |delivery_info, properties, content|
# instead we use a basic consume client which we will need to tell customers to use
channel = consumer_conn.create_channel
puts "Consuming channel #{QUEUE}..."
# arguments queue name, customer consumer tag if wanted, no_ack (false requires manual ACK)
raise "Queue #{QUEUE} does not exist" unless consumer_conn.queue_exists?(QUEUE)
channel.basic_consume(QUEUE, nil, false) do |delivery_info, properties, payload|
puts " [x] consumed message: #{payload}, delivery tag: #{delivery_info.delivery_tag.to_i}"
if rand(10) < 5
channel.basic_ack(delivery_info.delivery_tag, false)
else
puts " [ ] Nacking #{payload}. Will be re-enqueued"
channel.basic_nack(delivery_info.delivery_tag, false, true)
end
end
while true
sleep 1
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.