Created
May 3, 2023 08:12
-
-
Save abompard/4675431a7d2cf236f39f95f0369c7273 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Author: abompard | |
# https://pagure.io/fedora-infrastructure/issue/10899 | |
# | |
# This scripts requires the shovel plugin to be enabled: | |
# Check with: | |
# # rabbitmq-plugins is_enabled rabbitmq_shovel | |
# Enable with: | |
# # rabbitmq-plugins enable rabbitmq_shovel | |
import os | |
import json | |
import logging | |
import time | |
from urllib.parse import quote_plus | |
from configparser import ConfigParser | |
from argparse import ArgumentParser | |
import requests | |
CONFIG_PATH = "~/.rabbitmqadmin.conf" | |
EMPTY_TIMEOUT = 60 | |
log = logging.getLogger("recreate-queue") | |
def shovel_name(queue_from, queue_to): | |
return f"{queue_from}-to-{queue_to}" | |
class API: | |
def __init__(self, config): | |
self.config = config | |
self.http = requests.Session() | |
self.http.auth = (config["username"], config["password"]) | |
def url(self, path): | |
if not isinstance(path, list): | |
path = [path] | |
return f"http://{self.config['hostname']}:{self.config['port']}/api/{path[0]}/{quote_plus(self.config['vhost'])}/{'/'.join(path[1:])}" | |
def list(self, resource): | |
response = self.http.get(self.url(resource)) | |
return response.json() | |
def get_queue(self, name): | |
response = self.http.get(self.url(["queues", name])) | |
return response.json() | |
def get_queue_bindings(self, name): | |
response = self.http.get(self.url(["queues", name, "bindings"])) | |
# Don't return the default direct binding | |
return [b for b in response.json() if b["source"]] | |
def create_queue(self, name, params): | |
response = self.http.put( | |
self.url(["queues", name]), json.dumps(params or {}) | |
) | |
response.raise_for_status() | |
def wait_empty(self, name): | |
for i in range(EMPTY_TIMEOUT): | |
queue = self.get_queue(name) | |
if queue.get("messages") == 0: | |
return | |
time.sleep(1) | |
raise RuntimeError(f"Queue did not empty in {EMPTY_TIMEOUT}s") | |
def delete_queue(self, name): | |
response = self.http.delete( | |
self.url(["queues", name]) + "?if-empty=true" | |
) | |
response.raise_for_status() | |
def bind(self, queue, exchange, routing_key, arguments=None): | |
response = self.http.post( | |
self.url(["bindings", "e", exchange, "q", queue]), | |
json.dumps({"routing_key": routing_key, "arguments": arguments or {}}) | |
) | |
response.raise_for_status() | |
def unbind(self, queue, exchange, properties_key): | |
response = self.http.delete( | |
self.url(["bindings", "e", exchange, "q", queue, properties_key]) | |
) | |
response.raise_for_status() | |
def shovel(self, queue_from, queue_to): | |
config = { | |
"src-protocol": "amqp091", | |
"src-uri": f"amqp://{self.config['username']}:{self.config['password']}@localhost", | |
"src-queue": queue_from, | |
"dest-protocol": "amqp091", | |
"dest-uri": f"amqp://{self.config['username']}:{self.config['password']}@localhost", | |
"dest-queue": queue_to, | |
"src-delete-after": "queue-length", | |
} | |
response = self.http.put( | |
self.url(["parameters/shovel", shovel_name(queue_from, queue_to)]), | |
json={"value": config} | |
) | |
response.raise_for_status() | |
def get_config(config_path, section): | |
parser = ConfigParser() | |
parser.read(os.path.expanduser(config_path)) | |
return dict(parser.items(section)) | |
def parse_args(): | |
parser = ArgumentParser() | |
parser.add_argument("-c", "--config", default=CONFIG_PATH, help="path to rabbitmqadmin.conf (default: %(default)r)") | |
parser.add_argument("-s", "--server", default="default", help="server section name in rabbitmqadmin.conf (default: %(default)r)") | |
parser.add_argument("-a", "--arg", action="append", help="additional queue arguments") | |
parser.add_argument("name", help="The queue name to recreate") | |
return parser.parse_args() | |
def get_new_args(queue, arguments): | |
new_args = queue["arguments"].copy() | |
for argument in arguments or []: | |
arg_key, arg_value = argument.split("=") | |
try: | |
arg_value = int(arg_value) | |
except ValueError: | |
pass | |
new_args[arg_key] = arg_value | |
return new_args | |
def main(): | |
args = parse_args() | |
config = get_config(args.config, args.server) | |
client = API(config) | |
logging.basicConfig(level=logging.INFO, format="%(message)s") | |
tmp_queue_name = f"{args.name}-tmp" | |
queue = client.get_queue(args.name) | |
queue_params = {n: queue[n] for n in ("auto_delete", "durable", "exclusive")} | |
queue_params["arguments"] = get_new_args(queue, args.arg) | |
log.info(f"Creating {tmp_queue_name} with params {queue_params}") | |
client.create_queue(tmp_queue_name, queue_params) | |
bindings = client.get_queue_bindings(args.name) | |
for binding in bindings: | |
log.info(f"Binding {tmp_queue_name} to {binding['source']} on topic {binding['routing_key']}") | |
client.bind(tmp_queue_name, binding["source"], binding["routing_key"], binding["arguments"]) | |
for binding in bindings: | |
log.info(f"Unbinding {binding['destination']} from {binding['source']} on topic {binding['routing_key']}") | |
client.unbind(binding["destination"], binding["source"], binding["properties_key"]) | |
log.info(f"Waiting for {args.name} to empty...") | |
client.wait_empty(args.name) | |
log.info(f"Deleting {args.name}") | |
client.delete_queue(args.name) | |
log.info(f"Recreating {args.name} with params {queue_params}") | |
client.create_queue(args.name, queue_params) | |
for binding in bindings: | |
log.info(f"Binding {args.name} to {binding['source']} on topic {binding['routing_key']}") | |
client.bind(args.name, binding["source"], binding["routing_key"], binding["arguments"]) | |
for binding in bindings: | |
log.info(f"Unbinding {tmp_queue_name} from {binding['source']} on topic {binding['routing_key']}") | |
client.unbind(tmp_queue_name, binding["source"], binding["properties_key"]) | |
log.info(f"Moving messages in {tmp_queue_name} to {args.name}") | |
client.shovel(tmp_queue_name, args.name) | |
# Wait for temp queue messages to have been moved | |
client.wait_empty(tmp_queue_name) | |
log.info(f"Deleting {tmp_queue_name}") | |
client.delete_queue(tmp_queue_name) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment