Skip to content

Instantly share code, notes, and snippets.

@varunnayal
Created July 20, 2017 15:09
Show Gist options
  • Save varunnayal/ce8b5be8a5f1f45ca841814c8e74990d to your computer and use it in GitHub Desktop.
Save varunnayal/ce8b5be8a5f1f45ca841814c8e74990d to your computer and use it in GitHub Desktop.
RabbitMQ: Process a message maximum of "N" times using dead letter exchange
var amqp = require('amqplib');
var url = 'amqp://localhost';
var WORK_QUEUE_NAME = 'work.queue';
var WORK_EXCHANGE_NAME = 'work.exchange';
var DEAD_QUEUE_NAME = 'dead.work.queue';
var DEAD_EXCHANGE_NAME = 'dead.work.exchange';
var MAX_RETRY_COUNT = 3;
// Working
//
// +-------------+ +-----------------+
// | |----- nack'ed message -----> | |
// | work.queue | | dead.work.queue |
// | |<---- expired message ------ | |
// +-------------+ +-----------------+
// |
// +-----> message moved out after N unsuccessful processing
// (using 'x-death' header)
function messageHandler(channel, msg) {
console.log('Message: ' + msg.content.toString());
// Prepare seenCount from 'x-death' header
var seenCount = 1;
if (Array.isArray(msg.properties.headers['x-death'])) {
seenCount = msg.properties.headers['x-death'].reduce((cnt, deathObj) => {
if (deathObj.queue === WORK_QUEUE_NAME) {
return cnt + deathObj.count;
}
return cnt;
}, 1);
}
// Tries exhausted, acknowledge, or take some other action
if (seenCount > MAX_RETRY_COUNT) {
console.log('Maximum retries reached...!!!')
channel.ack(msg);
} else {
channel.nack(msg, false, false); // Last argument should be false!!!
}
}
// Setup dead.work.queue in dead.work.exchange that will receive message from work.queue (after being nack'ed)
// Message will sit here for 'x-message-ttl' milliseconds before being routes to work.queue via work.exchange
function setupDeadLetter() {
return amqp.connect(url).then(function (conn) {
return conn.createChannel();
}).then(function (ch) {
return ch.assertExchange(DEAD_EXCHANGE_NAME, 'topic', { durable: true, autoDelete: false }).then(function () {
return ch.assertQueue(DEAD_QUEUE_NAME, {
autoDelete: false,
durable: true,
arguments: {
'x-dead-letter-exchange': WORK_EXCHANGE_NAME,
'x-dead-letter-routing-key': WORK_QUEUE_NAME,
'x-message-ttl': 5000,
}
});
}).then(function () {
// This Queue is interested in listening messages from WORK QUEUE only
// This DEAD Queue will receive message whose routing key is marked as DEAD_QUEUE_NAME,
// hence, in working queue we set routing key to DEAD_QUEUE_NAME
return ch.bindQueue(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, DEAD_QUEUE_NAME);
});
});
};
// Setup work.queue in work.exchange. nack'ed messages will be dead-letter'ed
// to dead.work.queue via dead.work.exchange
var setupWorker = function () {
return amqp.connect(url).then(function (conn) {
return conn.createChannel();
}).then(function (ch) {
return ch.assertExchange(WORK_EXCHANGE_NAME, 'direct', { durable: true, autoDelete: false }).then(function () {
return ch.assertQueue(WORK_QUEUE_NAME, {
autoDelete: false,
durable: true,
arguments: {
'x-dead-letter-exchange': DEAD_EXCHANGE_NAME,
'x-dead-letter-routing-key': DEAD_QUEUE_NAME,
}
});
}).then(function () {
return ch.bindQueue(WORK_QUEUE_NAME, WORK_EXCHANGE_NAME, WORK_QUEUE_NAME);
}).then(function () {
return ch.consume(WORK_QUEUE_NAME, messageHandler.bind(null, ch), { noAck: false });
}).then(function () {
// Returning to publish message
return ch;
});
});
};
console.log('- Setting up Dead Letter Exchange -');
setupDeadLetter().then(function () {
console.log('- Setting up Worker Exchage -');
return setupWorker();
}).then(function (ch) {
// Publish message
console.log('- Sending Message -');
// return ch.publish(WORK_EXCHANGE_NAME, WORK_QUEUE_NAME, new Buffer('Hitman!!!'));
return ch.sendToQueue(WORK_QUEUE_NAME, new Buffer('Time is ' + (new Date()).toISOString()));
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment