Skip to content

Instantly share code, notes, and snippets.

@adback03
Forked from reedsa/new_task.py
Created April 29, 2021 22:39
Show Gist options
  • Save adback03/7f657469ae176e52735fa0480d4ef343 to your computer and use it in GitHub Desktop.
Save adback03/7f657469ae176e52735fa0480d4ef343 to your computer and use it in GitHub Desktop.
RabbitMQ Retry using Dead Letter Exchange in Python/Pika
#!/usr/bin/env python
# http://www.rabbitmq.com/tutorials/tutorial-two-python.html
import pika
import sys
# Publish a single task message and exit.
#
# The message goes to the default exchange (exchange=''), which routes it
# straight to the queue whose name equals the routing key ('task_queue').
# The queue is deliberately NOT declared here: the worker declares it with
# TTL / dead-letter arguments, and re-declaring it without them would fail.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
try:
    channel = connection.channel()
    # Message text comes from the command line; fall back to a default
    # when no arguments were given.
    message = ' '.join(sys.argv[1:]) or "Hello World!"
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=message)
    # Call form of print works on both Python 2 and Python 3.
    print(" [x] Sent %r" % (message,))
finally:
    # Release the connection even when publishing fails; skip the close if
    # the failure already tore the connection down.
    if connection.is_open:
        connection.close()
#!/usr/bin/env python
# http://www.rabbitmq.com/tutorials/tutorial-two-python.html
import time
import pika
# Declare the dead-letter side of the retry loop, then exit.
#
#   task_queue --(reject/expire)--> exchange 'dlx' --> queue 'dl'
#   queue 'dl' --(after 5 s TTL)--> exchange 'amq.direct' --> task_queue
#
# This script only creates the topology; it does not consume anything.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
try:
    channel = connection.channel()
    # `exchange_type` is the supported keyword; the older `type` keyword is
    # deprecated and rejected by newer pika releases.
    channel.exchange_declare(exchange='dlx', exchange_type='direct')
    channel.queue_declare(
        queue='dl',
        arguments={
            # Hold each dead-lettered message for 5 seconds (the retry delay)...
            'x-message-ttl': 5000,
            # ...then dead-letter it again, back towards the work queue.
            'x-dead-letter-exchange': 'amq.direct',
            'x-dead-letter-routing-key': 'task_queue',
        })
    # Bind with an explicit key: the work queue dead-letters with routing
    # key 'dl', so the binding must match that key exactly.
    channel.queue_bind(exchange='dlx', queue='dl', routing_key='dl')
    print(' [*] Waiting for dead-letters. To exit press CTRL+C')
finally:
    # Nothing else uses this connection; close it unless it already dropped.
    if connection.is_open:
        connection.close()
#!/usr/bin/env python
# http://www.rabbitmq.com/tutorials/tutorial-two-python.html
import pika
import time
import random
# Connect and declare the work queue used by the worker below.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(
    queue='task_queue',
    arguments={
        # Messages that sit unconsumed for 1 second are dead-lettered.
        'x-message-ttl': 1000,
        # Rejected or expired messages are republished to exchange 'dlx'
        # with routing key 'dl'.
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'dl',
    })
# Bind with an explicit key: messages returning from the delay queue are
# published to 'amq.direct' with routing key 'task_queue'.
channel.queue_bind(exchange='amq.direct', queue='task_queue',
                   routing_key='task_queue')
# Call form of print works on both Python 2 and Python 3.
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
print " [x] Properties %r" % (properties,)
#if random.random() < 0.5:
if False:
ch.basic_ack(delivery_tag = method.delivery_tag)
time.sleep(5)
print " [x] Done"
else:
if properties.headers.get('x-death') == None or properties.headers['x-retry-count'] < 5:
ch.basic_reject(delivery_tag = method.delivery_tag, requeue=False)
print " [x] Rejected"
else:
ch.basic_ack(delivery_tag=method.delivery_tag)
print " [x] Timed out"
# Hand this worker only one unacknowledged message at a time.
channel.basic_qos(prefetch_count=1)
# NOTE(review): callback-first positional form matches pika < 1.0; pika 1.x
# expects basic_consume(queue, on_message_callback) - confirm the installed
# version before changing this call.
channel.basic_consume(callback,
                      queue='task_queue')
try:
    # Blocks, dispatching deliveries to `callback`, until interrupted.
    channel.start_consuming()
except KeyboardInterrupt:
    # The banner promises CTRL+C exits: cancel the consumer and close the
    # connection cleanly instead of dying with a traceback.
    channel.stop_consuming()
    connection.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment