Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Python CLI application that converts all quorum queues on a RabbitMQ cluster to classic queues with the intent of not loosing any messages or impacting production workloads (aside from disconnecting consumers on a queue)
#!/usr/bin/env python3
import argparse
import json
import logging
import sys
import time
import typing
from urllib import parse
import httpx
LOGGER = logging.getLogger(__name__)
class Converter:
def __init__(self, url: str, vhost: str) -> None:
self.base_url = url.rstrip('/')
self.vhost = parse.quote(vhost, '')
response = httpx.get('{}/api/queues/{}'.format(
self.base_url, self.vhost))
self.queues = sorted((queue for queue in response.json()
if queue['type'] == 'quorum'
and not queue['auto_delete']),
key=lambda queue: queue['name'])
def run(self) -> None:
for queue in self.queues:
LOGGER.info('Processing %s', queue['name'])
temp_queue = '{}-temp'.format(queue['name'])
if not self.create_queue(
temp_queue,
self._strip_x_queue_type(queue['arguments']),
durable=queue['durable'],
exclusive=queue['exclusive']):
LOGGER.error('Failed to create temporary queue: %s',
temp_queue)
sys.exit(1)
self.switch_bindings(queue['name'], temp_queue)
self.wait_for_queue_to_drain(queue['name'])
if not self.delete_queue(queue['name']):
LOGGER.error('Failed to delete queue: %s', queue['name'])
sys.exit(1)
if not self.create_queue(
queue['name'],
self._strip_x_queue_type(queue['arguments']),
durable=queue['durable'],
exclusive=queue['exclusive']):
LOGGER.error('Failed to create queue: %s', queue['name'])
sys.exit(1)
self.switch_bindings(temp_queue, queue['name'])
if not self.create_temp_shovel(temp_queue, queue['name']):
LOGGER.error('Failed to create temp shovel from %s to %s',
temp_queue, queue['name'])
sys.exit(1)
self.wait_for_queue_to_drain(temp_queue)
if not self.delete_queue(temp_queue):
LOGGER.error('Failed to delete queue: %s', temp_queue)
sys.exit(1)
def bind_queue(self,
exchange: str,
routing_key: str,
queue: str,
arguments: dict) -> bool:
url = '{}/api/bindings/{}/e/{}/q/{}'.format(
self.base_url, self.vhost, exchange, queue)
response = httpx.post(
url, headers={'Content-Type': 'application/json'},
data=json.dumps({'routing_key': routing_key,
'arguments': arguments}))
return response.status_code == 201
def create_queue(self,
name: str,
arguments: dict,
durable: bool,
exclusive: bool) -> bool:
response = httpx.put(self._api_url_queue(name),
headers={'Content-Type': 'application/json'},
data=json.dumps({
'arguments': arguments,
'auto_delete': False,
'durable': durable,
'exclusive': exclusive,
'type': 'classic'}))
LOGGER.debug('Response: %r', response)
return response.status_code == 201
def create_temp_shovel(self, from_queue: str, to_queue: str) -> bool:
shovel_name = '{}-to-{}'.format(from_queue, to_queue)
response = httpx.put(
'{}/api/parameters/shovel/{}/{}'.format(
self.base_url, self.vhost, shovel_name),
headers={'Content-Type': 'application/json'},
data=json.dumps({
'component': 'shovel',
'name': shovel_name,
'value': {
'ack-mode': 'on-confirm',
'add-forward-headers': False,
'delete-after': 'queue-length',
'dest-queue': to_queue,
'dest-uri': 'amqp://',
'prefetch-count': 100,
'reconnect-delay': 30,
'src-queue': from_queue,
'src-uri': 'amqp://'
},
'vhost': self.vhost}))
if response.status_code != 201:
LOGGER.error('Response: %r: %r', response, response.json())
return response.status_code == 201
def delete_queue(self, name: str) -> bool:
response = httpx.delete(self._api_url_queue(name))
LOGGER.debug('Response: %r', response)
return response.status_code == 204
def get_queue_bindings(self, name: str) -> typing.List[typing.Dict]:
response = httpx.get(
'{}/api/queues/{}/{}/bindings'.format(
self.base_url, self.vhost, name))
return [binding for binding in response.json()
if binding['destination'] != binding['properties_key']
and binding['destination'] != binding['routing_key']
and binding['source'] != '']
def switch_bindings(self, from_queue: str, to_queue: str) -> None:
for binding in self.get_queue_bindings(from_queue):
if not self.bind_queue(
binding['source'],
binding['routing_key'],
to_queue,
binding['arguments']):
LOGGER.error('Failed to bind queue %s to %s with %s',
to_queue, binding['source'],
binding['routing_key'])
sys.exit(1)
if not self.unbind_queue(
binding['source'],
from_queue,
binding['properties_key']):
LOGGER.error('Failed to unbind queue %s to %s with %s',
from_queue, binding['source'],
binding['routing_key'])
sys.exit(1)
def unbind_queue(self,
exchange: str,
queue: str,
properties_key: str) -> bool:
url = '{}/api/bindings/{}/e/{}/q/{}/{}'.format(
self.base_url, self.vhost, exchange, queue, properties_key)
response = httpx.delete(url)
return response.status_code == 204
def wait_for_queue_to_drain(self, name: str) -> None:
while True:
response = httpx.get(self._api_url_queue(name))
if not response.status_code == 200:
LOGGER.error('Failed to get queue details', name)
sys.exit(1)
body = response.json()
if body['messages'] == 0:
return
LOGGER.info('Queue %s has %i messages', name, body['messages'])
time.sleep(5)
def _api_url_queue(self, name: str) -> str:
return '{}/api/queues/{}/{}'.format(self.base_url, self.vhost, name)
@staticmethod
def _strip_x_queue_type(args: dict) -> dict:
if 'x-queue-type' in args:
del args['x-queue-type']
return args
def main() -> None:
parser = argparse.ArgumentParser(
description='Convert Quorum queues to Classic queues',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--vhost', default='/')
parser.add_argument(
'url', metavar='URL', nargs='?',
default='http://guest:guest@localhost:15672',
help='The Base URL to the RabbitMQ Management UI, '
'including credentials')
args = parser.parse_args()
logging.basicConfig(level=logging.INFO)
Converter(args.url, args.vhost).run()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.