Skip to content

Instantly share code, notes, and snippets.

Last active September 20, 2022 02:01
Show Gist options
  • Save bondvt04/029342a51c5fc03da3e809dbc2be5edc to your computer and use it in GitHub Desktop.
Save bondvt04/029342a51c5fc03da3e809dbc2be5edc to your computer and use it in GitHub Desktop.
RabbitMQ retries
'use strict';
const AmqpClient = require('amqplib');
const Promise = require('bluebird');
const contentTypeJson = 'application/json';
const contentEncoding = 'utf8';
const config = {
exchanges: [
{ name: 'A_COMMENT_CREATED', type: 'fanout' },
{ name: 'A_COMMENT_DELETED', type: 'fanout' },
{ name: 'A_COMMENT_UPDATED', type: 'fanout' },
bindings: [
{ exchange: 'A_COMMENT_CREATED', target: 'comments' },
{ exchange: 'A_COMMENT_DELETED', target: 'comments' },
{ exchange: 'A_COMMENT_UPDATED', target: 'comments' },
queues: [
{ name: 'comments' },
retry_ttl_queues: [
{ name: '<queue_name>-retry-1-30s', delay: 30000 },
{ name: '<queue_name>-retry-2-10m', delay: 600000 },
{ name: '<queue_name>-retry-3-48h', delay: 195840000 },
let connectionPromise;
const amqpService = initAmqp();
.then(() => {
.pConsume('comments', handleMsg)
// Do something important with our messages
function handleMsg (msg, channel) {
return Promise.resolve();
// return Promise.reject();
// throw new Error('Something wrong with handler');
function initAmqp() {
const port_str = config.port ? `:${config.port}` : '';
const vhost_str = config.vhost ? `/${encodeURIComponent(config.vhost)}` : '';
const url = `amqp://${}${port_str}${vhost_str}`;
const amqp = {
// Init all the queues and exchanges if not exists
pAssert() {
return connectionPromise
.then(({ channel }) => {
// Make it in sync order (for clarity):
// 1) assert exchanges
// 2) assert queues
// 3) bind exchanges to queues
return assertExchanges()
function assertExchanges() {
return Promise.all([]
// "real" payload exchanges
.concat( => channel.assertExchange(, exchange.type, {
durable: true,
// DLX (one per "real" queue)
.concat({ name: queue }) => {
const dlxName = amqp._getDLXName({ queue });
return channel.assertExchange(
{ durable: true }
// TTLX (one per "real" payload queue) - failed msgs goes to
// this exchange first and than redirected to corresponding ttlq using
// corresponding routing keys
.concat({ name: queue }) => {
const ttlxName = amqp._getTTLXName({ queue });
return channel.assertExchange(
{ durable: true }
function assertQueues() {
return Promise.all([]
.concat({ name: queue }) => {
const dlxName = amqp._getDLXName({ queue });
return Promise.all([]
// "real" payload queue
channel.assertQueue(queue, { durable: true }),
// a few ttl queues per one "real" queue
.concat(, index) => {
const attempt = index + 1;
const ttlQueueName = amqp._getTTLQName({ queue, attempt });
return channel.assertQueue(
durable: true,
deadLetterExchange: dlxName, // x-dead-letter-exchange
messageTtl: ttl_queue.delay, // x-message-ttl
// we can use this key for decreasing queues amount:
// deadLetterRoutingKey: dlxName
function bindExchangesToQueues() {
return Promise.all([]
// bind "real" payload exchanges to "real" payload queues
.concat( => channel.bindQueue(,
// bind DLX and TTLX for retries
.concat({ name: queue }) => {
const dlxName = amqp._getDLXName({ queue });
const ttlxName = amqp._getTTLXName({ queue });
return Promise.all([]
// DLX to "real" payload exchange
channel.bindQueue(queue, dlxName)
// TTLX to ttl queues
.concat(, index) => {
const attempt = index + 1;
const ttlqName = amqp._getTTLQName({ queue, attempt });
const routingKey = amqp._getTTLRoutingKey({ attempt });
return channel.bindQueue(ttlqName, ttlxName, routingKey);
pConsume(queue, handler, options = {}) {
return connectionPromise
.then(({ channel }) => channel.consume(queue, msg => {
return (new Promise((resolve, reject) => {
if (msg.fields.redelivered) {
reject('Message was redelivered, so something wrong happened');
handler(msg, channel)
.then(() => {
// catch here allows us handle all the varieties of fails:
// - exceptions in handlers
// - rejects in handlers
// - redeliveries (server was down or something else)
function handleRejectedMsg(reasonOfFail) {
return amqp._sendMsgToRetry({ msg, queue, channel, reasonOfFail });
}, options)
pConnect() {
connectionPromise = AmqpClient
.then(cnx =>
.then(channel => {
return { channel, connection: cnx };
return connectionPromise;
// Ack original msg, create new one with TTL and send
// to corresponding ttl queue where msg will be expired,
// die and through DLX goes to next retry
_sendMsgToRetry(args) {
const channel =;
const queue = args.queue;
const msg = args.msg;
const attempts_total = config.retry_ttl_queues.length;
// ack original msg
// Unpack content, update and pack it back
function getAttemptAndUpdatedContent(msg) {
let content = JSON.parse(msg.content.toString(contentEncoding));
// "exchange" field should exist, but who knows. in the other case we would have endless loop
// cos native will be changed after walking through DLX = ||;
content.try_attempt = ++content.try_attempt || 1;
// we don't rely on x-death, so write counter for sure
const attempt = content.try_attempt;
content = Buffer.from(JSON.stringify(content), contentEncoding);
return { attempt, content };
const { attempt, content } = getAttemptAndUpdatedContent(msg);
if (attempt <= attempts_total) {
const ttlxName = amqp._getTTLXName({ queue });
const routingKey = amqp._getTTLRoutingKey({ attempt });
const options = {
contentType: contentTypeJson,
persistent: true,
// trying to reproduce original message
// including and such
// but excluding msg.fields.redelivered
Object.keys( => {
options[key] =[key];
return channel.publish(ttlxName, routingKey, content, options);
return Promise.resolve();
_getTTLQName(options) {
const queue = options.queue;
const attempt = options.attempt || 1;
return config.retry_ttl_queues[attempt - 1].name.replace('<queue_name>', queue);
_getTTLRoutingKey(options) {
const attempt = options.attempt || 1;
return `retry-${attempt}`;
_getDLXName(options) {
const queue = options.queue;
return `DLX-${queue}`.replace(/-/g, '_').toUpperCase();
_getTTLXName(options) {
const queue = options.queue;
return `TTL-${queue}`.replace(/-/g, '_').toUpperCase();
return amqp;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment