Skip to content

Instantly share code, notes, and snippets.

@simontabor
Created February 19, 2015 14:53
Show Gist options
  • Save simontabor/a72dada672fe240ad00d to your computer and use it in GitHub Desktop.
Save simontabor/a72dada672fe240ad00d to your computer and use it in GitHub Desktop.
Redis Cluster slot balancing
var async = require('async');
var Redis = require('redis');
/*
* Rebalance a Redis cluster
*
* 1. Keeps slot allocations in blocks to maintain tidy configuration.
* 2. Moves slots to the coldest node from the hottest node, so the cluster stays healthy whilst rebalancing (`liveBalance`)
*/
/*
 * Rebalances slot allocation across a Redis Cluster.
 *
 * redisConf   - { port, host } of a seed node used to discover the topology
 * removeNodes - "host:port" names whose slots should all be moved away
 *               (empty/omitted to spread slots across all masters)
 * liveBalance - true for one-slot-at-a-time balancing, false for batched moves
 */
var Rebalance = function(redisConf, removeNodes, liveBalance) {
  var self = this;
  // total number of hash slots in any Redis Cluster
  self.numSlots = 16384;
  self.redisConn = Redis.createClient(redisConf.port, redisConf.host);
  self.liveBalance = liveBalance;
  // we want to remove all slots off of these nodes (empty to spread across all masters)
  self.removeNodes = removeNodes || [];
};
// Discover the cluster topology, compute the slot movements required, then
// execute them strictly one at a time. cb(err) fires when all moves are done.
Rebalance.prototype.run = function(cb) {
  var self = this;
  self.getNodes(function(err) {
    if (err) return cb(err);
    var jobs = self.calculateMoves();
    // liveBalance: ordered by slot count for even rebalancing (doesn't work);
    // otherwise: batched into from->to node groups (works)
    var slotMoves = self.liveBalance ? self.getOrderedSlotMoves(jobs) : self.getSlotMoves(jobs);
    console.log(slotMoves);
    async.eachSeries(slotMoves, self.moveSlot.bind(self), cb);
  });
};
// list nodes in the cluster and connect to them
// list nodes in the cluster and connect to them
// Runs CLUSTER NODES on the seed connection and parses each master line into
// { name, hash, client, slots, numSlots }, stored on self.nodes keyed by
// "host:port". A dedicated client is opened per node so later slot/key
// commands can be sent to the right instance. cb(err, nodes).
Rebalance.prototype.getNodes = function(cb) {
var self = this;
self.redisConn.send_command('cluster', [ 'nodes' ], function(err, resp) {
if (err) return cb(err);
self.nodes = {};
// one node per line: <id> <host:port> <flags> <master> ... then slot ranges from field 8
resp.split('\n').forEach(function(n){
var s = n.split(' ');
// don't care about slaves, only master
if (!s[2] || s[2].indexOf('master') === -1) return;
// [ '0-400', '6546-7423', '9642' ] => [ [ 0, 400 ], [ 6546, 7423 ], [ 9642, 9642 ] ]
var slots = s.slice(8).map(function(s) {
var split = s.split('-').map(Number);
if (split.length === 1) return [ split[0], split[0] ];
return [ split[0], split[1] ];
});
self.nodes[s[1]] = {
name: s[1], // "host:port"
hash: s[0], // node ID, used as the target of CLUSTER SETSLOT
client: Redis.createClient(s[1].split(':')[1], s[1].split(':')[0]),
slots: slots,
// total owned slots: sum of (end - start + 1) over every range
numSlots: slots.reduce(function(a, b) { return a + b[1] - b[0] + 1; }, 0)
};
});
cb(null, self.nodes);
});
};
// calculate what slots we're going to be moving
// calculate what slots we're going to be moving
// Returns an array of { from, to, slots } jobs such that, once applied, every
// surviving master owns one contiguous, evenly-sized block of slots.
Rebalance.prototype.calculateMoves = function() {
  var self = this;
  // node names that will remain in the cluster after the rebalance
  var availableNodes = Object.keys(self.nodes).filter(function(name) {
    return self.removeNodes.indexOf(name) === -1;
  });
  var slotsPerNode = self.numSlots / availableNodes.length;
  // the contiguous block ranges we want the surviving nodes to end up with
  var slotRanges = [];
  for (var i = 0; i < availableNodes.length; i++) {
    slotRanges.push([ Math.round(slotsPerNode * i), Math.round(slotsPerNode * (i + 1)) - 1 ]);
  }
  // slot number -> name of the node that currently owns it
  var slotMap = new Array(self.numSlots);
  for (var name in self.nodes) {
    self.nodes[name].slots.forEach(function(range) {
      for (var slot = range[0]; slot <= range[1]; slot++) slotMap[slot] = name;
    });
  }
  // nodes we've already selected to receive a slot range
  var selectedNodes = [];
  // jobs of moving slots from one node to another
  var jobs = [];
  for (var r = 0; r < slotRanges.length; r++) {
    var nodeSlots = self.getSlotRangeNodes(slotRanges[r], slotMap);
    // the node owning the most of this range keeps it; everyone else moves out
    var mostSlots = self.getNodeWithMostSlots(nodeSlots, selectedNodes);
    // we've selected this node now, stop it from being selected again
    selectedNodes.push(mostSlots);
    for (var owner in nodeSlots) {
      // we don't want to move slots between the same node
      if (owner === mostSlots) continue;
      // no slots to move between these nodes
      if (!nodeSlots[owner].length) continue;
      jobs.push({
        from: owner,
        to: mostSlots,
        slots: nodeSlots[owner]
      });
    }
  }
  return jobs;
};
// get what nodes own what slots within a slot range
// get what nodes own what slots within a slot range
// Returns { nodeName: [ slot, ... ] } covering every slot in [range[0], range[1]];
// every node in the cluster appears as a key, even with an empty list.
Rebalance.prototype.getSlotRangeNodes = function(range, slotMap) {
  var self = this;
  var nodes = {};
  // start each node off with an empty slot list
  Object.keys(self.nodes).forEach(function(name) { nodes[name] = []; });
  for (var slot = range[0]; slot <= range[1]; slot++) {
    // the host which currently owns this slot
    var owner = slotMap[slot];
    if (!owner) throw new Error('Unallocated slot ' + slot + ', I\'m not sure what to do');
    nodes[owner].push(slot);
  }
  return nodes;
};
// sort the slotRangeNodes by the number of slots that they own and select the one with the most
// sort the slotRangeNodes by the number of slots that they own and select the one with the most
// Nodes being removed and nodes in `exclude` are never eligible; returns
// undefined when no node qualifies (ties go to the first key encountered,
// matching a stable descending sort).
Rebalance.prototype.getNodeWithMostSlots = function(nodeSlots, exclude) {
  var self = this;
  var best;
  for (var name in nodeSlots) {
    // exclude nodes we're removing, and ones that we've already selected before
    if (self.removeNodes.indexOf(name) !== -1) continue;
    if (exclude.indexOf(name) !== -1) continue;
    if (best === undefined || nodeSlots[name].length > nodeSlots[best].length) best = name;
  }
  return best;
};
// returns the slot movement jobs ordered so we keep the cluster balanced as we go
// returns the slot movement jobs ordered so we keep the cluster balanced as we go
// Repeatedly sends one slot to the destination with the fewest total slots,
// taken from the pending source that currently has the most, so the cluster
// stays as even as possible mid-rebalance.
// Fix: the original recursed once per slot moved (up to 16384 frames) in
// findSlotToMove, risking a call-stack overflow; this version iterates.
Rebalance.prototype.getOrderedSlotMoves = function(jobs) {
  var self = this;
  var slotMoves = [];
  // allTo[destination] = [ { from, slot }, ... ] of every slot headed there
  var allTo = {};
  for (var i = 0; i < jobs.length; i++) {
    var job = jobs[i];
    if (!allTo[job.to]) allTo[job.to] = [];
    for (var j = 0; j < job.slots.length; j++) {
      allTo[job.to].push({
        from: job.from,
        slot: job.slots[j]
      });
    }
  }
  for (;;) {
    // pick the destination with the fewest total slots that still has moves pending
    var node = Object.keys(allTo).filter(function(a) {
      // ensure we still have slots to move
      return allTo[a].length;
    }).sort(function(a, b) {
      // find node with least total slots
      return self.nodes[a].numSlots - self.nodes[b].numSlots;
    })[0];
    // nothing left to move
    if (!node) break;
    var nodeJob = allTo[node].sort(function(a, b) {
      // take the pending move whose source has the MOST slots to move away from
      return self.nodes[a.from].numSlots - self.nodes[b.from].numSlots;
    }).pop();
    // update slot counts so the next iteration sorts on fresh numbers
    self.nodes[node].numSlots++;
    self.nodes[nodeJob.from].numSlots--;
    slotMoves.push({
      from: nodeJob.from,
      to: node,
      slot: nodeJob.slot
    });
  }
  return slotMoves;
};
// returns the slot movement jobs, un-ordered (therefore batched into from->to blocks)
// returns the slot movement jobs, un-ordered (therefore batched into from->to blocks)
// Flattens each { from, to, slots } job into one { from, to, slot } entry per slot.
Rebalance.prototype.getSlotMoves = function(jobs) {
  var slotMoves = [];
  jobs.forEach(function(job) {
    job.slots.forEach(function(slot) {
      slotMoves.push({ from: job.from, to: job.to, slot: slot });
    });
  });
  return slotMoves;
};
// move a slot from one node to another
// move a slot from one node to another
// Performs the standard Redis Cluster resharding sequence for one slot:
//   1. mark the slot IMPORTING on the target and MIGRATING on the source
//   2. migrate every key in the slot across (moveKeysInSlot)
//   3. assign the slot to the target via CLUSTER SETSLOT ... NODE
Rebalance.prototype.moveSlot = function(job, cb) {
var self = this;
var from = job.from;
var to = job.to;
var slot = job.slot;
var target = self.nodes[to];
var source = self.nodes[from];
console.log('MOVING SLOT ' + slot + ' from ' + from + ' to ' + to);
// tell the relevant nodes that we're moving
async.parallel([
function(done) { target.client.send_command('cluster', [ 'setslot', slot, 'importing', source.hash ], done); },
function(done) { source.client.send_command('cluster', [ 'setslot', slot, 'migrating', target.hash ], done); },
], function(err) {
if (err) return cb(err);
self.moveKeysInSlot(job, function(err) {
if (err) return cb(err);
// we've moved all keys out of the slot!
// let's tell the nodes we've moved
// broadcast to every master so none is left with a stale owner for this slot
async.each(Object.keys(self.nodes), function(n, done) {
self.nodes[n].client.send_command('cluster', [ 'setslot', slot, 'node', target.hash ], done);
}, cb);
});
});
};
// move all the keys in a slot, callback when all keys moved
// move all the keys in a slot, callback when all keys moved
// Fetches keys in batches of 20 via CLUSTER GETKEYSINSLOT and MIGRATEs them
// to the destination, recursing (asynchronously, so no stack growth) until
// the slot is empty.
Rebalance.prototype.moveKeysInSlot = function(job, cb) {
var self = this;
var source = self.nodes[job.from];
source.client.send_command('cluster', [ 'getkeysinslot', job.slot, 20 ], function(err, keys) {
if (err) return cb(err);
// done!
if (!keys.length) return cb();
// limit of 1: MIGRATE blocks the source node, so move keys serially
async.eachLimit(keys, 1, function(key, done) {
console.log('Moving ' + key + ' from ' + job.from + ' to ' + job.to);
// MIGRATE host port key destination-db(0) timeout(15000ms)
source.client.send_command('migrate', [ job.to.split(':')[0], job.to.split(':')[1], key, 0, 15000 ], done);
}, function(err) {
if (err) return cb(err);
// call again to do the next batch
self.moveKeysInSlot(job, cb);
});
});
};
// // block rebalance, removing a node from the cluster (works)
var r = new Rebalance({
port: 6380,
host: '11.11.12.70'
}, [ '11.11.12.71:6380' ], false);
// // block rebalance, sharing evenly across all nodes in the cluster (works)
// var r = new Rebalance({
// port: 6380,
// host: '11.11.12.70'
// }, [ ], false);
// // live/dynamic rebalance, removing a node from the cluster (doesn't work)
// var r = new Rebalance({
// port: 6380,
// host: '11.11.12.70'
// }, [ '11.11.12.71:6380' ], true);
// // live/dynamic rebalance, sharing evenly across all nodes in the cluster (doesn't work)
// var r = new Rebalance({
// port: 6380,
// host: '11.11.12.70'
// }, [ ], true);
r.run(function(err) {
if (err) console.error(err, 'Rebalance FAILED!');
console.log('Done');
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment