Skip to content

Instantly share code, notes, and snippets.

@ivoanjo
Created September 29, 2016 10:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ivoanjo/3418c0c4251e30c934612d3530a76854 to your computer and use it in GitHub Desktop.
Save ivoanjo/3418c0c4251e30c934612d3530a76854 to your computer and use it in GitHub Desktop.
Experiment with using several queues to do ttl-based exponential backoff for messages that fail during processing
require 'bunny'
require 'pry'
require 'json'
# Relevant links:
# * http://www.rabbitmq.com/dlx.html
# * https://www.rabbitmq.com/ttl.html
# "While consumers never see expired messages, only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered). When setting a per-queue TTL this is not a problem, since expired messages are always at the head of the queue. When setting per-message TTL however, expired messages can queue up behind non-expired ones until the latter are consumed or expired. Hence resources used by such expired messages will not be freed, and they will be counted in queue statistics (e.g. the number of messages in the queue)."
# * http://bitsuppliers.com/dead-lettering-with-rabbitmq/
def new_connection(url: 'amqp://localhost:5672')
Bunny.new(url).start
end
def failing_times
# 25 times over approximately 21 days, inspired by sidekiq, see
# https://github.com/mperham/sidekiq/wiki/Error-Handling#automatic-job-retry
#(0..24).map { |n| (n**4) + 15 }
[5, 10, 15, 20]
end
connection = new_connection
channel = connection.channel
exchange_name = 'foo-exchange'
exchange = channel.topic(exchange_name)
queue_name = 'generic'
queue = channel.queue(queue_name)
queue.bind(exchange_name, routing_key: queue_name)
failing_exchange_name = "#{exchange_name}-failing"
failing_exchange = channel.topic(failing_exchange_name)
dead_queue = channel.queue("#{queue_name}-dead")
dead_queue.bind(failing_exchange_name, routing_key: "#{queue_name}-dead")
failing_times.each do |failing_time|
failing_queue_name = "#{queue_name}-failing-#{failing_time}"
failing_queue =
channel.queue(
failing_queue_name,
arguments: {
'x-dead-letter-exchange' => exchange_name,
'x-dead-letter-routing-key' => queue_name,
},
)
failing_queue.bind(
failing_exchange_name,
routing_key: failing_queue_name,
)
puts "Declared queue #{failing_queue_name}"
end
exchange.publish(JSON.dump(message: "hello world"), routing_key: queue_name)
queue.subscribe(manual_ack: true, block: true) do |delivery_info, metadata, payload|
message = JSON.parse(payload, symbolize_names: true)
puts "Message >> #{message} at #{Time.now}"
message = {tries: 0}.merge(message)
if message[:tries] < failing_times.length
failing_time = failing_times[message[:tries]]
message[:tries] += 1
puts "Simulating message fail, putting it in queue #{queue_name}-failing-#{failing_time}"
failing_exchange.publish(
JSON.dump(message),
expiration: failing_time*1000,
routing_key: "#{queue_name}-failing-#{failing_time}",
)
channel.acknowledge(delivery_info.delivery_tag)
else
puts "Reached tries limit, dropping message"
failing_exchange.publish(
JSON.dump(message),
routing_key: "#{queue_name}-dead",
)
channel.acknowledge(delivery_info.delivery_tag)
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment