Skip to content

Instantly share code, notes, and snippets.

@maheshgattani
Last active May 3, 2023 13:40
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maheshgattani/664f78be9e99f6f93e74 to your computer and use it in GitHub Desktop.
Save maheshgattani/664f78be9e99f6f93e74 to your computer and use it in GitHub Desktop.
RabbitMQ back off and retry
#!/usr/bin/env python
import rabbitpy
import time
import random
#
# This script uses the DLX and TTL concepts in RabbitMQ to create
# a back off and retry logic for queue consumer.
#
# This script nack's every message it gets without requeue and
# then pushes the message to one of the waiting queues. Which waiting
# queue is found after prying on the message header and finding out
# which retry this is.
#
# In this example, the back offs happen for 1 second, then 3 seconds,
# then 10 seconds, and then every 10 seconds. Updating the global
# expirations array will automatically be implemented by script by creating
# new queues and new bindings
#
# Recommended reading:
# https://www.rabbitmq.com/dlx.html
# https://www.rabbitmq.com/ttl.html
# http://globaldev.co.uk/2014/07/back-off-and-retry-with-rabbitmq/
#
expirations = [1, 3, 10] # in seconds. used as backoffs
def publish_to_waiting_exchange_and_declare_queue(ch, message):
routing_key = message.routing_key
# find the right expiration time for next iteration. Also the routing key for next iteration.
if message.properties and "headers" in message.properties and message.properties["headers"] and "x-death" in message.properties["headers"]:
headers = message.properties["headers"]
tries = len(headers["x-death"])
print tries
# Compute the next back off
if tries <= len(expirations) - 1:
next_expiration = expirations[tries]
else:
next_expiration = expirations[len(expirations) - 1]
# Compute the original routing key
first_routing_key = headers["x-death"][len(headers["x-death"]) - 1]["routing-keys"][0]
first_routing_key_split = first_routing_key.split("_")
routing_key = "_".join(first_routing_key_split[1:])
else:
next_expiration = expirations[0]
# use the back off time found earlier and do modifications
print "backing off"
backoff = str(next_expiration * 1000)
backoff_hash = {'expiration': backoff}
print backoff
# create a new waiting queue dead lettering to primary exchange
print "waiting queue declared"
queue = rabbitpy.Queue(ch, "primary_queue_waiting_queue_"+backoff, durable = True, dead_letter_exchange = "primary_exchange")
queue.declare()
# bind the queue to waiting exchange
print "waiting queue bound to waiting exchange"
new_routing_key = backoff + "_" + routing_key
print new_routing_key
queue.bind("waiting_exchange", new_routing_key)
# publish the message on the waiting exchange. updating the expiration flag so enable waiting
print "message published"
properties = message.properties
properties.update(backoff_hash)
new_message = rabbitpy.Message(ch, message.body, properties)
new_message.publish("waiting_exchange", new_routing_key)
# This is needed when we use the same routing key because in that case all
# the queues start getting the message, since same routing key.
#print "queue unbound"
#queue.unbind("waiting_exchange", new_routing_key)
with rabbitpy.Connection('amqp://guest:guest@localhost:5672/%2fbirchbox-event-bus') as conn:
# Use the channel as a context manager
with conn.channel() as channel:
waiting_exchange = rabbitpy.Exchange(channel, 'waiting_exchange', durable = True, exchange_type = "topic")
waiting_exchange.declare()
primary_exchange = rabbitpy.Exchange(channel, 'primary_exchange', durable = True, exchange_type = "topic")
primary_exchange.declare()
# Create the queue
queue = rabbitpy.Queue(channel, 'primary_queue', durable = True)
queue.declare()
# Bind the queue
queue.bind("primary_exchange", "routing_key")
for expiration in expirations:
queue.bind("primary_exchange", str(expiration*1000) + "_routing_key")
try:
# Consume the messages
for message in queue.consume():
print message
publish_to_waiting_exchange_and_declare_queue(channel, message)
message.ack()
except KeyboardInterrupt:
print 'Exited consumer'
@usmankhan0319
Copy link

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment