Create a gist now

Instantly share code, notes, and snippets.

RabbitMQ back off and retry
#!/usr/bin/env python
import rabbitpy
import time
import random
# This script uses the DLX and TTL concepts in RabbitMQ to create
# a back off and retry logic for queue consumer.
# This script nack's every message it gets without requeue and
# then pushes the message to one of the waiting queues. Which waiting
# queue is found after prying on the message header and finding out
# which retry this is.
# In this example, the back offs happen for 1 second, then 3 seconds,
# then 10 seconds, and then every 10 seconds. Updating the global
# expirations array will automatically be implemented by script by creating
# new queues and new bindings
# Recommended reading:
expirations = [1, 3, 10] # in seconds. used as backoffs
def publish_to_waiting_exchange_and_declare_queue(ch, message):
routing_key = message.routing_key
# find the right expiration time for next iteration. Also the routing key for next iteration.
if and "headers" in and["headers"] and "x-death" in["headers"]:
headers =["headers"]
tries = len(headers["x-death"])
print tries
# Compute the next back off
if tries <= len(expirations) - 1:
next_expiration = expirations[tries]
next_expiration = expirations[len(expirations) - 1]
# Compute the original routing key
first_routing_key = headers["x-death"][len(headers["x-death"]) - 1]["routing-keys"][0]
first_routing_key_split = first_routing_key.split("_")
routing_key = "_".join(first_routing_key_split[1:])
next_expiration = expirations[0]
# use the back off time found earlier and do modifications
print "backing off"
backoff = str(next_expiration * 1000)
backoff_hash = {'expiration': backoff}
print backoff
# create a new waiting queue dead lettering to primary exchange
print "waiting queue declared"
queue = rabbitpy.Queue(ch, "primary_queue_waiting_queue_"+backoff, durable = True, dead_letter_exchange = "primary_exchange")
# bind the queue to waiting exchange
print "waiting queue bound to waiting exchange"
new_routing_key = backoff + "_" + routing_key
print new_routing_key
queue.bind("waiting_exchange", new_routing_key)
# publish the message on the waiting exchange. updating the expiration flag so enable waiting
print "message published"
properties =
new_message = rabbitpy.Message(ch, message.body, properties)
new_message.publish("waiting_exchange", new_routing_key)
# This is needed when we use the same routing key because in that case all
# the queues start getting the message, since same routing key.
#print "queue unbound"
#queue.unbind("waiting_exchange", new_routing_key)
with rabbitpy.Connection('amqp://guest:guest@localhost:5672/%2fbirchbox-event-bus') as conn:
# Use the channel as a context manager
with as channel:
waiting_exchange = rabbitpy.Exchange(channel, 'waiting_exchange', durable = True, exchange_type = "topic")
primary_exchange = rabbitpy.Exchange(channel, 'primary_exchange', durable = True, exchange_type = "topic")
# Create the queue
queue = rabbitpy.Queue(channel, 'primary_queue', durable = True)
# Bind the queue
queue.bind("primary_exchange", "routing_key")
for expiration in expirations:
queue.bind("primary_exchange", str(expiration*1000) + "_routing_key")
# Consume the messages
for message in queue.consume():
print message
publish_to_waiting_exchange_and_declare_queue(channel, message)
except KeyboardInterrupt:
print 'Exited consumer'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment