Skip to content

Instantly share code, notes, and snippets.

@riyad
Last active August 29, 2015 14:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save riyad/7439eb545dabf287bcc9 to your computer and use it in GitHub Desktop.
Save riyad/7439eb545dabf287bcc9 to your computer and use it in GitHub Desktop.
Provoke RabbitMQ Queue Crash With Deadlettering (Issue #216)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function
import logging
import puka
logger = logging.getLogger("provoke_rabbitmq_queue_crash_with_deadlettering")
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())
AMQP_URL = "amqp://user:pass@localhost/%2f"
QUEUE = "test.issue_216_crashing_queue"
EXCHANGE = 'amq.topic'
ROUTING_KEY = 'test.issue_216'
DLX = 'amq.topic'
TRIES = 5
def get_death_count(message):
return sum(death.get('count', 1)
for death in message['headers'].get('x-death', []))
def filter_headers(headers):
blacklisted_headers = ('x-puka-delivery-tag',)
return {
k: v for k, v in headers.iteritems()
if k not in blacklisted_headers
}
def setup_queue_with_dlx(client, queue, exchange=EXCHANGE, routing_key=ROUTING_KEY, dlx=DLX):
client.wait(client.queue_declare(queue, durable=True))
client.wait(client.queue_bind(queue, exchange, routing_key))
client.wait(client.queue_bind(queue, dlx, queue))
def send_to_timeout(client, message):
timeout_queue = "toq-gen{}-{}".format(get_death_count(message), QUEUE)
new_message_ttl = 0
queue_ttl_extra = 1000
logger.info("declaring timeout queue %s", timeout_queue)
client.wait(client.queue_declare(
timeout_queue,
arguments={
'x-expires': new_message_ttl + queue_ttl_extra,
'x-message-ttl': new_message_ttl,
'x-dead-letter-exchange': DLX,
'x-dead-letter-routing-key': QUEUE,
},
))
logger.info("publishing to %s: %s", timeout_queue, message)
client.wait(client.basic_publish(
exchange='',
routing_key=timeout_queue,
body=message['body'],
headers=filter_headers(message['headers']),
))
def main():
logger.info("connecting ...")
client = puka.Client(AMQP_URL)
client.wait(client.connect())
logger.info("setting up ...")
setup_queue_with_dlx(client, QUEUE, routing_key=ROUTING_KEY)
logger.info("publishing ...")
client.wait(client.basic_publish(EXCHANGE, ROUTING_KEY, body="foo"))
for i in xrange(TRIES):
message = client.wait(client.basic_get(QUEUE, no_ack=True))
logger.info("fetched message: %s", message)
if 'body' in message:
send_to_timeout(client, message)
client.wait(client.close())
if __name__ == "__main__":
main()
@riyad
Copy link
Author

riyad commented Jul 7, 2015

Update the AMQP_URL to point to your RabbitMQ instance.
Install dependencies with:

pip install puka

@riyad
Copy link
Author

riyad commented Jul 7, 2015

FYI: you can follow the discussion in rabbitmq/rabbitmq-server#216

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment