Last active
August 29, 2015 14:09
-
-
Save pjwerneck/b0040e81e3bdfc98216e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
import argparse
import base64
import json
import os
import pickle
import sys
import urlparse
from Queue import Empty

import kombu
# used as type for --src and --dst | |
def _queue_from_uri(uri): | |
uri, queue = uri.rsplit('/', 1) | |
broker = kombu.Connection(uri) | |
return broker.SimpleQueue(queue) | |
# used as type for serializers | |
def _get_serializer(name): | |
if name == 'json': | |
return json.dumps | |
elif name == 'pickle': | |
return _pickle_dump | |
def _pickle_load(obj): | |
return pickle.loads(obj.decode('base64')) | |
# used as type for deserializers | |
def _get_deserializer(name): | |
if name == 'json': | |
return json.loads | |
elif name == 'pickle': | |
return _pickle_load | |
def _pickle_dump(obj): | |
return pickle.dumps(obj).encode('base64') | |
def cmd_dump(src, n=None, outfile=None, serializer=None, noack=False, **opts):
    """Take messages from ``src`` and write one serialized payload per line.

    Args:
        src: queue-like object whose ``get_nowait()`` returns messages
            exposing ``payload`` and ``ack()``, and raises ``Empty`` once
            nothing is left.
        n: maximum number of messages to dump; ``None`` means no limit.
        outfile: writable file object, closed before returning. ``None``
            selects ``sys.stdout``, which is flushed but left open.
        serializer: callable turning a payload into the text for one line;
            defaults to ``json.dumps``.
        noack: when true, dumped messages are not acknowledged.
        **opts: ignored; absorbs the extra attributes of the argparse
            namespace that ``main`` passes along.
    """
    remaining = float('inf') if n is None else n
    if serializer is None:
        serializer = json.dumps
    out = sys.stdout if outfile is None else outfile
    try:
        while remaining:
            try:
                msg = src.get_nowait()
            except Empty:
                break
            out.write(serializer(msg.payload))
            out.write(os.linesep)
            out.flush()
            # Acknowledge only after the payload has been written out, so
            # a serialization or write failure cannot lose the message.
            if not noack:
                msg.ack()
            remaining -= 1
    finally:
        # Never close the interpreter's stdout; anything else is ours to
        # close, as the original ``with outfile`` did.
        if out is sys.stdout:
            out.flush()
        else:
            out.close()
def cmd_load(dst, infile, serializer=None, **opts):
    """Read dumped records from ``infile`` and put each one on ``dst``.

    Args:
        dst: queue-like object with a ``put(payload)`` method.
        infile: readable file object holding one record per line; it is
            closed before returning.
        serializer: callable that decodes one line back into a payload
            (despite the name, this is the *de*serializer); defaults to
            ``json.loads``.
        **opts: ignored; absorbs the extra attributes of the argparse
            namespace that ``main`` passes along.
    """
    if serializer is None:
        serializer = json.loads
    with infile as f:
        for line in f:
            if not line.strip():
                # A blank line carries no record (e.g. the separator left
                # behind when an encoded payload already ended in a
                # newline); decoding it would only raise.
                continue
            dst.put(serializer(line))
def cmd_move(src, dst, n=None, **opts):
    """Move up to ``n`` messages from ``src`` to ``dst``.

    Each payload is put on ``dst`` first and the source message is
    acknowledged only afterwards. The loop stops early when ``src`` raises
    ``Empty``.

    Args:
        src: queue-like object whose ``get_nowait()`` returns messages
            exposing ``payload`` and ``ack()``.
        dst: queue-like object with a ``put(payload)`` method.
        n: maximum number of messages to move; ``None`` means no limit.
        **opts: ignored; absorbs the extra attributes of the argparse
            namespace that ``main`` passes along.
    """
    remaining = float('inf') if n is None else n
    while remaining:
        try:
            msg = src.get_nowait()
        except Empty:
            break
        dst.put(msg.payload)
        msg.ack()
        remaining -= 1
def cmd_copy(src, dst, n=None, **opts):
    """Copy the payloads of up to ``n`` messages from ``src`` to ``dst``.

    Unlike ``cmd_move`` the source messages are never acknowledged here.
    NOTE(review): presumably the broker returns unacknowledged messages to
    the source queue once the connection is closed -- confirm against the
    broker/kombu configuration in use.

    Args:
        src: queue-like object whose ``get_nowait()`` returns messages
            exposing ``payload`` and raises ``Empty`` when drained.
        dst: queue-like object with a ``put(payload)`` method.
        n: maximum number of messages to copy; ``None`` means no limit.
        **opts: ignored; absorbs the extra attributes of the argparse
            namespace that ``main`` passes along.
    """
    remaining = float('inf') if n is None else n
    while remaining:
        try:
            msg = src.get_nowait()
        except Empty:
            break
        dst.put(msg.payload)
        remaining -= 1
def _setup_argparse():
    """Build the command-line parser with its four subcommands.

    ``dump``, ``load``, ``move`` and ``copy`` each register their handler
    (``cmd_dump`` etc.) as the ``func`` default, so the caller can dispatch
    with ``args.func(**vars(args))``.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    formats = ('json', 'pickle')
    format_metavar = '{%s}' % ','.join(formats)

    def format_type(resolver):
        # argparse validates ``choices`` *after* applying ``type``. Combining
        # ``choices=['json', 'pickle']`` with a ``type`` that returns a
        # callable therefore rejects every explicit ``-s`` value, because
        # the callable is never in the list of names. Validate the name
        # here instead and only then resolve it.
        def convert(name):
            if name not in formats:
                raise argparse.ArgumentTypeError(
                    'invalid choice: %r (choose from %s)'
                    % (name, ', '.join(repr(fmt) for fmt in formats)))
            return resolver(name)
        return convert

    def add_src(sub):
        sub.add_argument('--src', type=_queue_from_uri,
                         help='Broker URI for source queue', required=True)

    def add_dst(sub):
        sub.add_argument('--dst', type=_queue_from_uri,
                         help='Broker URI for dest queue', required=True)

    def add_count(sub, help_text):
        # The default is not a string, so argparse does not run it through
        # ``type=int``; infinity never reaches zero, i.e. "no limit".
        sub.add_argument('-n', type=int, help=help_text,
                         default=float('inf'))

    parser = argparse.ArgumentParser(description="RabbitMQ Tools")
    subparsers = parser.add_subparsers(dest="subparser_name")
    # Python 3 makes subcommands optional by default; without this a bare
    # invocation would reach the dispatch with no ``func`` attribute set.
    subparsers.required = True

    # dump
    p_dump = subparsers.add_parser(
        'dump', help='Dump messages from source queue to output')
    p_dump.set_defaults(func=cmd_dump)
    add_src(p_dump)
    p_dump.add_argument('-s', '--serializer', help='Serializer format to use.',
                        type=format_type(_get_serializer),
                        metavar=format_metavar, default='json')
    add_count(p_dump, 'Number of messages to dump')
    p_dump.add_argument('-o', '--outfile', type=argparse.FileType('ab'))
    p_dump.add_argument('--noack', help="Don't ack messages",
                        action='store_true', default=False)

    # load
    p_load = subparsers.add_parser('load', help='Load messages to dest queue')
    p_load.set_defaults(func=cmd_load)
    add_dst(p_load)
    p_load.add_argument('-s', '--serializer', help='Serializer format to use.',
                        type=format_type(_get_deserializer),
                        metavar=format_metavar, default='json')
    p_load.add_argument('-i', '--infile', type=argparse.FileType('r'),
                        required=True)

    # move
    p_move = subparsers.add_parser(
        'move', help='Move messages from src to dst queue')
    p_move.set_defaults(func=cmd_move)
    add_src(p_move)
    add_dst(p_move)
    add_count(p_move, 'Number of messages to move')

    # copy
    p_copy = subparsers.add_parser(
        'copy', help='Copy messages from src to dst queue')
    p_copy.set_defaults(func=cmd_copy)
    add_src(p_copy)
    add_dst(p_copy)
    add_count(p_copy, 'Number of messages to copy')

    return parser
def main():
    """Parse the command line and run the selected subcommand.

    Every attribute of the parsed namespace (including ``func`` itself) is
    forwarded to the handler as a keyword argument.
    """
    namespace = _setup_argparse().parse_args()
    handler = namespace.func
    handler(**vars(namespace))
# Run the command-line interface only when executed as a script, not when
# the module is imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment