Created
October 16, 2012 00:15
-
-
Save calvinfo/3896519 to your computer and use it in GitHub Desktop.
Purgatory
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var _ = require('underscore'), | |
rabbit = require('rabbit-pool'), | |
logging = require('logging')(module); | |
// Module-level state shared by every function in this file.

// The 'purgatory' exchange; stays undefined until init() has completed.
var exchange;

// Map of routing key -> handler function registered through subscribe().
var handlers = {};

// Messages published before the exchange was available; flushed by init().
var messages = [];
/** | |
* Initializes the exchange and client. | |
* @param {Object} options | |
* @param {Function} callback | |
*/ | |
exports.init = function (options, callback) { | |
if (_.isFunction(options)) { | |
callback = options; | |
options = {}; | |
} | |
rabbit.get(function (client) { | |
client.exchange('purgatory', function (result) { | |
exchange = result; | |
_.each(messages, function (item) { | |
publish(item.key, item.message); | |
}); | |
messages = []; | |
if (callback) { | |
callback(null, result); | |
} | |
}); | |
}); | |
}; | |
/**
 * Publishes a message on the purgatory exchange. If the exchange has not
 * been set up yet, the message is kept in memory until init() flushes it.
 *
 * @param {String} key     routing key to publish under
 * @param {Object} message payload to publish
 * @return {Number|undefined} the new length of the in-memory backlog when
 *                            the message had to be queued, otherwise nothing
 */
var publish = exports.publish = function (key, message) {
  logging.info('Publishing message', { key : key, message : message });

  // Normal path: the exchange is available, hand the message straight over.
  if (exchange) {
    exchange.publish(key, message);
    return;
  }

  logging.info('No exchange, queueing in memory');
  var queued = { key : key, message : message };
  return messages.push(queued);
};
/** | |
* Subscribes the handler to the particular key. Will replace an existing | |
* handler if one already exists. The handler should accept a message object | |
* along with a callback indicating whether the message could be fully | |
* processed. If there is still a connection error, the error should | |
* be passed to the handler callback. | |
* | |
* @param {String} key | |
* @param {Function} handler ({ message : message, | |
* headers : headers, | |
* deliveryInfo : deliveryInfo }, | |
* callback (err) }); | |
*/ | |
exports.subscribe = function (key, handler) { | |
var currentHandler = handlers[key]; | |
handlers[key] = handler; | |
// If we are already bound and have set the new handler, return | |
if (currentHandler) | |
return; | |
// Otherwise set up the queue | |
rabbit.get(function (client) { | |
declareQueue(client, key); | |
}); | |
}; | |
/**
 * Puts a message that could not be processed back onto the exchange, then
 * waits ten seconds before calling queue.shift() so that the queued messages
 * can be tried again.
 *
 * @param {Object} queue   queue the message was delivered from
 * @param {String} key     routing key the message is re-published under
 * @param {Object} message payload to re-publish
 */
var reque = function (queue, key, message) {
  var context = { message : message, key : key };
  logging.info("Couldn't make a connection, putting message back in queue.",
               context);

  // Re-publish first so the message is not lost once the queue moves on.
  publish(key, message);

  // Hold the queue for the timeout before shifting.
  // NOTE(review): shift() presumably acks the current delivery so the next
  // one can arrive -- confirm against the rabbit client in use.
  var retry = function () {
    logging.info('Trying again to ack for key', { key : key });
    queue.shift();
  };
  setTimeout(retry, 10000);
};
/**
 * Declares the queue for the given key, binds it to the 'purgatory' exchange
 * and subscribes to it with acknowledgements enabled.
 *
 * Each delivery is passed to whichever handler is currently registered for
 * the key. If the handler reports an error, or no callable handler is
 * registered at delivery time, the message is put back via reque();
 * otherwise queue.shift() is called to move on to the next message.
 *
 * @param {Object} client rabbit connection/client
 * @param {String} key    routing key the queue is bound with
 */
var declareQueue = function (client, key) {
  logging.info('Declaring queue', { key : key });

  // Create our queue
  client.queue(queueName(key), function (queue) {
    queue.bind('purgatory', key);

    queue.subscribe({ ack : true }, function (message, headers, deliveryInfo) {
      var delivery = { message      : message,
                       headers      : headers,
                       deliveryInfo : deliveryInfo };

      // Look the handler up on every delivery so that subscribe() can swap
      // it at any time.
      var handlerFn = handlers[key];

      // Guard against a missing or non-callable handler: calling it would
      // throw inside the delivery callback and the message would never be
      // shifted. Put the message back instead so it can be retried later.
      if (typeof handlerFn !== 'function') {
        logging.info('No handler registered for key, putting message back in queue.',
                     { key : key });
        process.nextTick(function () {
          reque(queue, key, message);
        });
        return;
      }

      handlerFn(delivery, function (err) {
        if (err) {
          process.nextTick(function () {
            reque(queue, key, message);
          });
        } else {
          process.nextTick(function () {
            queue.shift();
          });
        }
      });
    });
  });
};
/**
 * Builds a queue name by prefixing 'purgatory' to the given parts and
 * joining everything with ':'. For example queueName('foo') returns
 * 'purgatory:foo', and queueName() returns 'purgatory'.
 *
 * @param {...String} parts name segments, typically the routing key
 * @return {String} the queue name
 */
var queueName = function () {
  // Copy the arguments object into a real array so it can be extended.
  var parts = Array.prototype.slice.call(arguments);
  parts.unshift('purgatory');
  return parts.join(':');
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment