Created
August 5, 2012 22:15
-
-
Save ichernev/3267415 to your computer and use it in GitHub Desktop.
Node.js zmq guide examples
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Least-recently used (LRU) queue device in Node.js

// When true, clients and workers are started right away instead of waiting
// until both router sockets have reported that they are bound.
var startClientsAndWorkersNowait = true;
// How the process ends once every client has received its reply:
// 'immediately' exits at once, 'later' exits after a short delay, and
// 'skip' leaves the process to terminate by itself.
var terminate = 'skip';

var numberOfClients = 10;
var numberOfWorkers = 3;

var zmq = require('zmq');
// Both sides of the device are ROUTER sockets: clients connect to the
// frontend endpoint, workers to the backend endpoint.
var frontend = zmq.socket('router');
var backend = zmq.socket('router');
// Broadcasts 'cleanup' so that every socket owner can close its socket.
var emitter = new (require('events').EventEmitter)();
/**
 * Builds a random identity string: ten hexadecimal characters with a dash
 * inserted in the middle.
 *
 * @returns {string} An 11-character string of the form xxxxx-xxxxx.
 */
var randString = function() {
  var len = 10;
  var charSet = '0123456789abcdef';
  var result = [];
  for (var i = 0; i < len; ++i) {
    result.push(charSet[Math.floor(Math.random() * charSet.length)]);
  }
  // Insert the separator as a plain string, not as a one-element array that
  // only works because join() happens to stringify it.
  result.splice(len / 2, 0, '-');
  return result.join('');
};
// Number of clients that have received their reply so far.
var finishedClients = 0;

/**
 * Starts one client: a REQ socket with a random identity that connects to the
 * frontend endpoint, sends a single 'HELLO' request and logs the reply.
 *
 * When the last expected reply has arrived, 'cleanup' is emitted and the
 * process is ended according to `terminate`.
 */
var clientTask = function() {
  var client = zmq.socket('req');
  client.identity = randString();
  client.connect('ipc://frontend.ipc');
  client.send('HELLO');

  client.on('message', function(reply) {
    console.log('Client:', reply.toString());
    finishedClients += 1;
    if (finishedClients !== numberOfClients) {
      return;
    }

    // Every client is done: let all socket owners close their sockets.
    emitter.emit('cleanup');
    if (terminate === 'immediately') {
      process.exit(0);
    } else if (terminate === 'later') {
      setTimeout(function() {
        process.exit(0);
      }, 200);
    }
    // Any other value: the process should terminate by itself.
  });

  emitter.on('cleanup', function() { client.close(); });
};
/**
 * Starts one worker: a REQ socket with a random identity that connects to the
 * backend endpoint and announces itself with 'READY'. Every request it
 * receives is logged and answered with 'OK'.
 */
var workerTask = function() {
  var worker = zmq.socket('req');
  worker.identity = randString();
  worker.connect('ipc://backend.ipc');
  worker.send('READY');

  worker.on('message', function(client, empty, request) {
    console.log('Worker:', request.toString());
    // Send the client frame back along with the reply so the device knows
    // which client the answer belongs to.
    worker.send([client, empty, 'OK']);
  });

  emitter.on('cleanup', function() { worker.close(); });
};
// Identities of idle workers, least recently used first.
var availableWorkers = [];
// There is no buffering mechanism for received messages, so store unprocessed
// requests in an array. Each entry is a [client, request] pair.
var pendingRequests = [];

/**
 * Hands the oldest pending request to the least recently used worker by
 * sending it through the backend socket.
 *
 * Does nothing unless there is both an idle worker and a pending request, so
 * a stray call can no longer crash on an empty queue.
 */
var processRequest = function() {
  if (availableWorkers.length === 0 || pendingRequests.length === 0) {
    return;
  }
  // Take LRU worker and give it oldest pending request
  var worker = availableWorkers.shift();
  var task = pendingRequests.shift();
  backend.send([worker, '', task[0], '', task[1]]);
};
// A client request arrived on the frontend: queue it, and dispatch right away
// when a worker is already waiting.
frontend.on('message', function(client, empty, request) {
  pendingRequests.push([client, request]);
  if (availableWorkers.length > 0) {
    processRequest();
  }
});
// A worker sent something on the backend. Whatever it was, the worker is idle
// again and goes to the back of the available queue.
backend.on('message', function(worker, empty1, client, empty2, response) {
  availableWorkers.push(worker);
  // A worker's initial 'READY' announcement shows up in the `client`
  // position; anything else is a reply to forward to that client.
  if (client.toString() !== 'READY') {
    frontend.send([client, empty2, response]);
  }
  if (pendingRequests.length > 0) {
    processRequest();
  }
});
// Log socket errors, tagged with the side of the device they came from.
frontend.on('error', function(err) {
  console.log('fe err', err.stack);
});

backend.on('error', function(err) {
  console.log('be err', err.stack);
});
// Number of router sockets that have been bound successfully so far.
var binded = 0;

/**
 * Bind callback shared by the frontend and backend sockets. Once both have
 * been bound it starts the clients and workers, unless they were already
 * started up front (`startClientsAndWorkersNowait`).
 *
 * @param {Error} [err] Error reported by the bind call, if any. A failed bind
 *     is logged and not counted, so it never triggers the start-up.
 */
var checkStartOther = function(err) {
  if (err) {
    console.log('bind err', err.stack);
    return;
  }
  binded += 1;
  if (binded === 2 && !startClientsAndWorkersNowait) {
    startOther();
  }
};
// Bind both sides of the device; each bind reports back to checkStartOther.
frontend.bind('ipc://frontend.ipc', checkStartOther);
backend.bind('ipc://backend.ipc', checkStartOther);

// One 'cleanup' listener per client, one per worker, plus the one below.
emitter.setMaxListeners(numberOfWorkers + numberOfClients + 1);
emitter.on('cleanup', function() {
  frontend.close();
  backend.close();
});
/**
 * Starts `numberOfClients` clients followed by `numberOfWorkers` workers.
 */
var startOther = function() {
  var i;
  for (i = 0; i < numberOfClients; ++i) {
    clientTask();
  }
  for (i = 0; i < numberOfWorkers; ++i) {
    workerTask();
  }
};
// Start clients and workers immediately instead of waiting for the binds.
if (startClientsAndWorkersNowait) {
  startOther();
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Custom routing Router to Mama (ROUTER to REQ) in Node.js

var numberOfWorkers = 10;

var zmq = require('zmq');
var client = zmq.socket('router');
// Counters for the workload and 'END' messages sent to workers so far.
var workloadSent = 0;
var endSent = 0;
/**
 * Builds a random identity string: ten hexadecimal characters with a dash
 * inserted in the middle.
 *
 * @returns {string} An 11-character string of the form xxxxx-xxxxx.
 */
var randString = function() {
  var len = 10;
  var charSet = '0123456789abcdef';
  var result = [];
  for (var i = 0; i < len; ++i) {
    result.push(charSet[Math.floor(Math.random() * charSet.length)]);
  }
  // Insert the separator as a plain string, not as a one-element array that
  // only works because join() happens to stringify it.
  result.splice(len / 2, 0, '-');
  return result.join('');
};
/**
 * Starts one worker: a REQ socket with a random identity that connects to the
 * routing endpoint and asks for work by sending 'ready'.
 *
 * Every message other than 'END' counts as one task; after a random delay the
 * worker sends 'ready' again. On 'END' it logs how many tasks it processed
 * and closes its socket.
 */
var workerTask = function() {
  var worker = zmq.socket('req');
  var total = 0;
  // Closes the socket if the process is interrupted before 'END' arrives.
  var closeOnInterrupt = worker.close.bind(worker);

  worker.identity = randString();
  worker.connect('ipc://routing.ipc');
  worker.send('ready');

  worker.on('message', function(data) {
    if (data.toString() === 'END') {
      console.log('Processed:', total, 'tasks');
      worker.close();
      // The socket is closed, so the interrupt hook is no longer needed.
      process.removeListener('SIGINT', closeOnInterrupt);
      return;
    }
    total += 1;
    // Simulate random work
    setTimeout(function() {
      worker.send('ready');
    }, Math.random() * 1000 + 1);
  });

  process.on('SIGINT', closeOnInterrupt);
};
// Prevent 'memory leak' message caused by process.on('SIGINT', ...);
// there is one SIGINT listener per worker plus one for the router socket.
process.setMaxListeners(numberOfWorkers + 1);

for (var i = 0; i < numberOfWorkers; ++i) {
  workerTask();
}
// Every message from a worker is answered with either a workload or, once
// the workload quota (ten per worker) has been sent, an 'END' message.
client.on('message', function(address, empty, ready) {
  // LRU worker is next waiting in queue
  if (workloadSent < numberOfWorkers * 10) {
    client.send([address, empty, 'This is the workload']);
    workloadSent += 1;
    return;
  }

  client.send([address, empty, 'END']);
  endSent += 1;
  // Close the router socket once every worker has been sent 'END'.
  if (endSent === numberOfWorkers) {
    client.close();
  }
});
client.bind('ipc://routing.ipc');

// On interrupt, close the router socket and exit.
process.on('SIGINT', function() {
  client.close();
  process.exit(0);
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment