Skip to content

Instantly share code, notes, and snippets.

@rajeshl
Last active October 29, 2016 01:59
Show Gist options
  • Save rajeshl/ec9650dc53f106bd44c23b2b6b5418d5 to your computer and use it in GitHub Desktop.
Save rajeshl/ec9650dc53f106bd44c23b2b6b5418d5 to your computer and use it in GitHub Desktop.
A simple tool to migrate messages from a given AMQP queue to a different queue (by republishing them to an exchange with a routing key), with throttling. Shovel works great for bulk copying; this tool is useful when you need to control the speed at which the migration happens.
#!/usr/bin/env python
import pika
import sys
import signal
import time
import datetime
import argparse
#TODO cleanup at signal handler
def signal_handler(signum, frame):
    """Log a shutdown notice and terminate the process.

    Intended to be registered with ``signal.signal``. No broker cleanup is
    performed here yet; the handler only logs and exits.

    Args:
        signum: Number of the signal that was delivered (unused).
        frame: Stack frame that was executing when the signal arrived (unused).

    Raises:
        SystemExit: Always, with the default (success) exit status.
    """
    log("Received signal. Cleaning up..")
    log("Over and out!")
    # Use sys.exit() rather than the bare exit() helper: exit() is injected by
    # the `site` module for interactive sessions and is not guaranteed to
    # exist in every interpreter configuration (e.g. `python -S`).
    sys.exit()
def log(message):
    """Print *message* to stdout, prefixed with the current timestamp.

    The timestamp has the form ``YYYY-Mon-DD HH:MM:SS`` and is separated from
    the message by a single space.

    Args:
        message: Text to print. It is concatenated to the timestamp, so it
            must be a ``str``.
    """
    timestamp = datetime.datetime.now().strftime("%Y-%b-%d %H:%M:%S")
    # Build the complete line first and print it with one parenthesised
    # argument. The previous form, `print (ts) + " " + message`, only worked
    # as a Python 2 print statement; on Python 3 it printed the bare timestamp
    # and then raised TypeError (None + str).
    print(timestamp + " " + message)
def setup_connection(mq_url, mq_vhost, batch_size):
    """Open a blocking AMQP connection and a channel with a prefetch limit.

    Args:
        mq_url: Broker URL without the vhost part, e.g.
            ``amqp://user:password@host[:port]``.
        mq_vhost: Virtual host name appended to the URL. The default vhost
            ``/`` is sent in its URL-encoded form ``%2F``.
        batch_size: Used as the channel's ``prefetch_count``.

    Returns:
        A ``(connection, channel)`` tuple on success, or ``(None, None)`` when
        the connection could not be established or authenticated.
    """
    if mq_vhost == '/':
        mq_vhost = "%2F"
    amq_url = mq_url + '/' + mq_vhost
    # NOTE(review): the URL is logged verbatim and may contain credentials.
    log("Setting up AMQ connection. url: " + amq_url)
    parameters = pika.URLParameters(amq_url)
    connection = None
    try:
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        channel.basic_qos(prefetch_count=batch_size)
    except (pika.exceptions.ConnectionClosed, pika.exceptions.ProbableAuthenticationError) as e:
        # Report why the attempt failed instead of discarding the exception.
        log("Error connecting - " + amq_url + " (" + repr(e) + ")")
        # The connection may have opened before channel setup failed; do not
        # leave it dangling.
        if connection is not None and connection.is_open:
            connection.close()
        return None, None
    return connection, channel
#TODO use two channels?
def migrate_messages(channel, mq_src_queue_name, mq_dst_exchange, mq_routing_key,
                     throttle_interval, batch_size, max_msg):
    """Move up to *max_msg* messages from a queue to an exchange, throttled.

    Each message consumed from *mq_src_queue_name* is republished to
    *mq_dst_exchange* with its original properties, its body is appended
    (followed by a newline) to ``migrated_messages-<timestamp>.dat`` in the
    current directory, and only then is the source message acknowledged.
    After every *batch_size* messages progress is logged and the loop sleeps
    for *throttle_interval* seconds.

    Args:
        channel: Open pika blocking channel used for both consuming and
            publishing.
        mq_src_queue_name: Queue to consume from.
        mq_dst_exchange: Exchange to publish to.
        mq_routing_key: Routing key for every published message, or ``None``
            to reuse each message's own routing key.
        throttle_interval: Seconds to sleep after each full batch.
        batch_size: Number of messages per batch; must be at least 1.
        max_msg: Maximum number of messages to migrate; values below 1
            migrate nothing.

    Returns:
        The number of messages migrated.

    Raises:
        ValueError: If *batch_size* is less than 1.
    """
    # Fail before touching the broker; a zero batch size used to raise
    # ZeroDivisionError only after the first message had already been moved.
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    messages_migrated = 0
    if max_msg < 1:
        return messages_migrated
    ts = datetime.datetime.now().strftime("%Y-%b-%d-%H-%M-%S")
    # Binary mode: message bodies are raw bytes. `with` guarantees the audit
    # file is closed even if an unexpected exception escapes the loop.
    with open("migrated_messages-" + ts + ".dat", 'wb') as file_handle:
        try:
            for method_frame, properties, body in channel.consume(mq_src_queue_name):
                # Resolve the key per message. Overwriting mq_routing_key here
                # would pin the first message's key onto all later messages.
                routing_key = mq_routing_key
                if routing_key is None:
                    routing_key = method_frame.routing_key
                channel.basic_publish(exchange=mq_dst_exchange,
                                      routing_key=routing_key,
                                      body=body,
                                      properties=properties)
                file_handle.write(body)
                file_handle.write(b"\n")
                channel.basic_ack(method_frame.delivery_tag)
                messages_migrated += 1
                batch_complete = messages_migrated % batch_size == 0
                if batch_complete:
                    log("Migrated " + str(messages_migrated) + " messages.")
                if messages_migrated >= max_msg:
                    break
                # Sleep only when more work remains, so the final batch does
                # not add a pointless delay.
                if batch_complete:
                    time.sleep(throttle_interval)
        except pika.exceptions.ChannelClosed:  # TODO - catch invalid exchange exception
            log("Unable to migrate. Check exchange and queue exist before attempting migration!")
    return messages_migrated
def setup_arg_parser():
    """Build the command-line parser for the migration tool.

    The numeric options (throttle interval, batch size, max messages) are
    declared with ``type=int`` so that argparse rejects non-numeric values
    with a usage error instead of letting them fail later in the program.

    Returns:
        A configured ``argparse.ArgumentParser``. ``--in_queue`` and
        ``--out_exchange`` are required; every other option has a default
        (``--routing_key`` defaults to ``None``).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-u", "--url", default="amqp://guest:guest@localhost",
                        help="AMQP url of the form amqp://<user>:<password>@<host>[:<port>]")
    parser.add_argument("-v", "--vhost", default='/',
                        help="AMQP virtual host. example: /")
    parser.add_argument("-q", "--in_queue", required=True,
                        help="Q from where to pull the messages.")
    parser.add_argument("-x", "--out_exchange", required=True,
                        help="Exchange to push the messages.")
    parser.add_argument("-k", "--routing_key",
                        help="AMQP routing key. By default the key from original message will be used.")
    parser.add_argument("-t", "--throttle_interval", type=int, default=1,
                        help="Throttle interval in seconds. Default is 1sec.")
    parser.add_argument("-b", "--batch_size", type=int, default=5,
                        help="Message batch size. Default is 5.")
    parser.add_argument("-m", "--max_messages", type=int, default=25,
                        help="Total number of messages to be migrated. Default is 25.")
    return parser
if __name__ == "__main__":
    # Script entry point: parse options, connect, migrate, then report totals.
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    parser = setup_arg_parser()
    args = parser.parse_args()
    # Convert the numeric options once and validate them up front so bad
    # input produces a usage error rather than a traceback mid-migration.
    try:
        batch_size = int(args.batch_size)
        throttle_interval = int(args.throttle_interval)
        max_messages = int(args.max_messages)
    except ValueError:
        parser.error("throttle_interval, batch_size and max_messages must be integers")
    if batch_size < 1:
        parser.error("batch_size must be at least 1")
    if throttle_interval < 0:
        parser.error("throttle_interval must not be negative")
    connection, channel = setup_connection(args.url, args.vhost, batch_size)
    if connection is None:
        # Failing to connect is an error; report it through the exit status.
        sys.exit(1)
    log("Starting migration...")
    log("From: " + args.in_queue)
    log("To: " + args.out_exchange + "[" + str(args.routing_key) + "]")
    if throttle_interval > 0:
        # Float division: integer division truncated the rate on Python 2.
        log("Throttling at " + str(float(batch_size) / throttle_interval) + "mps")
    else:
        # A zero interval previously crashed here with ZeroDivisionError.
        log("Throttling disabled")
    start_time = time.time()
    messages_migrated = migrate_messages(channel, args.in_queue, args.out_exchange,
                                         args.routing_key, throttle_interval,
                                         batch_size, max_messages)
    channel_already_closed = False
    try:
        channel.close()
    except pika.exceptions.ChannelClosed:
        # Could happen only because of invalid params (bad queue/exchange).
        channel_already_closed = True
    finally:
        # Always release the connection, even when closing the channel failed.
        if connection.is_open:
            connection.close()
    if channel_already_closed:
        sys.exit(1)
    log("Done!")
    end_time = time.time()
    log("Migrated " + str(messages_migrated) + " messages in " + str(end_time - start_time) + " seconds.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment