Last active
March 12, 2018 10:52
-
-
Save bondvt04/5c1ac69c76929dbaa6f47dedccf2df1d to your computer and use it in GitHub Desktop.
RabbitMQ retries - send failed messages to retry loop
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// emulate some job which can be failed | |
function handler() { | |
return Math.random() > 0.1 ? Promise.resolve() : Promise.reject() | |
} | |
channel.consume('comments', msg => { | |
return (new Promise((resolve, reject) => { | |
if (msg.fields.redelivered) { | |
reject('Message was redelivered, so something wrong happened'); | |
return; | |
} | |
handler(msg, channel) | |
.then(resolve) | |
.catch(reject); | |
})) | |
.then(() => { | |
channel.ack(msg); | |
}) | |
.catch(handleRejectedMsg); | |
function handleRejectedMsg(reasonOfFail) { | |
return sendMsgToRetry({ msg, queue:'comments', channel, reasonOfFail }); | |
} | |
} | |
function sendMsgToRetry(args) { | |
const channel = args.channel; | |
const queue = args.queue; | |
const msg = args.msg; | |
// ack original msg | |
channel.ack(msg); | |
// Unpack content, update and pack it back | |
function getAttemptAndUpdatedContent(msg) { | |
let content = JSON.parse(msg.content.toString(contentEncoding)); | |
content.try_attempt = ++content.try_attempt || 1; | |
const attempt = content.try_attempt; | |
content = Buffer.from(JSON.stringify(content), contentEncoding); | |
return { attempt, content }; | |
} | |
const { attempt, content } = getAttemptAndUpdatedContent(msg); | |
if (attempt <= 3) { | |
const routingKey = `retry-${attempt}`; | |
const options = { | |
contentEncoding, | |
contentType: contentTypeJson, | |
persistent: true, | |
}; | |
Object.keys(msg.properties).forEach(key => { | |
options[key] = msg.properties[key]; | |
}); | |
return channel.publish('TTL-comments', routingKey, content, options); | |
} | |
return Promise.resolve(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment