Skip to content

Instantly share code, notes, and snippets.

@amakhrov
Created July 21, 2014 17:41
Show Gist options
  • Save amakhrov/67eed46516a30999d85b to your computer and use it in GitHub Desktop.
Save amakhrov/67eed46516a30999d85b to your computer and use it in GitHub Desktop.
Example usage of node-amqp
module.exports = function (nconf) {
var amqp = require('amqp'),
winston = require('winston');
var config = nconf.get('amqp');
var externalMessageCallback;
/**
 * Opens a queue on the given connection and reports the outcome through a
 * Node-style callback.
 *
 * The queue is always requested with `durable: true` and `autoDelete: false`;
 * only the `passive` flag is caller-controlled.
 *
 * Note: the 'error' listener is attached with `once` and is not removed when
 * the queue opens successfully, so an 'error' emitted later by the same queue
 * object will invoke `cb` again with that error.
 *
 * @param {Object} conn - connection object exposing `queue(name, options, onOpen)`.
 * @param {string} queueName - name of the queue to open.
 * @param {boolean} passive - value forwarded as the `passive` queue option.
 * @param {Function} cb - called as `cb(null, queue)` once the queue is open,
 *     or as `cb(err)` on the first 'error' event of the returned queue object.
 */
var openQueue = function (conn, queueName, passive, cb) {
    var pendingQueue = conn.queue(queueName, {
        passive: passive,
        durable: true,
        autoDelete: false
    }, function onOpened(openedQueue) {
        cb(null, openedQueue);
    });

    pendingQueue.once('error', function onOpenFailed(openError) {
        cb(openError);
    });
};
/**
 * Obtains a queue by name, creating it when it is missing.
 *
 * First opens the queue with `passive` set to true. If that attempt fails
 * with an error whose `code` is 404, an info line is logged and the queue is
 * opened again with `passive` set to false, handing `cb` straight to that
 * second attempt. Any other error is passed to `cb` unchanged.
 *
 * @param {Object} conn - connection forwarded to `openQueue`.
 * @param {string} queueName - name of the queue to obtain.
 * @param {Function} cb - Node-style callback: `cb(null, queue)` or `cb(err)`.
 */
var getQueue = function (conn, queueName, cb) {
    openQueue(conn, queueName, true, function (passiveError, existingQueue) {
        if (!passiveError) {
            cb(null, existingQueue);
            return;
        }

        if (passiveError.code !== 404) {
            cb(passiveError);
            return;
        }

        // A 404 on the passive attempt is treated as "queue not declared yet".
        winston.info('Queue ' + queueName + ' doesn\'t exist - will create it now');
        openQueue(conn, queueName, false, cb);
    });
};
/**
 * Handles one delivered message: passes it to the consumer callback
 * registered through `start()` (if any) and then calls `queue.shift()`,
 * which in node-amqp acknowledges the most recent delivery.
 *
 * The acknowledgement runs in a `finally` block so that a consumer callback
 * that throws cannot leave the delivery unacknowledged; the queue is
 * subscribed with `ack: true` and `prefetchCount: 1`, so a skipped
 * acknowledgement would stop further deliveries. The exception itself is
 * not swallowed and still propagates to the caller.
 *
 * Messages that arrive while no consumer callback is registered are
 * acknowledged without being processed.
 *
 * @param {Object} queue - queue object exposing `shift()`.
 * @param {*} msg - message payload forwarded to the consumer callback.
 */
var onQueueMessage = function (queue, msg) {
    try {
        if (externalMessageCallback) {
            externalMessageCallback(msg);
        }
    } finally {
        queue.shift();
    }
};
// Connection to the broker, addressed by the `url` value of the 'amqp' config section.
var conn = amqp.createConnection({ url: config.url });

// When the connection reports 'ready', obtain the configured queue (creating
// it if needed) and subscribe to it; a failure to open the queue is only logged.
conn.on('ready', function () {
    var name = config.queueName;

    getQueue(conn, name, function (openError, queue) {
        if (!openError) {
            winston.debug('Queue ' + name + ' is open');

            // ack + prefetchCount 1: each delivery is acknowledged by
            // onQueueMessage via queue.shift() after it has been handled.
            var subscribeOptions = {
                ack: true,
                prefetchCount: 1
            };
            queue.subscribe(subscribeOptions, onQueueMessage.bind(null, queue));
            return;
        }

        winston.error('Error while opening a queue ' + name, openError);
    });
});
// Log connection-level errors. A refused connection gets a message naming the
// configured server URL; every other error is logged as unclassified.
conn.on('error', function (connectionError) {
    var summary = connectionError.code === 'ECONNREFUSED'
        ? 'Problem connecting to RabbitMQ server ' + config.url
        : 'Unclassified RabbitMQ error';

    winston.error(summary, connectionError);
});
return {
start: function (onMessage) {
externalMessageCallback = onMessage;
}
};
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment