Skip to content

Instantly share code, notes, and snippets.

@vicapow
Created July 17, 2012 06:40
Show Gist options
  • Save vicapow/3127666 to your computer and use it in GitHub Desktop.
Save vicapow/3127666 to your computer and use it in GitHub Desktop.
locking with zookeeper/zkplus and node.js
var assert = require('assert')
, zookeeper = require('zkplus')
, path = require('path')
// Create the ZooKeeper client used by the rest of this script; it is
// configured with a single server entry (localhost:2181).
var client = zookeeper.createClient({
  servers: [
    { host: 'localhost', port: 2181 }
  ]
});
// Once connected, repeatedly: take the lock, hold it for five seconds,
// release it, and then try to take it again.
client.on('connect', function(){
  console.log('connected to zoo keeper');

  // Request the lock; `onLocked` runs once `lock` reports a result.
  function getLock(){
    lock(client, '/_lockpath_', onLocked);
  }

  // Called by `lock` with an error or with the function that releases the lock.
  function onLocked(err, unlock){
    if(err) throw err;
    console.log('we got the lock!');
    // hold the lock for 5 seconds before releasing it
    setTimeout(function(){
      unlock(onUnlocked);
    }, 5000);
  }

  // Called after the release attempt; on success, start the cycle over.
  function onUnlocked(err){
    if(err) throw err;
    console.log('unlocked');
    getLock();
  }

  getLock();
});
/**
 * Try to acquire a lock represented by sequential, ephemeral znodes created
 * under `lockpath`. The caller that owns the child with the lowest sequence
 * number holds the lock; everyone else watches the child numbered just below
 * their own and re-checks when that watch fires.
 *
 * @param {Object} client - client object providing `mkdirp`, `creat`,
 *   `readdir`, `watch` and `rmr` (all callback style).
 * @param {string} lockpath - parent znode under which lock nodes are created.
 * @param {Function} cb - called as `cb(err)` on failure, or as
 *   `cb(null, unlock)` once the lock is held. `unlock(done)` removes this
 *   caller's znode and forwards the result of the removal to `done`.
 */
function lock(client,lockpath,cb){
  // create the lock path if it doesn't already exist
  client.mkdirp(lockpath,function(err){
    if(err) return cb(err);
    var opts = {
      flags : [
        // zookeeper will auto create a new "key" sequentially
        'sequence'
        // zookeeper will remove the znode when the client disconnects
        , 'ephemeral'
      ]
    };
    // NOTE(review): `creat` (sic) is assumed to be the method name exposed by
    // the zkplus version in use -- confirm before "correcting" the spelling.
    client.creat(lockpath + '/', opts, function(err,path_){
      if(err) return cb(err);
      // the created child's name is its sequence number
      var node = parseInt(path.basename(path_),10);
      attemptLock(node,attemptLockCB);

      function attemptLockCB(err,locked){
        if(err) return cb(err);
        if(locked){
          // the lock was obtained. return an `unlock` function
          console.log('got lock for node: '+node);
          return cb(null, function(cb){
            client.rmr(lockpath + '/' + pad(node),cb);
          });
        }
        // wait until the node in front of us changes.
        // Declared locally: this used to be an undeclared assignment, i.e. an
        // implicit global shared by every concurrent lock() call and a
        // ReferenceError in strict mode.
        var prevNode = lockpath + '/' + pad(node - 1);
        client.watch(prevNode, { method : 'data' }, function(err, listener){
          // no node found
          if(err) return onError(err);
          listener.on('error',onError);
          listener.on('data',function(){
            listener.stop();
            return attemptLock(node,attemptLockCB);
          });

          // A "no node" error is taken to mean the watched node is already
          // gone, so re-check immediately; any other error is reported.
          // NOTE(review): assumes `zookeeper.ZNONODE` is the code zkplus puts
          // on `err.code` for a missing node -- confirm.
          function onError(err){
            if(err && err.code === zookeeper.ZNONODE)
              return attemptLock(node,attemptLockCB);
            else if(err) return cb(err);
          }
        });
      }
    });

    // Left-pad a sequence number with zeros to 10 characters so it can be
    // turned back into a child node name.
    function pad(num){
      var size = 10;
      var s = num+"";
      while (s.length < size) s = "0" + s;
      return s;
    }

    // Read the children of `lockpath` and report, via cb(err, locked),
    // whether `node` is the lowest sequence number present.
    function attemptLock(node,cb){
      client.readdir(lockpath,function(err,nodes){
        if(err) return cb(err);
        var min = null;
        nodes.forEach(function(name){
          var seq = parseInt(name,10);
          // Ignore children whose names are not numeric: a NaN here would
          // become the "minimum" and the lock could never be acquired.
          if(isNaN(seq)) return;
          if(min===null || seq < min) min = seq;
        });
        return cb(null,min===node);
      });
    }
  });
}
// Log when the client emits its 'close' event.
client.on('close', function onClose(){
  console.log('connection to zoo keeper is now closed');
});
// On SIGINT (e.g. Ctrl-C), log a message and close the ZooKeeper client.
process.on('SIGINT', function onSigint(){
  console.log('on process exit');
  client.close();
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment