Skip to content

Instantly share code, notes, and snippets.

@bboe
Created October 17, 2014 20:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bboe/3d4129f7f6d55185b36d to your computer and use it in GitHub Desktop.
Save bboe/3d4129f7f6d55185b36d to your computer and use it in GitHub Desktop.
Tool to interact with rabbitmq queues. Depends on docopt and pika (pip install docopt pika).
#!/usr/bin/env python
"""Usage: rabbit_admin [options] delete QUEUE
rabbit_admin [options] add QUEUE <KEY:VALUE>...
rabbit_admin [options] dump QUEUE
-D --not-durable don't use a durable queue
-h --help show this
-H --host HOST rabbitmq host [default: localhost]
"""
import json
import pika
import sys
from docopt import docopt
def main():
args = docopt(__doc__, version='Rabbit Admin 0.1')
# Establish connection
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=args['--host']))
channel = connection.channel()
if args['delete']:
try:
channel.queue_delete(queue=args['QUEUE'])
print('Deleted {}'.format(args['QUEUE']))
except pika.exceptions.AMQPChannelError as exc:
print('Queue not found')
elif args['add']:
json_data = json.dumps(dict(x.split(':') for x in args['<KEY:VALUE>']))
channel.queue_declare(queue=args['QUEUE'],
durable=not args['--not-durable'])
channel.basic_publish(exchange='', body=json_data,
routing_key=args['QUEUE'],
properties=pika.BasicProperties(delivery_mode=2))
elif args['dump']:
empty = False
while not empty:
method, header, body = channel.basic_get(queue=args['QUEUE'])
if method.name == 'Basic.GetEmpty':
empty = True
else:
print(body)
if not args['--not-durable']:
channel.basic_ack(delivery_tag=method.delivery_tag)
return 0
if __name__ == '__main__':
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment