Skip to content

Instantly share code, notes, and snippets.

@ichernev
Created August 5, 2012 22:15
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save ichernev/3267415 to your computer and use it in GitHub Desktop.
Save ichernev/3267415 to your computer and use it in GitHub Desktop.
Node.js zmq guide examples
// Least-recently used (LRU) queue device in Node.js
var startClientsAndWorkersNowait = true
, terminate = 'skip'; // 'immediately', 'later', 'skip'
var numberOfClients = 10
, numberOfWorkers = 3;
var zmq = require('zmq')
, frontend = zmq.socket('router')
, backend = zmq.socket('router')
, emitter = new (require('events').EventEmitter)();
var randString = function() {
var len = 10
, charSet = '0123456789abcdef'
, result = [];
for (var i = 0; i < len; ++i) {
result.push(charSet[Math.floor(Math.random() * charSet.length)]);
}
result.splice(len / 2, 0, ['-']);
return result.join('');
};
var finishedClients = 0;
var clientTask = function() {
var client = zmq.socket('req');
client.identity = randString();
client.connect('ipc://frontend.ipc');
client.send('HELLO');
client.on('message', function(reply) {
console.log('Client:', reply.toString());
finishedClients += 1;
if (finishedClients === numberOfClients) {
emitter.emit('cleanup');
if (terminate === 'immediately') {
process.exit(0);
} else if (terminate === 'later') {
setTimeout(function() {
process.exit(0);
}, 200);
} else {
// process should terminate by itself
}
}
});
emitter.on('cleanup', function() { client.close(); });
};
var workerTask = function() {
var worker = zmq.socket('req');
worker.identity = randString();
worker.connect('ipc://backend.ipc');
worker.send('READY');
worker.on('message', function(client, empty, request) {
console.log('Worker:', request.toString());
worker.send([client, empty, 'OK']);
});
emitter.on('cleanup', function() { worker.close(); });
};
var availableWorkers = [];
// There is no buffering mechanism for received messages, so store unprocessed
// request in an array
var pendingRequests = [];
var processRequest = function() {
// Take LRU worker and give it oldest pending request
var worker = availableWorkers.shift()
, task = pendingRequests.shift();
backend.send([worker, '', task[0], '', task[1]]);
};
frontend.on('message', function(client, empty, request) {
pendingRequests.push([client, request]);
if (availableWorkers.length > 0) {
processRequest();
}
});
backend.on('message', function(worker, empty1, client, empty2, response) {
availableWorkers.push(worker);
if (client.toString() !== 'READY') {
frontend.send([client, empty2, response]);
}
if (pendingRequests.length > 0) {
processRequest();
}
});
frontend.on('error', function(err) {
console.log('fe err', err.stack);
});
backend.on('error', function(err) {
console.log('be err', err.stack);
});
var binded = 0;
var checkStartOther = function() {
binded += 1;
if (binded === 2) {
if (!startClientsAndWorkersNowait) {
startOther();
}
}
};
frontend.bind('ipc://frontend.ipc', checkStartOther);
backend.bind('ipc://backend.ipc', checkStartOther);
emitter.setMaxListeners(numberOfWorkers + numberOfClients + 1);
emitter.on('cleanup', function() {
frontend.close();
backend.close();
});
var startOther = function() {
var i;
for (i = 0; i < numberOfClients; ++i) { clientTask(); }
for (i = 0; i < numberOfWorkers; ++i) { workerTask(); }
};
if (startClientsAndWorkersNowait) {
startOther();
}
// Custom routing Router to Mama (ROUTER to REQ) in Node.js
var numberOfWorkers = 10;
var zmq = require('zmq')
, client = zmq.socket('router')
, workloadSent = 0
, endSent = 0;
var randString = function() {
var len = 10
, charSet = '0123456789abcdef'
, result = [];
for (var i = 0; i < len; ++i) {
result.push(charSet[Math.floor(Math.random() * charSet.length)]);
}
result.splice(len / 2, 0, ['-']);
return result.join('');
};
var workerTask = function() {
var worker = zmq.socket('req')
, total = 0;
worker.identity = randString();
worker.connect('ipc://routing.ipc');
worker.send('ready');
worker.on('message', function(data) {
if (data.toString() === 'END') {
console.log('Processed:', total, 'tasks');
worker.close();
return;
}
total += 1;
// Simulate random work
setTimeout(function() {
worker.send('ready');
}, Math.random() * 1000 + 1);
});
process.on('SIGINT', worker.close.bind(worker));
};
// Prevent 'memory leak' message caused by process.on('SIGINT', ...);
process.setMaxListeners(numberOfWorkers + 1);
for (var i = 0; i < numberOfWorkers; ++i) {
workerTask();
}
client.on('message', function(address, empty, ready) {
// LRU worker is next waiting in queue
if (workloadSent >= numberOfWorkers * 10) {
client.send([address, empty, 'END']);
endSent += 1;
if (endSent === numberOfWorkers) {
client.close();
}
} else {
client.send([address, empty, 'This is the workload']);
workloadSent += 1;
}
});
client.bind('ipc://routing.ipc');
process.on('SIGINT', function() {
client.close();
process.exit(0);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment