Skip to content

Instantly share code, notes, and snippets.

@igreenfield
Last active July 12, 2017 09:30
Show Gist options
  • Save igreenfield/d5c3162c150cff924b4a1d10e4a3c371 to your computer and use it in GitHub Desktop.
Save igreenfield/d5c3162c150cff924b4a1d10e4a3c371 to your computer and use it in GitHub Desktop.
Script to rebalance master node for rabbitmq cluster
import httplib
import json
from string import Template
import time
request_template = '{"vhost": "/","name": "$name","pattern": "$pattern","apply-to": "all","definition": {"ha-mode": "nodes","ha-params": ["$node_name"],"ha-sync-mode": "automatic", "queue-master-locator": "min-masters"},"priority": 10}'
post_headers = {"Content-type": "application/json",
"Authorization": "Basic YWRtaW46YWRtaW4="}
get_headers = {"Authorization": "Basic YWRtaW46YWRtaW4="}
rabbitmq_ip = "10.10.41.56"
conn = httplib.HTTPConnection(rabbitmq_ip, 15672)
conn.request("GET", "/api/nodes", '', get_headers)
res = conn.getresponse()
nodes = None
queues = None
if res.status is 200:
data = res.read()
response_as_json = json.loads(data)
nodes = [item['name'] for item in response_as_json]
print 'found nodes: ' + str(nodes)
conn.close()
if nodes is not None:
conn = httplib.HTTPConnection(rabbitmq_ip, 15672)
conn.request("GET", "/api/queues", '', get_headers)
res = conn.getresponse()
if res.status is 200:
data = res.read()
response_as_json = json.loads(data)
queues = [item['name'] for item in response_as_json]
print 'found queues: ' + str(queues)
conn.close()
uris = []
queueSpan = []
if queues is not None:
index = 0
for queue in queues:
conn = httplib.HTTPConnection(rabbitmq_ip, 15672)
node_name = nodes[index]
request_body = Template(request_template).substitute(name=queue, pattern=queue, node_name=node_name)
uri = "/api/policies/%2F/" + queue
queueSpan.append(queue + " " + node_name)
uris.append(uri)
conn.request("PUT", uri, request_body, post_headers)
res = conn.getresponse()
conn.close()
index += 1
if index is len(nodes):
index = 0
print "Waiting for new span to take affect..."
queueSpanSet = set(queueSpan)
count = 0
while True and count < 60:
conn = httplib.HTTPConnection(rabbitmq_ip, 15672)
conn.request("GET", "/api/queues", '', get_headers)
res = conn.getresponse()
if res.status is 200:
data = res.read()
response_as_json = json.loads(data)
currentQueuesSpan = set([item['name'] + " " + item['node'] for item in response_as_json])
if currentQueuesSpan == queueSpanSet:
break
else:
time.sleep(5)
count = count + 1
print "Going to delete policies..."
for uri in uris:
conn = httplib.HTTPConnection(rabbitmq_ip, 15672)
print "DELETE " + uri
conn.request("DELETE", uri, '', get_headers)
res = conn.getresponse()
conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment