Skip to content

Instantly share code, notes, and snippets.

@SocalNick
Created April 24, 2013 14:43
Show Gist options
  • Save SocalNick/5452665 to your computer and use it in GitHub Desktop.
Save SocalNick/5452665 to your computer and use it in GitHub Desktop.
Some IronMQ and process control with Node.js
var iron_mq = require('iron_mq');
var imq = new iron_mq.Client();
var queueName = "query_smt";
var queue = imq.queue(queueName);
var numMessagesCurrentlyProcessing = 0;
process.on( 'SIGINT', function() {
var num;
console.log( 'Received SIGINT (Crtl-C)')
clearInterval(queueIntervalId);
setInterval(function () {
if (numMessagesCurrentlyProcessing <= 0) {
console.log('Done processing messages...exiting');
process.exit();
}
if (!num || num > numMessagesCurrentlyProcessing) {
console.log('Waiting for ' + numMessagesCurrentlyProcessing + ' message to finish processing');
num = numMessagesCurrentlyProcessing;
}
}, 1000);
});
//imq.queues({}, function (error, body) {
// console.log(error);
// console.log(body);
//});
//queue.info(function (error, body) {
// console.log(error);
// console.log(body);
//});
var getMessage = function() {
//console.log('Trying to get message from ' + queueName + '...');
queue.get({ timeout: 5 }, function (err, body) {
if (err) {
return console.log('GET error: ' + err);
}
if (body) {
return processMessage(body);
}
//console.log('No messages');
});
};
var processMessage = function (body) {
numMessagesCurrentlyProcessing++;
console.log('Processing: ' + numMessagesCurrentlyProcessing);
console.log(body);
setTimeout(function () {
//console.log('Finished processing');
numMessagesCurrentlyProcessing--;
}, Math.floor((Math.random()*20000)+1));
};
console.log('Starting queue poller...');
var queueIntervalId = setInterval(getMessage, 1000);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment