Skip to content

Instantly share code, notes, and snippets.

@raztud
Created April 23, 2019 16:56
Show Gist options
  • Save raztud/74a626f66f1c6f8e7e36bc9bb2fae291 to your computer and use it in GitHub Desktop.
Save raztud/74a626f66f1c6f8e7e36bc9bb2fae291 to your computer and use it in GitHub Desktop.
RabbitMQ retry mechanism with a dead-letter exchange
######### CREATE EXCHANGES/QUEUES
import pika
import sys
# Topology created by this script:
#   WorkExchange  --''-->  WorkQueue    (the consumer reads from here)
#   RetryExchange --''-->  RetryQueue   (messages wait here for RETRY_DELAY ms,
#                                        then are dead-lettered to WorkExchange)
WORK_QUEUE = "WorkQueue"
WORK_EXCHANGE = "WorkExchange"  # also the dead-letter target of the retry queue
RETRY_EXCHANGE = "RetryExchange"
RETRY_QUEUE = "RetryQueue"
RETRY_DELAY = 10000  # in ms; 10s

# Per-queue arguments for the retry queue: expire messages after RETRY_DELAY
# and hand expired messages to WORK_EXCHANGE.
queueArgs = {
    "x-dead-letter-exchange": WORK_EXCHANGE,
    "x-message-ttl": RETRY_DELAY,
}

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
try:
    channel = connection.channel()

    # work exchange + queue
    channel.exchange_declare(exchange=WORK_EXCHANGE, exchange_type='direct', durable=True)
    channel.queue_declare(WORK_QUEUE, durable=True, exclusive=False)
    channel.queue_bind(exchange=WORK_EXCHANGE, queue=WORK_QUEUE, routing_key='')

    # retry exchange + queue
    channel.exchange_declare(exchange=RETRY_EXCHANGE, exchange_type='direct', durable=True)
    channel.queue_declare(RETRY_QUEUE, durable=True, exclusive=False, arguments=queueArgs)
    channel.queue_bind(exchange=RETRY_EXCHANGE, queue=RETRY_QUEUE, routing_key='')
finally:
    # Close the connection explicitly, even when a declare/bind call fails,
    # instead of leaving the socket to be dropped at interpreter exit.
    if connection.is_open:
        connection.close()
#============= PRODUCER
import pika
import sys
WORK_EXCHANGE = "WorkExchange"  # exchange the work queue is bound to

# Message text comes from the command line; fall back to a default payload.
message = ' '.join(sys.argv[1:]) or "info: Hello World!"

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
try:
    channel = connection.channel()
    channel.exchange_declare(exchange=WORK_EXCHANGE, exchange_type='direct', durable=True)
    channel.basic_publish(
        exchange=WORK_EXCHANGE,
        routing_key='',
        body=message,
        # delivery_mode=2 marks the message persistent so that it matches the
        # durable exchange/queue; a non-persistent message can be lost when
        # the broker restarts.
        properties=pika.BasicProperties(delivery_mode=2),
    )
    print(" [x] Sent %r" % message)
finally:
    # Always release the connection, including when declare/publish raises.
    if connection.is_open:
        connection.close()
#============= CONSUMER
#============= if the message contains the word FAIL it is republished to the retry exchange and returns to the work queue after the retry delay
import pika
# Broker object names used by the consumer.
WORK_QUEUE = "WorkQueue"
WORK_EXCHANGE = "WorkExchange"  # dead letter exchange
RETRY_EXCHANGE = "RetryExchange"

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Declare the work queue as durable and non-exclusive before consuming from it.
result = channel.queue_declare(queue=WORK_QUEUE, durable=True, exclusive=False)

print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    """Handle one delivery from the work queue.

    Prints the payload. If the payload contains the word ``FAIL`` the message
    is republished to ``RETRY_EXCHANGE`` so that it can be retried later.

    Args:
        ch: Channel the message was delivered on; used for the republish.
        method: Delivery metadata (unused).
        properties: Message properties; forwarded unchanged on republish so
            that persistence and headers are not lost.
        body: Raw message payload as bytes.
    """
    # Decode only for display and for the FAIL check. Undecodable bytes are
    # replaced instead of raising, so a malformed payload cannot crash the
    # consumer.
    text = body.decode('utf-8', errors='replace')
    print(" [x] Got %r" % text)

    if 'FAIL' in text:
        print("Mark as NACK: {}".format(text))
        # Republish the original bytes on the delivering channel rather than
        # relying on a module-level channel object.
        ch.basic_publish(
            exchange=RETRY_EXCHANGE,
            routing_key='',
            body=body,
            properties=properties,
        )
# NOTE: auto_ack=True means deliveries are not acknowledged by this script;
# retrying is done by the callback republishing the message itself.
channel.basic_consume(queue=WORK_QUEUE, on_message_callback=callback, auto_ack=True)
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # CTRL+C is the advertised way to exit: stop consuming cleanly instead of
    # ending with a traceback.
    channel.stop_consuming()
finally:
    # Release the broker connection on every exit path.
    if connection.is_open:
        connection.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment