Skip to content

Instantly share code, notes, and snippets.

@calvinfo
Created October 16, 2012 00:15
Show Gist options
  • Save calvinfo/3896519 to your computer and use it in GitHub Desktop.
Save calvinfo/3896519 to your computer and use it in GitHub Desktop.
Purgatory
var _ = require('underscore'),
rabbit = require('rabbit-pool'),
logging = require('logging')(module);
// Module-level state shared by the functions in this file.

// The 'purgatory' exchange. Stays undefined until init() receives it from the
// client; publish() buffers messages in memory while it is unset.
var exchange;

// Handler functions keyed by routing key. Created without a prototype so that
// keys such as 'constructor' or 'toString' do not resolve to inherited
// Object.prototype members and get mistaken for registered handlers.
var handlers = Object.create(null);

// Messages ({ key, message }) published before the exchange was available.
var messages = [];
/**
* Initializes the exchange and client.
* @param {Object} options
* @param {Function} callback
*/
exports.init = function (options, callback) {
if (_.isFunction(options)) {
callback = options;
options = {};
}
rabbit.get(function (client) {
client.exchange('purgatory', function (result) {
exchange = result;
_.each(messages, function (item) {
publish(item.key, item.message);
});
messages = [];
if (callback) {
callback(null, result);
}
});
});
};
/**
 * Publishes a message on the purgatory exchange. When the exchange has not
 * been set up yet, the message is buffered in memory instead; init() flushes
 * the buffer once the exchange is ready.
 *
 * @param  {String} key      routing key to publish under
 * @param  {Object} message  message payload
 * @return {Number|undefined} the new length of the in-memory buffer when the
 *                            message was buffered, otherwise undefined
 */
var publish = exports.publish = function (key, message) {
  logging.info('Publishing message', { key : key, message : message });

  if (exchange) {
    exchange.publish(key, message);
    return;
  }

  logging.info('No exchange, queueing in memory');
  return messages.push({ key : key, message : message });
};
/**
* Subscribes the handler to the particular key. Will replace an existing
* handler if one already exists. The handler should accept a message object
* along with a callback indicating whether the message could be fully
* processed. If there is still a connection error, the error should
* be passed to the handler callback.
*
* @param {String} key
* @param {Function} handler ({ message : message,
* headers : headers,
* deliveryInfo : deliveryInfo },
* callback (err) });
*/
exports.subscribe = function (key, handler) {
var currentHandler = handlers[key];
handlers[key] = handler;
// If we are already bound and have set the new handler, return
if (currentHandler)
return;
// Otherwise set up the queue
rabbit.get(function (client) {
declareQueue(client, key);
});
};
/**
 * Republishes a message into the exchange and, after a fixed delay, calls
 * queue.shift() for the original delivery.
 *
 * NOTE(review): the log text suggests queue.shift() acknowledges the original
 * delivery so the next queued message can be attempted — confirm against the
 * client library.
 *
 * @param {Object} queue    queue the message was delivered from
 * @param {String} key      routing key to republish under
 * @param {Object} message  message payload to republish
 */
var reque = function (queue, key, message) {
  // Delay, in milliseconds, before the original delivery is shifted.
  var retryDelayMs = 10000;

  var shiftOriginal = function () {
    logging.info('Trying again to ack for key', { key : key });
    queue.shift();
  };

  logging.info('Couldn\'t make a connection, putting message back in queue.',
               { message : message, key : key });

  // Put a fresh copy of the message back on the exchange first...
  publish(key, message);

  // ...then hold the original delivery until the delay has elapsed.
  setTimeout(shiftOriginal, retryDelayMs);
};
/**
 * Declares the queue for the given key, binds it to the 'purgatory' exchange
 * under that key, and subscribes to it in ack mode. Each delivery is passed to
 * the handler currently registered for the key; the handler's callback decides
 * whether the delivery is shifted off the queue or requeued.
 *
 * NOTE(review): with { ack : true } the client is expected to hold back further
 * deliveries until queue.shift() is called (node-amqp behaviour) — confirm that
 * rabbit-pool hands out such a client.
 *
 * @param {Object} client rabbit connection/client
 * @param {String} key    routing key the queue is bound to
 */
var declareQueue = function (client, key) {
  logging.info('Declaring queue', { key : key });

  // Create our queue
  client.queue(queueName(key), function (queue) {
    queue.bind('purgatory', key);

    queue.subscribe({ ack : true }, function (message, headers, deliveryInfo) {
      var delivery = { message      : message,
                       headers      : headers,
                       deliveryInfo : deliveryInfo };

      // Set once the handler has reported an outcome for this delivery.
      var settled = false;

      // Look the handler up on every delivery so that subscribe() can swap it
      // at will.
      var handlerFn = handlers[key];

      handlerFn(delivery, function (err) {
        // A delivery must be shifted or requeued exactly once. A second
        // invocation would schedule an extra queue.shift(), which could
        // release a later delivery that is still being processed.
        if (settled) {
          logging.info('Ignoring duplicate handler callback', { key : key });
          return;
        }
        settled = true;

        process.nextTick(function () {
          if (err) {
            reque(queue, key, message);
          } else {
            queue.shift();
          }
        });
      });
    });
  });
};
/**
 * Builds a queue name by prefixing the given arguments with 'purgatory' and
 * joining everything with ':' (e.g. queueName('a') -> 'purgatory:a').
 *
 * @param  {...String} parts  name segments, typically the routing key
 * @return {String} the queue name
 */
var queueName = function () {
  return ['purgatory'].concat(_.toArray(arguments)).join(':');
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment