Created
September 29, 2016 10:20
-
-
Save ivoanjo/3418c0c4251e30c934612d3530a76854 to your computer and use it in GitHub Desktop.
Experiment with using several queues to do ttl-based exponential backoff for messages that fail during processing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'bunny' | |
require 'pry' | |
require 'json' | |
# Relevant links: | |
# * http://www.rabbitmq.com/dlx.html | |
# * https://www.rabbitmq.com/ttl.html | |
# "While consumers never see expired messages, only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered). When setting a per-queue TTL this is not a problem, since expired messages are always at the head of the queue. When setting per-message TTL however, expired messages can queue up behind non-expired ones until the latter are consumed or expired. Hence resources used by such expired messages will not be freed, and they will be counted in queue statistics (e.g. the number of messages in the queue)." | |
# * http://bitsuppliers.com/dead-lettering-with-rabbitmq/ | |
def new_connection(url: 'amqp://localhost:5672') | |
Bunny.new(url).start | |
end | |
def failing_times | |
# 25 times over approximately 21 days, inspired by sidekiq, see | |
# https://github.com/mperham/sidekiq/wiki/Error-Handling#automatic-job-retry | |
#(0..24).map { |n| (n**4) + 15 } | |
[5, 10, 15, 20] | |
end | |
connection = new_connection | |
channel = connection.channel | |
exchange_name = 'foo-exchange' | |
exchange = channel.topic(exchange_name) | |
queue_name = 'generic' | |
queue = channel.queue(queue_name) | |
queue.bind(exchange_name, routing_key: queue_name) | |
failing_exchange_name = "#{exchange_name}-failing" | |
failing_exchange = channel.topic(failing_exchange_name) | |
dead_queue = channel.queue("#{queue_name}-dead") | |
dead_queue.bind(failing_exchange_name, routing_key: "#{queue_name}-dead") | |
failing_times.each do |failing_time| | |
failing_queue_name = "#{queue_name}-failing-#{failing_time}" | |
failing_queue = | |
channel.queue( | |
failing_queue_name, | |
arguments: { | |
'x-dead-letter-exchange' => exchange_name, | |
'x-dead-letter-routing-key' => queue_name, | |
}, | |
) | |
failing_queue.bind( | |
failing_exchange_name, | |
routing_key: failing_queue_name, | |
) | |
puts "Declared queue #{failing_queue_name}" | |
end | |
exchange.publish(JSON.dump(message: "hello world"), routing_key: queue_name) | |
queue.subscribe(manual_ack: true, block: true) do |delivery_info, metadata, payload| | |
message = JSON.parse(payload, symbolize_names: true) | |
puts "Message >> #{message} at #{Time.now}" | |
message = {tries: 0}.merge(message) | |
if message[:tries] < failing_times.length | |
failing_time = failing_times[message[:tries]] | |
message[:tries] += 1 | |
puts "Simulating message fail, putting it in queue #{queue_name}-failing-#{failing_time}" | |
failing_exchange.publish( | |
JSON.dump(message), | |
expiration: failing_time*1000, | |
routing_key: "#{queue_name}-failing-#{failing_time}", | |
) | |
channel.acknowledge(delivery_info.delivery_tag) | |
else | |
puts "Reached tries limit, dropping message" | |
failing_exchange.publish( | |
JSON.dump(message), | |
routing_key: "#{queue_name}-dead", | |
) | |
channel.acknowledge(delivery_info.delivery_tag) | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment