Skip to content

Instantly share code, notes, and snippets.

@gmr
Last active August 29, 2015 14:02
Show Gist options
  • Save gmr/b4e01a71c793aa5df825 to your computer and use it in GitHub Desktop.
Save gmr/b4e01a71c793aa5df825 to your computer and use it in GitHub Desktop.
Example of using a dead-letter exchange to delay the processing of messages that are rejected for any reason
import datetime
import rabbitpy
import uuid
ROUTING_KEY = 'message.foo'

# Demo topology: messages rejected from the "work" queue are dead-lettered
# to the "timeout" exchange, wait in the "corner" queue until its message
# TTL runs out, and are then dead-lettered back to the "primary" exchange.
with rabbitpy.Connection('amqp://localhost:5672/%2f') as conn, \
        conn.channel() as channel:

    # Exchange that messages are normally published to.
    primary_exchange = rabbitpy.TopicExchange(channel, 'primary')
    primary_exchange.declare()

    # Exchange that rejected ("bad") messages are sent to so they can go
    # sit in the corner for a while.
    timeout_exchange = rabbitpy.TopicExchange(channel, 'timeout')
    timeout_exchange.declare()

    # Default destination for messages; anything rejected here is
    # dead-lettered to the "timeout" exchange.
    work_queue = rabbitpy.Queue(channel, 'work',
                                dead_letter_exchange='timeout')
    work_queue.declare()
    work_queue.bind(primary_exchange, 'message.#')

    # Holding queue for rejected messages. Nothing consumes from it, so a
    # message leaves only when the TTL expires (the sample run shows a delay
    # of about 15 seconds) and is then dead-lettered back to "primary".
    corner_queue = rabbitpy.Queue(channel, 'corner',
                                  message_ttl=15000,
                                  dead_letter_exchange='primary')
    corner_queue.declare()
    corner_queue.bind(timeout_exchange, '#')

    # Seed the work queue with a handful of test messages, each carrying a
    # random UUID as its body.
    for _ in range(5):
        outgoing = rabbitpy.Message(channel, str(uuid.uuid4()))
        outgoing.publish(primary_exchange, ROUTING_KEY)

    # Consume from the work queue and reject every delivery. Each rejected
    # message is routed through the timeout/corner path and shows up here
    # again later with additional "x-death" header entries.
    for delivery in work_queue:
        received_at = datetime.datetime.now().isoformat()
        print("\n\nNew Message @ %s\n" % received_at)
        delivery.pprint(True)
        delivery.reject()
$ python dlx-example.py
New Message @ 2014-06-18T10:13:00.985734
Exchange: primary
Routing Key: message.foo
Properties:
{'app_id': '',
'cluster_id': '',
'content_encoding': '',
'content_type': '',
'correlation_id': '',
'delivery_mode': None,
'expiration': '',
'headers': None,
'message_id': '273e7536-a7e5-4325-80c6-de2113609049',
'message_type': '',
'priority': None,
'reply_to': '',
'timestamp': datetime.datetime(2014, 6, 18, 10, 13),
'user_id': ''}
Body:
'8c9929e7-bd2a-4905-b6ed-c5694eee180d'
New Message @ 2014-06-18T10:13:00.986965
Exchange: primary
Routing Key: message.foo
Properties:
{'app_id': '',
'cluster_id': '',
'content_encoding': '',
'content_type': '',
'correlation_id': '',
'delivery_mode': None,
'expiration': '',
'headers': None,
'message_id': 'f424ea1f-9395-450d-8d11-c56d249e33f2',
'message_type': '',
'priority': None,
'reply_to': '',
'timestamp': datetime.datetime(2014, 6, 18, 10, 13),
'user_id': ''}
Body:
'e9fe58d8-b294-43cc-addb-01733da9544f'
New Message @ 2014-06-18T10:13:00.987888
Exchange: primary
Routing Key: message.foo
Properties:
{'app_id': '',
'cluster_id': '',
'content_encoding': '',
'content_type': '',
'correlation_id': '',
'delivery_mode': None,
'expiration': '',
'headers': None,
'message_id': '00c3220a-f444-4734-be78-29e229b13acc',
'message_type': '',
'priority': None,
'reply_to': '',
'timestamp': datetime.datetime(2014, 6, 18, 10, 13),
'user_id': ''}
Body:
'3c78b05b-deef-42da-9fdc-e73d0cdb92a3'
New Message @ 2014-06-18T10:13:00.988873
Exchange: primary
Routing Key: message.foo
Properties:
{'app_id': '',
'cluster_id': '',
'content_encoding': '',
'content_type': '',
'correlation_id': '',
'delivery_mode': None,
'expiration': '',
'headers': None,
'message_id': '68d41564-88aa-4883-9c17-5394cebd2053',
'message_type': '',
'priority': None,
'reply_to': '',
'timestamp': datetime.datetime(2014, 6, 18, 10, 13),
'user_id': ''}
Body:
'f9baf318-8861-49fa-9d11-b6a70fd11714'
New Message @ 2014-06-18T10:13:00.990140
Exchange: primary
Routing Key: message.foo
Properties:
{'app_id': '',
'cluster_id': '',
'content_encoding': '',
'content_type': '',
'correlation_id': '',
'delivery_mode': None,
'expiration': '',
'headers': None,
'message_id': '28ccba1b-7bd4-4bf0-8eae-b27c567bd336',
'message_type': '',
'priority': None,
'reply_to': '',
'timestamp': datetime.datetime(2014, 6, 18, 10, 13),
'user_id': ''}
Body:
'60b7ce7f-6c2e-4ecd-a7cc-e25e1bc572c0'
New Message @ 2014-06-18T10:13:16.067176
Exchange: primary
Routing Key: message.foo
Properties:
{'app_id': '',
'cluster_id': '',
'content_encoding': '',
'content_type': '',
'correlation_id': '',
'delivery_mode': None,
'expiration': '',
'headers': {'x-death': [{'exchange': 'timeout',
'queue': 'corner',
'reason': 'expired',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=16, tm_wday=2, tm_yday=169, tm_isdst=0)},
{'exchange': 'primary',
'queue': 'work',
'reason': 'rejected',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=0, tm_wday=2, tm_yday=169, tm_isdst=0)}]},
'message_id': '273e7536-a7e5-4325-80c6-de2113609049',
'message_type': '',
'priority': None,
'reply_to': '',
'timestamp': datetime.datetime(2014, 6, 18, 10, 13),
'user_id': ''}
Body:
'8c9929e7-bd2a-4905-b6ed-c5694eee180d'
New Message @ 2014-06-18T10:13:16.069496
Exchange: primary
Routing Key: message.foo
Properties:
{'app_id': '',
'cluster_id': '',
'content_encoding': '',
'content_type': '',
'correlation_id': '',
'delivery_mode': None,
'expiration': '',
'headers': {'x-death': [{'exchange': 'timeout',
'queue': 'corner',
'reason': 'expired',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=16, tm_wday=2, tm_yday=169, tm_isdst=0)},
{'exchange': 'primary',
'queue': 'work',
'reason': 'rejected',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=0, tm_wday=2, tm_yday=169, tm_isdst=0)}]},
'message_id': 'f424ea1f-9395-450d-8d11-c56d249e33f2',
'message_type': '',
'priority': None,
'reply_to': '',
'timestamp': datetime.datetime(2014, 6, 18, 10, 13),
'user_id': ''}
Body:
'e9fe58d8-b294-43cc-addb-01733da9544f'
New Message @ 2014-06-18T10:13:16.071736
Exchange: primary
Routing Key: message.foo
Properties:
{'app_id': '',
'cluster_id': '',
'content_encoding': '',
'content_type': '',
'correlation_id': '',
'delivery_mode': None,
'expiration': '',
'headers': {'x-death': [{'exchange': 'timeout',
'queue': 'corner',
'reason': 'expired',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=16, tm_wday=2, tm_yday=169, tm_isdst=0)},
{'exchange': 'primary',
'queue': 'work',
'reason': 'rejected',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=0, tm_wday=2, tm_yday=169, tm_isdst=0)}]},
'message_id': '00c3220a-f444-4734-be78-29e229b13acc',
'message_type': '',
'priority': None,
'reply_to': '',
'timestamp': datetime.datetime(2014, 6, 18, 10, 13),
'user_id': ''}
Body:
'3c78b05b-deef-42da-9fdc-e73d0cdb92a3'
New Message @ 2014-06-18T10:13:16.073459
Exchange: primary
Routing Key: message.foo
Properties:
{'app_id': '',
'cluster_id': '',
'content_encoding': '',
'content_type': '',
'correlation_id': '',
'delivery_mode': None,
'expiration': '',
'headers': {'x-death': [{'exchange': 'timeout',
'queue': 'corner',
'reason': 'expired',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=16, tm_wday=2, tm_yday=169, tm_isdst=0)},
{'exchange': 'primary',
'queue': 'work',
'reason': 'rejected',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=0, tm_wday=2, tm_yday=169, tm_isdst=0)}]},
'message_id': '68d41564-88aa-4883-9c17-5394cebd2053',
'message_type': '',
'priority': None,
'reply_to': '',
'timestamp': datetime.datetime(2014, 6, 18, 10, 13),
'user_id': ''}
Body:
'f9baf318-8861-49fa-9d11-b6a70fd11714'
New Message @ 2014-06-18T10:13:16.075250
Exchange: primary
Routing Key: message.foo
Properties:
{'app_id': '',
'cluster_id': '',
'content_encoding': '',
'content_type': '',
'correlation_id': '',
'delivery_mode': None,
'expiration': '',
'headers': {'x-death': [{'exchange': 'timeout',
'queue': 'corner',
'reason': 'expired',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=16, tm_wday=2, tm_yday=169, tm_isdst=0)},
{'exchange': 'primary',
'queue': 'work',
'reason': 'rejected',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=0, tm_wday=2, tm_yday=169, tm_isdst=0)}]},
'message_id': '28ccba1b-7bd4-4bf0-8eae-b27c567bd336',
'message_type': '',
'priority': None,
'reply_to': '',
'timestamp': datetime.datetime(2014, 6, 18, 10, 13),
'user_id': ''}
Body:
'60b7ce7f-6c2e-4ecd-a7cc-e25e1bc572c0'
New Message @ 2014-06-18T10:13:31.156668
Exchange: primary
Routing Key: message.foo
Properties:
{'app_id': '',
'cluster_id': '',
'content_encoding': '',
'content_type': '',
'correlation_id': '',
'delivery_mode': None,
'expiration': '',
'headers': {'x-death': [{'exchange': 'timeout',
'queue': 'corner',
'reason': 'expired',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=31, tm_wday=2, tm_yday=169, tm_isdst=0)},
{'exchange': 'primary',
'queue': 'work',
'reason': 'rejected',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=16, tm_wday=2, tm_yday=169, tm_isdst=0)},
{'exchange': 'timeout',
'queue': 'corner',
'reason': 'expired',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=16, tm_wday=2, tm_yday=169, tm_isdst=0)},
{'exchange': 'primary',
'queue': 'work',
'reason': 'rejected',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=0, tm_wday=2, tm_yday=169, tm_isdst=0)}]},
'message_id': '273e7536-a7e5-4325-80c6-de2113609049',
'message_type': '',
'priority': None,
'reply_to': '',
'timestamp': datetime.datetime(2014, 6, 18, 10, 13),
'user_id': ''}
Body:
'8c9929e7-bd2a-4905-b6ed-c5694eee180d'
New Message @ 2014-06-18T10:13:31.160539
Exchange: primary
Routing Key: message.foo
Properties:
{'app_id': '',
'cluster_id': '',
'content_encoding': '',
'content_type': '',
'correlation_id': '',
'delivery_mode': None,
'expiration': '',
'headers': {'x-death': [{'exchange': 'timeout',
'queue': 'corner',
'reason': 'expired',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=31, tm_wday=2, tm_yday=169, tm_isdst=0)},
{'exchange': 'primary',
'queue': 'work',
'reason': 'rejected',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=16, tm_wday=2, tm_yday=169, tm_isdst=0)},
{'exchange': 'timeout',
'queue': 'corner',
'reason': 'expired',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=16, tm_wday=2, tm_yday=169, tm_isdst=0)},
{'exchange': 'primary',
'queue': 'work',
'reason': 'rejected',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=0, tm_wday=2, tm_yday=169, tm_isdst=0)}]},
'message_id': 'f424ea1f-9395-450d-8d11-c56d249e33f2',
'message_type': '',
'priority': None,
'reply_to': '',
'timestamp': datetime.datetime(2014, 6, 18, 10, 13),
'user_id': ''}
Body:
'e9fe58d8-b294-43cc-addb-01733da9544f'
New Message @ 2014-06-18T10:13:31.162951
Exchange: primary
Routing Key: message.foo
Properties:
{'app_id': '',
'cluster_id': '',
'content_encoding': '',
'content_type': '',
'correlation_id': '',
'delivery_mode': None,
'expiration': '',
'headers': {'x-death': [{'exchange': 'timeout',
'queue': 'corner',
'reason': 'expired',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=31, tm_wday=2, tm_yday=169, tm_isdst=0)},
{'exchange': 'primary',
'queue': 'work',
'reason': 'rejected',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=16, tm_wday=2, tm_yday=169, tm_isdst=0)},
{'exchange': 'timeout',
'queue': 'corner',
'reason': 'expired',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=16, tm_wday=2, tm_yday=169, tm_isdst=0)},
{'exchange': 'primary',
'queue': 'work',
'reason': 'rejected',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=0, tm_wday=2, tm_yday=169, tm_isdst=0)}]},
'message_id': '00c3220a-f444-4734-be78-29e229b13acc',
'message_type': '',
'priority': None,
'reply_to': '',
'timestamp': datetime.datetime(2014, 6, 18, 10, 13),
'user_id': ''}
Body:
'3c78b05b-deef-42da-9fdc-e73d0cdb92a3'
New Message @ 2014-06-18T10:13:31.165186
Exchange: primary
Routing Key: message.foo
Properties:
{'app_id': '',
'cluster_id': '',
'content_encoding': '',
'content_type': '',
'correlation_id': '',
'delivery_mode': None,
'expiration': '',
'headers': {'x-death': [{'exchange': 'timeout',
'queue': 'corner',
'reason': 'expired',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=31, tm_wday=2, tm_yday=169, tm_isdst=0)},
{'exchange': 'primary',
'queue': 'work',
'reason': 'rejected',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=16, tm_wday=2, tm_yday=169, tm_isdst=0)},
{'exchange': 'timeout',
'queue': 'corner',
'reason': 'expired',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=16, tm_wday=2, tm_yday=169, tm_isdst=0)},
{'exchange': 'primary',
'queue': 'work',
'reason': 'rejected',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=0, tm_wday=2, tm_yday=169, tm_isdst=0)}]},
'message_id': '68d41564-88aa-4883-9c17-5394cebd2053',
'message_type': '',
'priority': None,
'reply_to': '',
'timestamp': datetime.datetime(2014, 6, 18, 10, 13),
'user_id': ''}
Body:
'f9baf318-8861-49fa-9d11-b6a70fd11714'
New Message @ 2014-06-18T10:13:31.167696
Exchange: primary
Routing Key: message.foo
Properties:
{'app_id': '',
'cluster_id': '',
'content_encoding': '',
'content_type': '',
'correlation_id': '',
'delivery_mode': None,
'expiration': '',
'headers': {'x-death': [{'exchange': 'timeout',
'queue': 'corner',
'reason': 'expired',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=31, tm_wday=2, tm_yday=169, tm_isdst=0)},
{'exchange': 'primary',
'queue': 'work',
'reason': 'rejected',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=16, tm_wday=2, tm_yday=169, tm_isdst=0)},
{'exchange': 'timeout',
'queue': 'corner',
'reason': 'expired',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=16, tm_wday=2, tm_yday=169, tm_isdst=0)},
{'exchange': 'primary',
'queue': 'work',
'reason': 'rejected',
'routing-keys': ['message.foo'],
'time': time.struct_time(tm_year=2014, tm_mon=6, tm_mday=18, tm_hour=14, tm_min=13, tm_sec=0, tm_wday=2, tm_yday=169, tm_isdst=0)}]},
'message_id': '28ccba1b-7bd4-4bf0-8eae-b27c567bd336',
'message_type': '',
'priority': None,
'reply_to': '',
'timestamp': datetime.datetime(2014, 6, 18, 10, 13),
'user_id': ''}
Body:
'60b7ce7f-6c2e-4ecd-a7cc-e25e1bc572c0'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment