Skip to content

Instantly share code, notes, and snippets.

@pjwerneck
Last active August 29, 2015 14:09
Show Gist options
  • Save pjwerneck/b0040e81e3bdfc98216e to your computer and use it in GitHub Desktop.
Save pjwerneck/b0040e81e3bdfc98216e to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
import sys
import os
import argparse
import urlparse
import json
import pickle
from Queue import Empty
import kombu
# used as type for --src and --dst
def _queue_from_uri(uri):
uri, queue = uri.rsplit('/', 1)
broker = kombu.Connection(uri)
return broker.SimpleQueue(queue)
# used as type for serializers
def _get_serializer(name):
if name == 'json':
return json.dumps
elif name == 'pickle':
return _pickle_dump
def _pickle_load(obj):
return pickle.loads(obj.decode('base64'))
# used as type for deserializers
def _get_deserializer(name):
if name == 'json':
return json.loads
elif name == 'pickle':
return _pickle_load
def _pickle_dump(obj):
return pickle.dumps(obj).encode('base64')
def cmd_dump(src, n=None, outfile=None, serializer=None, noack=False, **opts):
outfile = sys.stdout if outfile is None else outfile
with outfile as f:
while n:
try:
msg = src.get_nowait()
except Empty:
break
if not noack:
msg.ack()
n -= 1
f.write(serializer(msg.payload))
f.write(os.linesep)
f.flush()
def cmd_load(dst, infile, serializer=None, **opts):
with infile as f:
for line in infile:
msg = serializer(line)
dst.put(msg)
def cmd_move(src, dst, n=None, **opts):
while n:
try:
msg = src.get_nowait()
except Empty:
break
dst.put(msg.payload)
msg.ack()
n -= 1
def cmd_copy(src, dst, n=None, **opts):
while n:
try:
msg = src.get_nowait()
except Empty:
break
dst.put(msg.payload)
n -= 1
def _setup_argparse():
parser = argparse.ArgumentParser(description="RabbitMQ Tools")
subparsers = parser.add_subparsers(dest="subparser_name")
# dump
p_dump = subparsers.add_parser('dump', help='Dump messages from source queue to output')
p_dump.set_defaults(func=cmd_dump)
p_dump.add_argument('--src', type=_queue_from_uri, help='Broker URI for source queue', required=True)
p_dump.add_argument('-s', '--serializer', help='Serializer format to use.',
type=_get_serializer,
choices=['json', 'pickle'], default='json')
p_dump.add_argument('-n', type=int, help='Number of messages to dump',
default=float('inf'))
p_dump.add_argument('-o', '--outfile', type=argparse.FileType('ab'))
p_dump.add_argument('--noack', help="Don't ack messages",
action='store_true', default=False)
# load
p_load = subparsers.add_parser('load', help='Load messages to dest queue')
p_load.set_defaults(func=cmd_load)
p_load.add_argument('--dst', type=_queue_from_uri, help='Broker URI for dest queue', required=True)
p_load.add_argument('-s', '--serializer', help='Serializer format to use.',
type=_get_deserializer,
choices=['json', 'pickle'], default='json')
p_load.add_argument('-i', '--infile', type=argparse.FileType('r'),
required=True)
# move
p_move = subparsers.add_parser('move', help='Move messages from src to dst queue')
p_move.set_defaults(func=cmd_move)
p_move.add_argument('--src', type=_queue_from_uri, help='Broker URI for source queue', required=True)
p_move.add_argument('--dst', type=_queue_from_uri, help='Broker URI for dest queue', required=True)
p_move.add_argument('-n', type=int, help='Number of messages to move',
default=float('inf'))
# copy
p_copy = subparsers.add_parser('copy', help='Copy messages from src to dst queue')
p_copy.set_defaults(func=cmd_copy)
p_copy.add_argument('--src', type=_queue_from_uri, help='Broker URI for source queue', required=True)
p_copy.add_argument('--dst', type=_queue_from_uri, help='Broker URI for dest queue', required=True)
p_copy.add_argument('-n', type=int, help='Number of messages to copy',
default=float('inf'))
return parser
def main():
parser = _setup_argparse()
args = parser.parse_args()
args.func(**vars(args))
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment