Skip to content

Instantly share code, notes, and snippets.

@andreiavram
Created February 15, 2017 13:54
Show Gist options
  • Save andreiavram/eedd9b43eab6cddd5974337954fd754a to your computer and use it in GitHub Desktop.
Save andreiavram/eedd9b43eab6cddd5974337954fd754a to your computer and use it in GitHub Desktop.
# every day I'm shovelling - script to move all messages from all queues
# to a different rabbitmq server using dynamic shovels
# needs at least rabbitmq 3.3.0 to work, that's when dynamic shovels were added
import requests
import json
class RabbitMQError(Exception):
    """Raised when the RabbitMQ management API answers with an unexpected status."""
class RabbitMQApiQueue:
    """Attribute-style wrapper around one queue record.

    Every keyword argument becomes an instance attribute of the same name,
    except keys starting with an underscore, which are dropped. Positional
    arguments are accepted and ignored.
    """

    def __init__(self, *args, **kwargs):
        for key, value in kwargs.items():
            # Skip underscore-prefixed keys so they never land on the instance.
            if key.startswith("_"):
                continue
            setattr(self, key, value)
class RabbitMQApi:
    """Small client for the management HTTP API of one RabbitMQ server/vhost.

    It can list the queues of the configured vhost and create dynamic shovels
    that move messages from this server's queues to another ``RabbitMQApi``.
    """

    # Class-level defaults kept for backward compatibility. ``__init__``
    # overrides all of them except ``port``, which this class never reads.
    server = ""
    port = ""
    user = ""
    password = ""
    vhost = ""

    def __init__(self, server, user, password, vhost, web_port=15672, amqp_port=5672, *args, **kwargs):
        """Store the connection settings.

        Args:
            server: Host name or address of the broker.
            user: User name used for both the HTTP API and AMQP URIs.
            password: Password for ``user``.
            vhost: Virtual host whose queues are handled.
            web_port: Port of the management HTTP API.
            amqp_port: Port used in the AMQP connection URI.

        Extra positional and keyword arguments are accepted and ignored.
        """
        self.server = server
        self.web_port = web_port
        self.amqp_port = amqp_port
        self.user = user
        self.password = password
        self.vhost = vhost
        self._queues = []

    @staticmethod
    def _quote(value):
        """Percent-encode ``value`` so it is safe as a single URL component."""
        # Imported locally so the class works unchanged on Python 2 and 3.
        try:
            from urllib.parse import quote  # Python 3
        except ImportError:
            from urllib import quote  # Python 2
        # safe="" also encodes "/", which is required for the default vhost.
        return quote("{}".format(value), safe="")

    @property
    def api_base(self):
        """Base URL of the management API, with credentials embedded."""
        return "http://{}:{}@{}:{}/api".format(
            self._quote(self.user),
            self._quote(self.password),
            self.server,
            self.web_port,
        )

    @property
    def amqp_conn_string(self):
        """AMQP URI for this server and vhost, as used in shovel definitions."""
        return "amqp://{}:{}@{}:{}/{}".format(
            self._quote(self.user),
            self._quote(self.password),
            self.server,
            self.amqp_port,
            self._quote(self.vhost),
        )

    def get_urls(self, action, **kwargs):
        """Build the management API URL for ``action``.

        Args:
            action: ``"list_queues"`` or ``"create_shovel"``.
            **kwargs: Values for the URL template placeholders (for example
                ``shovel_name``). ``vhost`` is always taken from the instance.

        Returns:
            The full URL, with every placeholder value percent-encoded.

        Raises:
            ValueError: If ``action`` is not a known action.
        """
        templates = {
            "list_queues": "/queues/{vhost}",
            "create_shovel": "/parameters/shovel/{vhost}/{shovel_name}",
        }
        if action not in templates:
            raise ValueError("Unknown API action: {!r}".format(action))
        kwargs["vhost"] = self.vhost
        # Encode each path component; an unescaped "/" (default vhost) or a
        # space in a queue name would otherwise produce a wrong path.
        quoted = {key: self._quote(value) for key, value in kwargs.items()}
        return self.api_base + templates[action].format(**quoted)

    def get_queues(self):
        """Fetch the queues of the configured vhost.

        Returns:
            A list of ``RabbitMQApiQueue`` objects, one per queue record.

        Raises:
            RabbitMQError: If the API does not answer with status 200.
        """
        r = requests.get(self.get_urls("list_queues"))
        if r.status_code != 200:
            # Use the raw text: an error body is not guaranteed to be JSON.
            raise RabbitMQError(
                "Something went wrong! Response {}, message {}".format(r.status_code, r.text)
            )
        return [RabbitMQApiQueue(**q) for q in r.json()]

    def create_shovel(self, source_queue, destination_rabbitmq, destination_queue=None, shovel_name=None):
        """Create a dynamic shovel from ``source_queue`` to another server.

        Args:
            source_queue: Queue object with a ``name`` attribute.
            destination_rabbitmq: ``RabbitMQApi`` describing the target server.
            destination_queue: Optional target queue object; defaults to a
                queue with the same name as the source.
            shovel_name: Optional shovel name; defaults to the source queue name.

        Raises:
            RabbitMQError: If the API reports anything other than success.
        """
        if not shovel_name:
            shovel_name = source_queue.name
        destination_queue_name = destination_queue.name if destination_queue else source_queue.name
        url = self.get_urls("create_shovel", shovel_name=shovel_name)
        data = {"value": {
            "src-uri": self.amqp_conn_string,
            "src-queue": source_queue.name,
            "dest-uri": destination_rabbitmq.amqp_conn_string,
            "dest-queue": destination_queue_name,
        }}
        r = requests.put(url, json=data)
        # The HTTP API documents both 201 Created and 204 No Content as
        # success for PUT; which one is returned depends on the broker.
        if r.status_code not in (201, 204):
            raise RabbitMQError("Shovel {} status {}: {}".format(shovel_name, r.status_code, r.text))
        print("Shovel {} status {} CREATED".format(shovel_name, r.status_code))

    def shovel_all(self, destination_rabbitmq):
        """Create one shovel per non-auto-delete queue of this vhost."""
        self._queues = self.get_queues()
        print("TOTAL {} queues".format(len(self._queues)))
        for q in self._queues:
            # Auto-delete queues are skipped, as in the original script.
            if not getattr(q, "auto_delete", False):
                self.create_shovel(q, destination_rabbitmq)
if __name__ == "__main__":
    # Example endpoints only: replace host, credentials and vhost with the
    # real source and destination servers before running.
    src = RabbitMQApi("172.17.0.2", "guest", "guest", "test")
    dst = RabbitMQApi("172.17.0.3", "guest", "guest", "test2")
    src.shovel_all(dst)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment