Last active
September 20, 2022 02:01
-
-
Save bondvt04/029342a51c5fc03da3e809dbc2be5edc to your computer and use it in GitHub Desktop.
RabbitMQ retries
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
const AmqpClient = require('amqplib'); | |
const Promise = require('bluebird'); | |
const contentTypeJson = 'application/json'; | |
const contentEncoding = 'utf8'; | |
const config = { | |
exchanges: [ | |
{ name: 'A_COMMENT_CREATED', type: 'fanout' }, | |
{ name: 'A_COMMENT_DELETED', type: 'fanout' }, | |
{ name: 'A_COMMENT_UPDATED', type: 'fanout' }, | |
], | |
bindings: [ | |
{ exchange: 'A_COMMENT_CREATED', target: 'comments' }, | |
{ exchange: 'A_COMMENT_DELETED', target: 'comments' }, | |
{ exchange: 'A_COMMENT_UPDATED', target: 'comments' }, | |
], | |
queues: [ | |
{ name: 'comments' }, | |
], | |
retry_ttl_queues: [ | |
{ name: '<queue_name>-retry-1-30s', delay: 30000 }, | |
{ name: '<queue_name>-retry-2-10m', delay: 600000 }, | |
{ name: '<queue_name>-retry-3-48h', delay: 195840000 }, | |
], | |
}; | |
let connectionPromise; | |
const amqpService = initAmqp(); | |
amqpService.pConnect(); | |
amqpService | |
.pAssert() | |
.then(() => { | |
amqpService | |
.pConsume('comments', handleMsg) | |
.catch(console.error); | |
// Do something important with our messages | |
function handleMsg (msg, channel) { | |
return Promise.resolve(); | |
// return Promise.reject(); | |
// throw new Error('Something wrong with handler'); | |
} | |
}); | |
function initAmqp() { | |
const port_str = config.port ? `:${config.port}` : ''; | |
const vhost_str = config.vhost ? `/${encodeURIComponent(config.vhost)}` : ''; | |
const url = `amqp://${config.host}${port_str}${vhost_str}`; | |
const amqp = { | |
// Init all the queues and exchanges if not exists | |
pAssert() { | |
return connectionPromise | |
.catch(console.error) | |
.then(({ channel }) => { | |
// Make it in sync order (for clarity): | |
// 1) assert exchanges | |
// 2) assert queues | |
// 3) bind exchanges to queues | |
return assertExchanges() | |
.then(assertQueues) | |
.then(bindExchangesToQueues); | |
function assertExchanges() { | |
return Promise.all([] | |
// "real" payload exchanges | |
.concat( | |
config.exchanges.map(exchange => channel.assertExchange( | |
exchange.name, exchange.type, { | |
durable: true, | |
})) | |
) | |
// DLX (one per "real" queue) | |
.concat( | |
config.queues.map(({ name: queue }) => { | |
const dlxName = amqp._getDLXName({ queue }); | |
return channel.assertExchange( | |
dlxName, | |
'fanout', | |
{ durable: true } | |
); | |
}) | |
) | |
// TTLX (one per "real" payload queue) - failed msgs goes to | |
// this exchange first and than redirected to corresponding ttlq using | |
// corresponding routing keys | |
.concat( | |
config.queues.map(({ name: queue }) => { | |
const ttlxName = amqp._getTTLXName({ queue }); | |
return channel.assertExchange( | |
ttlxName, | |
'direct', | |
{ durable: true } | |
); | |
}) | |
) | |
); | |
} | |
function assertQueues() { | |
return Promise.all([] | |
.concat(config.queues.map(({ name: queue }) => { | |
const dlxName = amqp._getDLXName({ queue }); | |
return Promise.all([] | |
// "real" payload queue | |
.concat( | |
channel.assertQueue(queue, { durable: true }), | |
) | |
// a few ttl queues per one "real" queue | |
.concat( | |
config.retry_ttl_queues.map((ttl_queue, index) => { | |
const attempt = index + 1; | |
const ttlQueueName = amqp._getTTLQName({ queue, attempt }); | |
return channel.assertQueue( | |
ttlQueueName, | |
{ | |
durable: true, | |
deadLetterExchange: dlxName, // x-dead-letter-exchange | |
messageTtl: ttl_queue.delay, // x-message-ttl | |
// we can use this key for decreasing queues amount: | |
// deadLetterRoutingKey: dlxName | |
} | |
); | |
}) | |
) | |
); | |
})) | |
); | |
} | |
function bindExchangesToQueues() { | |
return Promise.all([] | |
// bind "real" payload exchanges to "real" payload queues | |
.concat( | |
config.bindings.map(bind => channel.bindQueue(bind.target, bind.exchange)) | |
) | |
// bind DLX and TTLX for retries | |
.concat( | |
config.queues.map(({ name: queue }) => { | |
const dlxName = amqp._getDLXName({ queue }); | |
const ttlxName = amqp._getTTLXName({ queue }); | |
return Promise.all([] | |
// DLX to "real" payload exchange | |
.concat( | |
channel.bindQueue(queue, dlxName) | |
) | |
// TTLX to ttl queues | |
.concat( | |
config.retry_ttl_queues.map((ttl_queue, index) => { | |
const attempt = index + 1; | |
const ttlqName = amqp._getTTLQName({ queue, attempt }); | |
const routingKey = amqp._getTTLRoutingKey({ attempt }); | |
return channel.bindQueue(ttlqName, ttlxName, routingKey); | |
}) | |
) | |
); | |
}) | |
) | |
); | |
} | |
}) | |
.catch(console.error); | |
}, | |
pConsume(queue, handler, options = {}) { | |
return connectionPromise | |
.catch(amqp._connectionErrorHandler) | |
.then(({ channel }) => channel.consume(queue, msg => { | |
return (new Promise((resolve, reject) => { | |
if (msg.fields.redelivered) { | |
reject('Message was redelivered, so something wrong happened'); | |
return; | |
} | |
handler(msg, channel) | |
.then(resolve) | |
.catch(reject); | |
})) | |
.then(() => { | |
channel.ack(msg); | |
}) | |
// catch here allows us handle all the varieties of fails: | |
// - exceptions in handlers | |
// - rejects in handlers | |
// - redeliveries (server was down or something else) | |
.catch(handleRejectedMsg); | |
function handleRejectedMsg(reasonOfFail) { | |
return amqp._sendMsgToRetry({ msg, queue, channel, reasonOfFail }); | |
} | |
}, options) | |
); | |
}, | |
pConnect() { | |
connectionPromise = AmqpClient | |
.connect(url) | |
.catch(console.error) | |
.then(cnx => | |
cnx | |
.createChannel() | |
.catch(console.error) | |
.then(channel => { | |
return { channel, connection: cnx }; | |
}) | |
) | |
.catch(console.error); | |
return connectionPromise; | |
}, | |
// Ack original msg, create new one with TTL and send | |
// to corresponding ttl queue where msg will be expired, | |
// die and through DLX goes to next retry | |
_sendMsgToRetry(args) { | |
const channel = args.channel; | |
const queue = args.queue; | |
const msg = args.msg; | |
const attempts_total = config.retry_ttl_queues.length; | |
// ack original msg | |
channel.ack(msg); | |
// Unpack content, update and pack it back | |
function getAttemptAndUpdatedContent(msg) { | |
let content = JSON.parse(msg.content.toString(contentEncoding)); | |
// "exchange" field should exist, but who knows. in the other case we would have endless loop | |
// cos native msg.fields.exchange will be changed after walking through DLX | |
content.exchange = content.exchange || msg.fields.exchange; | |
content.try_attempt = ++content.try_attempt || 1; | |
// we don't rely on x-death, so write counter for sure | |
const attempt = content.try_attempt; | |
content = Buffer.from(JSON.stringify(content), contentEncoding); | |
return { attempt, content }; | |
} | |
const { attempt, content } = getAttemptAndUpdatedContent(msg); | |
if (attempt <= attempts_total) { | |
const ttlxName = amqp._getTTLXName({ queue }); | |
const routingKey = amqp._getTTLRoutingKey({ attempt }); | |
const options = { | |
contentEncoding, | |
contentType: contentTypeJson, | |
persistent: true, | |
}; | |
// trying to reproduce original message | |
// including msg.properties.messageId and such | |
// but excluding msg.fields.redelivered | |
Object.keys(msg.properties).forEach(key => { | |
options[key] = msg.properties[key]; | |
}); | |
return channel.publish(ttlxName, routingKey, content, options); | |
} | |
return Promise.resolve(); | |
}, | |
_getTTLQName(options) { | |
const queue = options.queue; | |
const attempt = options.attempt || 1; | |
return config.retry_ttl_queues[attempt - 1].name.replace('<queue_name>', queue); | |
}, | |
_getTTLRoutingKey(options) { | |
const attempt = options.attempt || 1; | |
return `retry-${attempt}`; | |
}, | |
_getDLXName(options) { | |
const queue = options.queue; | |
return `DLX-${queue}`.replace(/-/g, '_').toUpperCase(); | |
}, | |
_getTTLXName(options) { | |
const queue = options.queue; | |
return `TTL-${queue}`.replace(/-/g, '_').toUpperCase(); | |
}, | |
}; | |
return amqp; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment