Skip to content

Instantly share code, notes, and snippets.

@SocalNick
Created May 6, 2013 00:16
Show Gist options
  • Save SocalNick/5522690 to your computer and use it in GitHub Desktop.
Save SocalNick/5522690 to your computer and use it in GitHub Desktop.
Polls IronMQ
// Module-level bindings; each one is assigned exactly once below.
var Poller, imq, imqClient, EventEmitter, util;
imq = require('iron_mq');
// A single client is created at module load time, with no arguments, and is
// shared by every Poller constructed from this module.
// NOTE(review): presumably iron_mq picks up its credentials from its own
// config file / environment when no options are passed — confirm.
imqClient = new imq.Client();
EventEmitter = require('events').EventEmitter;
util = require('util');
/**
 * Polls a single IronMQ queue and reports received messages as events.
 *
 * Events used by instances:
 *   - 'getMessage'       internal; handled by _getMessage
 *   - 'messageReceived'  internal; handled by _reserveMessage
 *   - 'messageReserved'  emitted by _reserveMessage for consumers
 *   - 'finish'           handled by finish (graceful shutdown)
 *
 * @constructor
 * @param {string} queueName Queue name, handed to imqClient.queue().
 */
Poller = function Poller(queueName) {
  // Run the parent constructor so EventEmitter sets up its per-instance
  // state, as shown in Node's util.inherits documentation.
  EventEmitter.call(this);

  // setup instance vars
  this.queueName = queueName;
  this.queue = imqClient.queue(this.queueName);
  // Dictionary of in-flight messages keyed by message id. A prototype-less
  // object is used (rather than an Array) because the keys are ids, not
  // indices, and so that no inherited property can look like a reserved id.
  this.reservedMessages = Object.create(null);
  this.numReservedMessages = 0;
  this.polling = false;

  // setup internal listeners
  this.on('getMessage', this._getMessage);
  this.on('messageReceived', this._reserveMessage);
  this.on('finish', this.finish);
};
util.inherits(Poller, EventEmitter);
/**
 * Starts emitting 'getMessage' on a fixed interval and arranges for SIGINT /
 * SIGTERM to emit 'finish'. Does nothing if the poller is already polling.
 *
 * @param {number} frequency Delay between polls, passed to setInterval (ms).
 */
Poller.prototype.startPolling = function (frequency) {
  var self = this;
  var requestFinish;

  if (this.polling) {
    return;
  }

  this.queueIntervalId = setInterval(function () {
    self.emit('getMessage');
  }, frequency);

  // Install the signal listeners only once per instance. Without this guard
  // every start -> stop -> start cycle would add another pair of listeners,
  // so a single signal would emit 'finish' several times.
  if (!this._signalHandlersInstalled) {
    requestFinish = function () {
      self.emit('finish');
    };
    process.on('SIGINT', requestFinish);
    process.on('SIGTERM', requestFinish);
    this._signalHandlersInstalled = true;
  }

  console.log('Polling for messages on: ' + this.queueName);
  this.polling = true;
};
/**
 * Handler for the internal 'getMessage' event: performs one GET on the queue.
 * A GET error is logged; an empty result is ignored; otherwise the message is
 * forwarded via the 'messageReceived' event.
 */
Poller.prototype._getMessage = function () {
  var poller = this;

  function onGet(err, message) {
    if (err) {
      console.log('IronMQ GET error: ' + err);
      return;
    }
    if (message) {
      poller.emit('messageReceived', message);
    }
    // no message: nothing to do
  }

  // TODO might need to store options for get
  this.queue.get({}, onGet);
};
/**
 * Handler for the internal 'messageReceived' event: records the message as
 * in-flight, logs it, and emits 'messageReserved' for consumers.
 *
 * The in-flight counter is only incremented the first time an id is seen.
 * deleteMessage decrements at most once per tracked id, so counting a
 * re-delivered id twice would leave the counter permanently above zero and
 * finish() would never observe the poller as drained.
 *
 * @param {Object} message Message from the queue; its `id` property is used
 *     as the tracking key.
 */
Poller.prototype._reserveMessage = function (message) {
  var alreadyTracked =
    Object.prototype.hasOwnProperty.call(this.reservedMessages, message.id) &&
    this.reservedMessages[message.id] != null;

  // Always keep the most recently received copy of the message.
  this.reservedMessages[message.id] = message;
  if (!alreadyTracked) {
    this.numReservedMessages++;
  }
  console.log('Processing message: ' + message.id + ' (' + this.numReservedMessages + ')');
  this.emit('messageReserved', message);
};
/**
 * Reports the current value of the in-flight message counter.
 *
 * @returns {number} The value of this.numReservedMessages.
 */
Poller.prototype.getNumReservedMessages = function () {
  var inFlight = this.numReservedMessages;
  return inFlight;
};
/**
 * Stops tracking a message locally and deletes it from the queue.
 *
 * The local bookkeeping is updated before the DEL request is issued, so the
 * in-flight counter drops even if the remote delete later fails.
 *
 * @param {string} messageId Id of the message to delete.
 * @param {function(?string, *=)} [callback] Called with
 *     ('IronMQ DEL error: ' + err) on failure, or (null, body) on success.
 *     When omitted, a failure is logged instead.
 */
Poller.prototype.deleteMessage = function (messageId, callback) {
  // Tolerate a missing callback rather than throwing a TypeError from inside
  // the IronMQ response handler.
  var done = typeof callback === 'function' ? callback : function (err) {
    if (err) {
      console.log(err);
    }
  };

  if (this.reservedMessages[messageId]) {
    // Remove the key outright; assigning null would leave one entry behind
    // for every message ever processed by a long-running poller.
    delete this.reservedMessages[messageId];
    this.numReservedMessages--;
  }

  this.queue.del(messageId, function (err, body) {
    if (err) {
      return done('IronMQ DEL error: ' + err);
    }
    return done(null, body);
  });
};
/**
 * Cancels the polling interval, if one has been stored, and marks the poller
 * as not polling. Has no effect when no interval id is present.
 */
Poller.prototype.stopPolling = function () {
  var timerId = this.queueIntervalId;
  if (!timerId) {
    return;
  }
  clearInterval(timerId);
  this.polling = false;
};
/**
 * Graceful shutdown: stops polling, then checks once per second whether any
 * messages are still in flight and exits the process once none remain.
 *
 * Safe to invoke more than once (e.g. repeated signals): only the first call
 * starts the drain timer, so progress is not logged multiple times per second.
 */
Poller.prototype.finish = function () {
  var numLastCheck;
  var poller = this;

  console.log('Received SIGINT / SIGTERM');
  this.stopPolling();

  if (this._finishIntervalId) {
    return; // already draining; the existing timer will exit the process
  }

  // Every second, check if finished
  this._finishIntervalId = setInterval(function () {
    if (poller.numReservedMessages <= 0) {
      console.log('Done processing messages...exiting');
      process.exit();
      return;
    }
    // Log on the first check and afterwards only when the count has dropped.
    if (numLastCheck === undefined || numLastCheck > poller.numReservedMessages) {
      console.log('Waiting for ' + poller.numReservedMessages + ' message(s) to finish processing');
      numLastCheck = poller.numReservedMessages;
    }
  }, 1000);
};
// Public API of this module: the Poller constructor.
exports.Poller = Poller;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment