Python CLI application that converts all quorum queues on a RabbitMQ cluster to classic queues with the intent of not loosing any messages or impacting production workloads (aside from disconnecting consumers on a queue)
#!/usr/bin/env python3 | |
import argparse | |
import json | |
import logging | |
import sys | |
import time | |
import typing | |
from urllib import parse | |
import httpx | |
LOGGER = logging.getLogger(__name__) | |
class Converter: | |
def __init__(self, url: str, vhost: str) -> None: | |
self.base_url = url.rstrip('/') | |
self.vhost = parse.quote(vhost, '') | |
response = httpx.get('{}/api/queues/{}'.format( | |
self.base_url, self.vhost)) | |
self.queues = sorted((queue for queue in response.json() | |
if queue['type'] == 'quorum' | |
and not queue['auto_delete']), | |
key=lambda queue: queue['name']) | |
def run(self) -> None: | |
for queue in self.queues: | |
LOGGER.info('Processing %s', queue['name']) | |
temp_queue = '{}-temp'.format(queue['name']) | |
if not self.create_queue( | |
temp_queue, | |
self._strip_x_queue_type(queue['arguments']), | |
durable=queue['durable'], | |
exclusive=queue['exclusive']): | |
LOGGER.error('Failed to create temporary queue: %s', | |
temp_queue) | |
sys.exit(1) | |
self.switch_bindings(queue['name'], temp_queue) | |
self.wait_for_queue_to_drain(queue['name']) | |
if not self.delete_queue(queue['name']): | |
LOGGER.error('Failed to delete queue: %s', queue['name']) | |
sys.exit(1) | |
if not self.create_queue( | |
queue['name'], | |
self._strip_x_queue_type(queue['arguments']), | |
durable=queue['durable'], | |
exclusive=queue['exclusive']): | |
LOGGER.error('Failed to create queue: %s', queue['name']) | |
sys.exit(1) | |
self.switch_bindings(temp_queue, queue['name']) | |
if not self.create_temp_shovel(temp_queue, queue['name']): | |
LOGGER.error('Failed to create temp shovel from %s to %s', | |
temp_queue, queue['name']) | |
sys.exit(1) | |
self.wait_for_queue_to_drain(temp_queue) | |
if not self.delete_queue(temp_queue): | |
LOGGER.error('Failed to delete queue: %s', temp_queue) | |
sys.exit(1) | |
def bind_queue(self, | |
exchange: str, | |
routing_key: str, | |
queue: str, | |
arguments: dict) -> bool: | |
url = '{}/api/bindings/{}/e/{}/q/{}'.format( | |
self.base_url, self.vhost, exchange, queue) | |
response = httpx.post( | |
url, headers={'Content-Type': 'application/json'}, | |
data=json.dumps({'routing_key': routing_key, | |
'arguments': arguments})) | |
return response.status_code == 201 | |
def create_queue(self, | |
name: str, | |
arguments: dict, | |
durable: bool, | |
exclusive: bool) -> bool: | |
response = httpx.put(self._api_url_queue(name), | |
headers={'Content-Type': 'application/json'}, | |
data=json.dumps({ | |
'arguments': arguments, | |
'auto_delete': False, | |
'durable': durable, | |
'exclusive': exclusive, | |
'type': 'classic'})) | |
LOGGER.debug('Response: %r', response) | |
return response.status_code == 201 | |
def create_temp_shovel(self, from_queue: str, to_queue: str) -> bool: | |
shovel_name = '{}-to-{}'.format(from_queue, to_queue) | |
response = httpx.put( | |
'{}/api/parameters/shovel/{}/{}'.format( | |
self.base_url, self.vhost, shovel_name), | |
headers={'Content-Type': 'application/json'}, | |
data=json.dumps({ | |
'component': 'shovel', | |
'name': shovel_name, | |
'value': { | |
'ack-mode': 'on-confirm', | |
'add-forward-headers': False, | |
'delete-after': 'queue-length', | |
'dest-queue': to_queue, | |
'dest-uri': 'amqp://', | |
'prefetch-count': 100, | |
'reconnect-delay': 30, | |
'src-queue': from_queue, | |
'src-uri': 'amqp://' | |
}, | |
'vhost': self.vhost})) | |
if response.status_code != 201: | |
LOGGER.error('Response: %r: %r', response, response.json()) | |
return response.status_code == 201 | |
def delete_queue(self, name: str) -> bool: | |
response = httpx.delete(self._api_url_queue(name)) | |
LOGGER.debug('Response: %r', response) | |
return response.status_code == 204 | |
def get_queue_bindings(self, name: str) -> typing.List[typing.Dict]: | |
response = httpx.get( | |
'{}/api/queues/{}/{}/bindings'.format( | |
self.base_url, self.vhost, name)) | |
return [binding for binding in response.json() | |
if binding['destination'] != binding['properties_key'] | |
and binding['destination'] != binding['routing_key'] | |
and binding['source'] != ''] | |
def switch_bindings(self, from_queue: str, to_queue: str) -> None: | |
for binding in self.get_queue_bindings(from_queue): | |
if not self.bind_queue( | |
binding['source'], | |
binding['routing_key'], | |
to_queue, | |
binding['arguments']): | |
LOGGER.error('Failed to bind queue %s to %s with %s', | |
to_queue, binding['source'], | |
binding['routing_key']) | |
sys.exit(1) | |
if not self.unbind_queue( | |
binding['source'], | |
from_queue, | |
binding['properties_key']): | |
LOGGER.error('Failed to unbind queue %s to %s with %s', | |
from_queue, binding['source'], | |
binding['routing_key']) | |
sys.exit(1) | |
def unbind_queue(self, | |
exchange: str, | |
queue: str, | |
properties_key: str) -> bool: | |
url = '{}/api/bindings/{}/e/{}/q/{}/{}'.format( | |
self.base_url, self.vhost, exchange, queue, properties_key) | |
response = httpx.delete(url) | |
return response.status_code == 204 | |
def wait_for_queue_to_drain(self, name: str) -> None: | |
while True: | |
response = httpx.get(self._api_url_queue(name)) | |
if not response.status_code == 200: | |
LOGGER.error('Failed to get queue details', name) | |
sys.exit(1) | |
body = response.json() | |
if body['messages'] == 0: | |
return | |
LOGGER.info('Queue %s has %i messages', name, body['messages']) | |
time.sleep(5) | |
def _api_url_queue(self, name: str) -> str: | |
return '{}/api/queues/{}/{}'.format(self.base_url, self.vhost, name) | |
@staticmethod | |
def _strip_x_queue_type(args: dict) -> dict: | |
if 'x-queue-type' in args: | |
del args['x-queue-type'] | |
return args | |
def main() -> None: | |
parser = argparse.ArgumentParser( | |
description='Convert Quorum queues to Classic queues', | |
formatter_class=argparse.ArgumentDefaultsHelpFormatter) | |
parser.add_argument('--vhost', default='/') | |
parser.add_argument( | |
'url', metavar='URL', nargs='?', | |
default='http://guest:guest@localhost:15672', | |
help='The Base URL to the RabbitMQ Management UI, ' | |
'including credentials') | |
args = parser.parse_args() | |
logging.basicConfig(level=logging.INFO) | |
Converter(args.url, args.vhost).run() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment