Skip to content

Instantly share code, notes, and snippets.

@tiagobnobrega
Created November 12, 2021 22:11
Show Gist options
  • Save tiagobnobrega/82f098c232d47424d3f1b6e643ef6e27 to your computer and use it in GitHub Desktop.
Save tiagobnobrega/82f098c232d47424d3f1b6e643ef6e27 to your computer and use it in GitHub Desktop.
Rabbit MQ consumer with Retry
const amqp = require('amqp-connection-manager');
/**
 * Creates a RabbitMQ manager that can publish JSON messages and consume queues
 * with a delayed-retry strategy built on dead-letter exchanges.
 *
 * Message flow for a consumed queue `Q` with wait interval `S` seconds:
 *   Q --(nack, requeue=false)--> `Q_retry-dlx`
 *     --(routing key `rt-S.Q`)--> `<systemName>-wait-queue-S`
 *     --(TTL of S seconds expires)--> `<systemName>_retry-dispatcher-dlx`
 *     --(binding `#.Q`)--> Q
 * Once a message has died `maxRetries` times it is published to `Q_error`.
 *
 * @param {string} rabbitUrl - RabbitMQ connection URL
 * @param {object} [options] - Optional settings
 * @param {string} [options.systemName=''] - Prefix used for shared topology elements
 *   (wait queues and the retry dispatcher DLX)
 * @param {Function} [options.onConnect] - Replaces the default "connect" listener
 * @param {Function} [options.onDisconnect] - Replaces the default "disconnect" listener
 * @returns {{consumeWithRetry: function(string, Function, object=): Promise<void>,
 *            sendMessage: function(string, *): Promise<*>}}
 */
function rabbitManager(rabbitUrl, { systemName = '', onConnect, onDisconnect } = {}) {
  /**
   * Renders the broker URL for logging with the password masked, so credentials
   * do not end up in log output.
   * @param {*} url - Value passed as the connection URL
   * @returns {string} Printable form of the URL
   */
  const describeUrl = (url) => {
    try {
      const parsed = new URL(String(url));
      if (parsed.password) parsed.password = '***';
      return parsed.toString();
    } catch (e) {
      // Deliberate best-effort: a value that is not a parsable URL is printed as-is.
      return String(url);
    }
  };

  // Connect to RabbitMQ
  const connection = amqp.connect([rabbitUrl]);
  connection.on('connect', onConnect || (() => console.log(`Connected to ${describeUrl(rabbitUrl)}`)));
  connection.on(
    'disconnect',
    onDisconnect ||
      ((event) => {
        // The listener is handed an `{ err }` wrapper; a bare error is tolerated as well.
        const err = event && event.err ? event.err : event;
        console.log('Disconnected from rabbitMQ.', err && err.stack);
      }),
  );

  // Single channel used for publishing; `json: true` makes it serialize payloads as JSON.
  const channel = connection.createChannel({
    json: true,
  });

  /**
   * Builds the name of the error queue that receives messages which exhausted their retries.
   * @param {string} queueName - Name of the main queue
   * @returns {string} Error queue name (`<queueName>_error`)
   */
  const getQueueErrorQueueName = (queueName) => `${queueName}_error`;

  /**
   * Returns a setup function that asserts the retry topology for `queueName`.
   * Calling this with a different `waitIntervalSec` creates a different wait queue.
   * @param {string} queueName - Name of the main queue
   * @param {number} waitIntervalSec - Seconds a failed message waits before being retried
   * @returns {function(*): Promise<void>} Setup function receiving the channel to assert on
   */
  const setupForQueue = (queueName, waitIntervalSec) => {
    return async (ch) => {
      const queueRetryDLXName = `${queueName}_retry-dlx`;
      const systemWaitQueueName = `${systemName}-wait-queue-${waitIntervalSec}`;
      const retryDispatcherDLXName = `${systemName}_retry-dispatcher-dlx`;
      // Routing key used when the main queue dead-letters a message into its retry DLX
      const queueRetryDLX_DLRoutingKey = `rt-${waitIntervalSec}.${queueName}`;

      // All operations are issued in this order: exchanges, then queues, then bindings.
      await Promise.all([
        // Per-queue retry DLX
        ch.assertExchange(queueRetryDLXName, 'topic', {
          // NOTE(review): a dead-letter routing key is normally a queue argument and
          // presumably has no effect on an exchange; kept so the declaration stays
          // identical to what existing deployments already asserted - confirm before removing.
          arguments: {
            'x-dead-letter-routing-key': queueRetryDLX_DLRoutingKey,
          },
          durable: true,
        }),
        // Shared retry dispatcher DLX
        ch.assertExchange(retryDispatcherDLXName, 'topic', {
          durable: true,
        }),
        // Main queue: rejected messages go to the retry DLX with the wait-interval routing key
        ch.assertQueue(queueName, {
          durable: true,
          deadLetterExchange: queueRetryDLXName,
          deadLetterRoutingKey: queueRetryDLX_DLRoutingKey,
        }),
        // Error queue for messages that exhausted their retries
        ch.assertQueue(getQueueErrorQueueName(queueName), {
          durable: true,
          deadLetterExchange: queueRetryDLXName,
        }),
        // Shared wait queue: messages expire after the interval and move to the dispatcher DLX
        ch.assertQueue(systemWaitQueueName, {
          durable: true,
          deadLetterExchange: retryDispatcherDLXName,
          messageTtl: waitIntervalSec * 1_000,
        }),
        // Binding: retry DLX -> shared wait queue
        ch.bindQueue(systemWaitQueueName, queueRetryDLXName, `rt-${waitIntervalSec}.#`),
        // Binding: dispatcher DLX -> main queue
        ch.bindQueue(queueName, retryDispatcherDLXName, `#.${queueName}`),
      ]);
    };
  };

  /**
   * Reads how many times a message has been dead-lettered from its `x-death` header.
   * Original author's note: this is only available from rabbit 3.8 or above.
   * @param {*} data - Message as delivered to the consumer
   * @returns {number} `count` of the first `x-death` entry, or 0 when it is missing or malformed
   */
  const getDeathCount = (data) => {
    const deaths = data?.properties?.headers?.['x-death'];
    if (!Array.isArray(deaths) || deaths.length === 0) return 0;
    const count = Number(deaths[0]?.count);
    return Number.isFinite(count) ? count : 0;
  };

  const consumeWithRetryDefaultOptions = { waitIntervalSec: 60 * 5, maxRetries: 5 };

  /**
   * Asserts the topology needed for retries and registers `callback` as consumer of the queue.
   * @param {string} queueName - Name of the queue to be consumed
   * @param {Function} callback - Called with the parsed JSON body; may return a Promise,
   *   which is awaited. A throw/rejection (or an unparsable body) sends the message to retry.
   * @param {object} [options] - Retry strategy
   * @param {number} [options.waitIntervalSec=300] - Seconds to wait before a retry. Each
   *   distinct value creates its own wait queue
   * @param {number} [options.maxRetries=5] - Deaths allowed before the message is moved
   *   to the error queue
   * @returns {Promise<void>}
   */
  const consumeWithRetry = async (queueName, callback, options = consumeWithRetryDefaultOptions) => {
    const { waitIntervalSec, maxRetries } = { ...consumeWithRetryDefaultOptions, ...options };
    const errorQueueName = getQueueErrorQueueName(queueName);
    const setupFn = setupForQueue(queueName, waitIntervalSec);
    console.log(`rabbitManager:: adding queue ${queueName} setup.`);
    // This channel only consumes and forwards raw message bodies to the error queue.
    // It must NOT use `json: true`: that would JSON-encode the Buffer itself and the
    // error queue would receive `{"type":"Buffer","data":[...]}` instead of the payload.
    const consumerChannel = connection.createChannel({
      json: false,
    });
    await consumerChannel.addSetup(setupFn);
    console.log(`rabbitManager:: registering queue ${queueName} consumer.`);
    await consumerChannel.consume(queueName, async (data) => {
      // The consumer callback is invoked with null when the broker cancels the consumer.
      if (!data) {
        console.log(`rabbitManager:: consumer for queue ${queueName} received no message (consumer cancelled).`);
        return;
      }
      const deathCount = getDeathCount(data);
      const rawContent = data.content.toString();
      console.log(`rabbitManager::Message received: ${rawContent} - [deaths:${deathCount}/${maxRetries}]`);
      if (deathCount >= maxRetries) {
        console.log(`rabbitManager:: maximum retries reached (deaths: ${deathCount}) for message: ${rawContent}, publishing to error queue...`);
        try {
          // Publish the untouched body to the error queue, then ack to remove it from the main queue
          await consumerChannel.sendToQueue(errorQueueName, data.content);
          consumerChannel.ack(data);
        } catch (e) {
          // Do not lose the message or leak an unhandled rejection: send it through
          // another wait cycle so the hand-over to the error queue is attempted again.
          console.error(`rabbitManager:: failed to publish to error queue ${errorQueueName}:`, e);
          consumerChannel.nack(data, false, false);
        }
        return;
      }
      try {
        const message = JSON.parse(rawContent);
        await callback(message);
        consumerChannel.ack(data);
      } catch (e) {
        // On error, nack with requeue=false so the message is dead-lettered to the retry DLX
        console.log(`rabbitManager:: nacking message: ${rawContent}`, e);
        consumerChannel.nack(data, false, false);
      }
    });
  };

  /**
   * Publishes `message` to `queueName` through the JSON publishing channel.
   * @param {string} queueName - Destination queue
   * @param {*} message - JSON-serializable payload
   * @returns {Promise<*>} Whatever the channel's `sendToQueue` resolves with
   */
  const sendMessage = async (queueName, message) => {
    return channel.sendToQueue(queueName, message);
  };

  return { consumeWithRetry, sendMessage };
}
module.exports = rabbitManager;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment