Skip to content

Instantly share code, notes, and snippets.

@abompard
Created May 3, 2023 08:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save abompard/4675431a7d2cf236f39f95f0369c7273 to your computer and use it in GitHub Desktop.
Save abompard/4675431a7d2cf236f39f95f0369c7273 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# Author: abompard
# https://pagure.io/fedora-infrastructure/issue/10899
#
# This script requires the shovel plugin to be enabled:
# Check with:
# # rabbitmq-plugins is_enabled rabbitmq_shovel
# Enable with:
# # rabbitmq-plugins enable rabbitmq_shovel
import json
import logging
import os
import time
from argparse import ArgumentParser
from configparser import ConfigParser
from urllib.parse import quote, quote_plus

import requests
# Default location of the rabbitmqadmin configuration file (used as the
# default value of the --config command line option).
CONFIG_PATH = "~/.rabbitmqadmin.conf"
# Number of one-second polls to wait for a queue to report zero messages
# before giving up.
EMPTY_TIMEOUT = 60
# Logger used for the progress messages emitted by this script.
log = logging.getLogger("recreate-queue")
def shovel_name(queue_from, queue_to):
    """Build the name of the shovel that moves messages between two queues.

    Args:
        queue_from: Name of the source queue.
        queue_to: Name of the destination queue.

    Returns:
        The string ``"<queue_from>-to-<queue_to>"``.
    """
    return "{}-to-{}".format(queue_from, queue_to)
class API:
    """Small client for the RabbitMQ management HTTP API.

    Every request is scoped to the virtual host named in the configuration
    and authenticated with HTTP basic auth.
    """

    def __init__(self, config):
        """Create an authenticated HTTP session.

        Args:
            config: Mapping providing the ``hostname``, ``port``, ``vhost``,
                ``username`` and ``password`` keys.
        """
        self.config = config
        self.http = requests.Session()
        self.http.auth = (config["username"], config["password"])

    def url(self, path) -> str:
        """Build the management API URL for a resource in the configured vhost.

        Args:
            path: Either a resource name (e.g. ``"queues"``) or a sequence
                whose first item is the resource and whose remaining items are
                path components placed after the vhost.

        Returns:
            ``http://<hostname>:<port>/api/<resource>/<vhost>[/<component>...]``
            where the vhost and every trailing component are percent-encoded.
        """
        if not isinstance(path, (list, tuple)):
            path = [path]
        resource, *components = path
        # The resource itself may contain a slash ("parameters/shovel") and is
        # therefore left untouched; everything else is a single path segment
        # and must not leak "/", "?", "#" or spaces into the URL.
        segments = [resource, quote(self.config["vhost"], safe="")]
        segments.extend(quote(str(item), safe="") for item in components)
        base = f"http://{self.config['hostname']}:{self.config['port']}/api/"
        return base + "/".join(segments)

    def _get_json(self, path):
        """GET a resource and return its decoded JSON body.

        Raises:
            requests.HTTPError: If the server answers with an error status, so
                that an error document is never mistaken for real data.
        """
        response = self.http.get(self.url(path))
        response.raise_for_status()
        return response.json()

    def list(self, resource):
        """Return the decoded listing of ``resource`` for the vhost."""
        return self._get_json(resource)

    def get_queue(self, name):
        """Return the description of the queue called ``name``."""
        return self._get_json(["queues", name])

    def get_queue_bindings(self, name):
        """Return the bindings of queue ``name`` that have a source exchange."""
        bindings = self._get_json(["queues", name, "bindings"])
        # Don't return the default direct binding
        return [b for b in bindings if b["source"]]

    def create_queue(self, name, params):
        """Declare queue ``name`` with ``params`` (an empty dict if falsy).

        Raises:
            requests.HTTPError: If the server rejects the request.
        """
        response = self.http.put(self.url(["queues", name]), json=params or {})
        response.raise_for_status()

    def wait_empty(self, name, timeout=None):
        """Poll queue ``name`` once per second until it holds no messages.

        Args:
            name: The queue to watch.
            timeout: Maximum number of one-second polls; defaults to the
                module-level ``EMPTY_TIMEOUT``.

        Raises:
            RuntimeError: If the queue still has messages after ``timeout``
                polls.
        """
        if timeout is None:
            timeout = EMPTY_TIMEOUT
        for _ in range(timeout):
            # A missing "messages" key is treated as "not empty yet".
            if self.get_queue(name).get("messages") == 0:
                return
            time.sleep(1)
        raise RuntimeError(f"Queue did not empty in {timeout}s")

    def delete_queue(self, name):
        """Delete queue ``name``, asking the server to refuse if not empty.

        Raises:
            requests.HTTPError: If the server rejects the request.
        """
        response = self.http.delete(
            self.url(["queues", name]), params={"if-empty": "true"}
        )
        response.raise_for_status()

    def bind(self, queue, exchange, routing_key, arguments=None):
        """Bind ``queue`` to ``exchange`` with the given routing key.

        Raises:
            requests.HTTPError: If the server rejects the request.
        """
        response = self.http.post(
            self.url(["bindings", "e", exchange, "q", queue]),
            json={"routing_key": routing_key, "arguments": arguments or {}},
        )
        response.raise_for_status()

    def unbind(self, queue, exchange, properties_key):
        """Remove the binding identified by ``properties_key``.

        Args:
            queue: The bound queue.
            exchange: The source exchange of the binding.
            properties_key: The ``properties_key`` value taken from a bindings
                listing.

        Raises:
            requests.HTTPError: If the server rejects the request.
        """
        response = self.http.delete(
            self.url(["bindings", "e", exchange, "q", queue, properties_key])
        )
        response.raise_for_status()

    def _amqp_uri(self) -> str:
        """Return the AMQP URI of the local broker with escaped credentials."""
        # Credentials are percent-encoded so that characters such as "@", ":"
        # or "/" in the password cannot corrupt the URI.
        username = quote(self.config["username"], safe="")
        password = quote(self.config["password"], safe="")
        return f"amqp://{username}:{password}@localhost"

    def shovel(self, queue_from, queue_to):
        """Declare a shovel moving messages from ``queue_from`` to ``queue_to``.

        The shovel is configured with ``src-delete-after`` set to
        ``queue-length``.

        Raises:
            requests.HTTPError: If the server rejects the request.
        """
        uri = self._amqp_uri()
        config = {
            "src-protocol": "amqp091",
            "src-uri": uri,
            "src-queue": queue_from,
            "dest-protocol": "amqp091",
            "dest-uri": uri,
            "dest-queue": queue_to,
            "src-delete-after": "queue-length",
        }
        response = self.http.put(
            self.url(["parameters/shovel", shovel_name(queue_from, queue_to)]),
            json={"value": config},
        )
        response.raise_for_status()
def get_config(config_path, section):
    """Load one section of a rabbitmqadmin-style INI file as a dict.

    Args:
        config_path: Path to the configuration file; a leading ``~`` is
            expanded to the user's home directory.
        section: Name of the section to return.

    Returns:
        A dict mapping the option names of ``section`` to their string values.

    Raises:
        FileNotFoundError: If the configuration file could not be read.
        configparser.NoSectionError: If the file has no such section.
    """
    expanded_path = os.path.expanduser(config_path)
    parser = ConfigParser()
    # ConfigParser.read() silently skips unreadable files; fail loudly here
    # instead of reporting a misleading "no section" error afterwards.
    if not parser.read(expanded_path):
        raise FileNotFoundError(
            f"Could not read configuration file: {expanded_path}"
        )
    return dict(parser.items(section))
def parse_args():
    """Parse the command line of the script.

    Returns:
        The ``argparse`` namespace with the ``config``, ``server``, ``arg``
        and ``name`` attributes.
    """
    cli = ArgumentParser()
    cli.add_argument(
        "-c",
        "--config",
        default=CONFIG_PATH,
        help="path to rabbitmqadmin.conf (default: %(default)r)",
    )
    cli.add_argument(
        "-s",
        "--server",
        default="default",
        help="server section name in rabbitmqadmin.conf (default: %(default)r)",
    )
    cli.add_argument(
        "-a",
        "--arg",
        action="append",
        help="additional queue arguments",
    )
    cli.add_argument("name", help="The queue name to recreate")
    return cli.parse_args()
def get_new_args(queue, arguments):
    """Merge ``KEY=VALUE`` overrides into a copy of a queue's arguments.

    Args:
        queue: Queue description; its ``"arguments"`` mapping is the base and
            is left unmodified.
        arguments: Iterable of ``"KEY=VALUE"`` strings, or ``None``. Values
            that parse as integers are stored as ``int``, everything else as
            ``str``.

    Returns:
        A new dict with the original arguments updated by the overrides.

    Raises:
        ValueError: If an override is not of the form ``KEY=VALUE``.
    """
    new_args = dict(queue["arguments"])
    for argument in arguments or []:
        # Split on the first "=" only, so that values may contain "=" too.
        arg_key, separator, arg_value = argument.partition("=")
        if not separator or not arg_key:
            raise ValueError(
                f"Invalid queue argument {argument!r}, expected KEY=VALUE"
            )
        try:
            arg_value = int(arg_value)
        except ValueError:
            # Not a number: keep the value as a string.
            pass
        new_args[arg_key] = arg_value
    return new_args
def main():
    """Recreate the queue named on the command line with updated arguments.

    Steps, in order: create a ``<name>-tmp`` queue with the new parameters and
    bind it like the original, unbind the original and wait for it to empty,
    delete and recreate the original with the new parameters, bind it again,
    unbind the temporary queue, shovel its messages into the recreated queue
    and finally delete the temporary queue.
    """
    args = parse_args()
    client = API(get_config(args.config, args.server))
    logging.basicConfig(level=logging.INFO, format="%(message)s")

    tmp_queue_name = f"{args.name}-tmp"

    # Reuse the flags of the existing queue, overriding only its arguments.
    original = client.get_queue(args.name)
    queue_params = {
        "auto_delete": original["auto_delete"],
        "durable": original["durable"],
        "exclusive": original["exclusive"],
        "arguments": get_new_args(original, args.arg),
    }

    # Phase 1: set up the temporary queue and give it the same bindings.
    log.info(f"Creating {tmp_queue_name} with params {queue_params}")
    client.create_queue(tmp_queue_name, queue_params)
    bindings = client.get_queue_bindings(args.name)
    for binding in bindings:
        log.info(f"Binding {tmp_queue_name} to {binding['source']} on topic {binding['routing_key']}")
        client.bind(
            tmp_queue_name,
            binding["source"],
            binding["routing_key"],
            binding["arguments"],
        )

    # Phase 2: detach the original queue, wait for it to empty, replace it.
    for binding in bindings:
        log.info(f"Unbinding {binding['destination']} from {binding['source']} on topic {binding['routing_key']}")
        client.unbind(
            binding["destination"],
            binding["source"],
            binding["properties_key"],
        )
    log.info(f"Waiting for {args.name} to empty...")
    client.wait_empty(args.name)
    log.info(f"Deleting {args.name}")
    client.delete_queue(args.name)
    log.info(f"Recreating {args.name} with params {queue_params}")
    client.create_queue(args.name, queue_params)

    # Phase 3: attach the recreated queue and detach the temporary one.
    for binding in bindings:
        log.info(f"Binding {args.name} to {binding['source']} on topic {binding['routing_key']}")
        client.bind(
            args.name,
            binding["source"],
            binding["routing_key"],
            binding["arguments"],
        )
    for binding in bindings:
        log.info(f"Unbinding {tmp_queue_name} from {binding['source']} on topic {binding['routing_key']}")
        client.unbind(
            tmp_queue_name,
            binding["source"],
            binding["properties_key"],
        )

    # Phase 4: move what accumulated in the temporary queue, then drop it.
    log.info(f"Moving messages in {tmp_queue_name} to {args.name}")
    client.shovel(tmp_queue_name, args.name)
    # Wait for temp queue messages to have been moved
    client.wait_empty(tmp_queue_name)
    log.info(f"Deleting {tmp_queue_name}")
    client.delete_queue(tmp_queue_name)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment