Skip to content

Instantly share code, notes, and snippets.

@lloydzhou
Created November 8, 2018 09:58
Show Gist options
  • Save lloydzhou/ffc693caaa73d52593fec94b36ea673e to your computer and use it in GitHub Desktop.
Save lloydzhou/ffc693caaa73d52593fec94b36ea673e to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# http://www.rabbitmq.com/tutorials/tutorial-two-python.html
from connect import connection, pika
# Open a channel and set up the dead-letter plumbing: the 'dlx' exchange
# feeds the 'dl' queue that this script consumes.
channel = connection.channel()
channel.exchange_declare(exchange='dlx')

# Take the queue name from the broker's declare reply rather than
# repeating the literal.
queue_name = channel.queue_declare(queue='dl').method.queue

# The binding key has to match the routing key dead-lettered messages
# arrive with (the source queue's x-dead-letter-routing-key, if it sets one).
binding = {
    'exchange': 'dlx',
    'routing_key': 'task_queue',
    'queue': queue_name,
}
channel.queue_bind(**binding)

print(' [*] Waiting for dead-letters. To exit press CTRL+C')
def callback(ch, method, properties, body):
    """Log a dead-lettered message and republish it to 'task_queue'.

    The message goes back through the default exchange with a fresh
    1000 ms expiration, its priority raised by one, and its original
    headers carried over. The delivery is acknowledged only after the
    republish call has returned.

    Args:
        ch: channel the delivery arrived on; used to publish and to ack.
        method: delivery frame; supplies ``delivery_tag``.
        properties: message properties; ``priority`` and ``headers`` are read.
        body: message payload, forwarded unchanged.
    """
    print(" [x] %r" % (properties,))

    # Tolerate messages that carry no headers or no 'x-death' history
    # instead of crashing the consumer on them.
    headers = properties.headers or {}
    deaths = headers.get('x-death') or [{}]
    print(" [reason] : %s : %r" % (deaths[0].get('reason'), body))

    print(" [requeue] : %s : %r" % (properties.priority, body))

    # A message published without a priority has priority None; treat that
    # as 0. The field is a single octet on the wire, so cap it at 255.
    next_priority = min((properties.priority or 0) + 1, 255)

    ch.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=body,
        properties=pika.BasicProperties(
            expiration=str(1000),
            priority=next_priority,
            headers=properties.headers,
        ),
    )

    # Ack last: if the publish above raises, the delivery stays unacked
    # instead of being dropped.
    ch.basic_ack(delivery_tag=method.delivery_tag)
# Register callback as the consumer of the 'dl' queue.
# NOTE(review): this argument order looks like the pika < 1.0 signature;
# pika >= 1.0 expects basic_consume(queue, on_message_callback) - confirm
# against the installed version before changing it.
channel.basic_consume(callback, queue='dl')

try:
    # Blocks, dispatching deliveries to callback.
    channel.start_consuming()
except KeyboardInterrupt:
    # CTRL+C is the advertised way to quit: cancel the consumer and close
    # the connection cleanly instead of dying with a traceback.
    channel.stop_consuming()
    connection.close()
#!/usr/bin/env python
# http://www.rabbitmq.com/tutorials/tutorial-two-python.html
import sys
from connect import connection, pika
# Publish one message to 'task_queue' through the default exchange.
#
# Usage: <script> [priority] [words ...]
# All command-line words form the message body; the first word also
# serves as the message priority when it is an integer.
channel = connection.channel()

args = sys.argv[1:]
message = ' '.join(args) or "Hello World!"

# Fall back to priority 0 when no arguments are given or the first word is
# not a number, so the "Hello World!" default above is actually reachable.
try:
    priority = int(args[0])
except (IndexError, ValueError):
    priority = 0
# The priority field is a single octet on the wire; keep it within 0..255.
priority = max(0, min(priority, 255))

channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        expiration=str(1000),  # per-message TTL in milliseconds, as a string
        priority=priority,
    ),
)
print(" [x] Sent %r" % (message,))
connection.close()
#!/usr/bin/env python
# http://www.rabbitmq.com/tutorials/tutorial-two-python.html
import time
import random
from connect import connection
# Open a channel and declare the work queue this script consumes.
channel = connection.channel()

# Queue-level policy: messages expire after 1000 ms and are dead-lettered
# to the 'dlx' exchange. No 'x-dead-letter-routing-key' is set, so
# dead-lettered messages keep the routing key they were published with.
queue_arguments = {
    'x-message-ttl': 1000,
    "x-dead-letter-exchange": "dlx",
}
channel.queue_declare(queue='task_queue', arguments=queue_arguments)

print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
    """Handle one delivery from 'task_queue'.

    Prints the message priority and body, then either sleeps 5 seconds
    and acks the delivery, or rejects it without requeueing.

    Args:
        ch: channel the delivery arrived on; used to ack or reject.
        method: delivery frame; supplies ``delivery_tag``.
        properties: message properties; only ``priority`` is read.
        body: message payload.
    """
    print(" [x] Received %r %r" % (properties.priority, body,))

    # The leading 1 short-circuits the random test, so every message takes
    # the slow-ack path; the reject path only runs if the 1 is removed.
    should_process = 1 or random.random() < 0.5

    if not should_process:
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        print(" [x] Rejected")
        return

    time.sleep(5)
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(" [x] Done")
# Allow at most one unacknowledged delivery at a time on this channel.
channel.basic_qos(prefetch_count=1)

# Register callback as the consumer of 'task_queue'.
# NOTE(review): this argument order looks like the pika < 1.0 signature;
# pika >= 1.0 expects basic_consume(queue, on_message_callback) - confirm
# against the installed version before changing it.
channel.basic_consume(callback, queue='task_queue')

try:
    # Blocks, dispatching deliveries to callback.
    channel.start_consuming()
except KeyboardInterrupt:
    # CTRL+C is the advertised way to quit: cancel the consumer and close
    # the connection cleanly instead of dying with a traceback.
    channel.stop_consuming()
    connection.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment