Skip to content

Instantly share code, notes, and snippets.

@srkimir
Last active May 26, 2016 00:13
Show Gist options
  • Save srkimir/7d20576fa5b47dcd7b21f1950ace8cf9 to your computer and use it in GitHub Desktop.
Save srkimir/7d20576fa5b47dcd7b21f1950ace8cf9 to your computer and use it in GitHub Desktop.
nack/reject message to 'x-delayed-message' exchange type
'use strict'
const amqp = require('amqplib')
/**
 * Settings for the exchange the test message is initially published to.
 * It is asserted as a 'topic' exchange and declared non-durable.
 */
const STANDARD_EXCHANGE = {
  name: 'standard.exchange',
  type: 'topic',
  options: { durable: false }
}
/**
 * Settings for the exchange that STANDARD_QUEUE names as its
 * 'x-dead-letter-exchange'. It is asserted with the 'x-delayed-message'
 * type, and the 'x-delayed-type' argument is set to 'topic'.
 */
const DLX = {
  name: 'dlx.exchange',
  type: 'x-delayed-message',
  options: {
    durable: false,
    arguments: { 'x-delayed-type': 'topic' }
  }
}
/**
 * Settings for the single queue used by this script.
 * - `options` is passed to assertQueue: the queue is exclusive and names
 *   DLX as its dead-letter exchange.
 * - `bindingKey` ('#') is the pattern used when binding the queue to both
 *   STANDARD_EXCHANGE and DLX.
 */
const STANDARD_QUEUE = {
  name: 'standard.queue',
  options: {
    exclusive: true,
    arguments: { 'x-dead-letter-exchange': DLX.name }
  },
  bindingKey: '#'
}
/**
 * Consumer callback: logs the delivery, then rejects or acknowledges it
 * depending on the module-level `recoverableError` flag.
 *
 * @param {?Object} message - delivery passed by `channel.consume`; amqplib
 *   passes `null` when the broker cancels the consumer.
 * @param {Object} channel - channel the message was delivered on; its
 *   `reject` / `ack` methods are used to settle the message.
 * @returns {undefined}
 */
const consumeIt = (message, channel) => {
  // A null delivery means the consumer was cancelled; there is nothing to
  // settle, and dereferencing message.properties would throw.
  if (message == null) {
    console.log('Consumer cancelled, no message to process')
    return
  }

  // Fall back to an empty object so a delivery without headers still logs
  // "x-delay = undefined" instead of throwing.
  const headers = (message.properties && message.properties.headers) || {}
  console.log(`Message consumed at ${new Date().toJSON()}, with x-delay = ${headers['x-delay']}`)

  if (recoverableError) {
    // requeue = false: the message is not put back on the queue. The queue
    // declares an 'x-dead-letter-exchange', so the broker is expected to
    // route the rejected message there.
    channel.reject(message, false)
    console.log('REJECTED')
    return
  }

  /**
   * Open question: should we ACK both successfully processed messages
   * and messages that caused an UNrecoverable error?
   * Otherwise, if a message that causes an UNrecoverable error is sent
   * to the DLX (via nack/reject), do we fall into an infinite loop?
   */
  channel.ack(message, false)
  console.log('ACKED')
}
// Channel created during start-up; shared by every later step of the chain
// and passed to consumeIt for each delivery.
let channel
// Read by consumeIt on every delivery: while true, messages are rejected;
// it is switched to false 3 seconds after the initial publish.
let recoverableError = true

/**
 * Script flow: connect, open a channel, assert both exchanges and the queue,
 * bind the queue to both exchanges, start consuming, then publish a single
 * message carrying an 'x-delay' header of 1000.
 */
amqp.connect()
  .then((connection) => connection.createChannel())
  .then((ch) => {
    channel = ch
    return Promise.all([
      channel.assertExchange(STANDARD_EXCHANGE.name, STANDARD_EXCHANGE.type, STANDARD_EXCHANGE.options),
      channel.assertExchange(DLX.name, DLX.type, DLX.options)
    ])
  })
  .then(() => channel.assertQueue(STANDARD_QUEUE.name, STANDARD_QUEUE.options))
  .then(() => {
    // The same queue is bound to both exchanges, so it receives the original
    // publish as well as anything routed through DLX.
    return Promise.all([
      channel.bindQueue(STANDARD_QUEUE.name, STANDARD_EXCHANGE.name, STANDARD_QUEUE.bindingKey),
      channel.bindQueue(STANDARD_QUEUE.name, DLX.name, STANDARD_QUEUE.bindingKey)
    ])
  })
  .then(() => channel.consume(STANDARD_QUEUE.name, (message) => consumeIt(message, channel)))
  .then(() => {
    const rk = 'whatever.44'
    // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
    const bufferContent = Buffer.from('testing delayed messages')
    console.log(`Message initialy published at ${new Date().toJSON()}`)
    channel.publish(STANDARD_EXCHANGE.name, rk, bufferContent, {
      headers: {
        'x-delay': 1000
      }
    })
    // After 3 seconds stop simulating a recoverable error so that the next
    // delivery is acknowledged instead of rejected.
    setTimeout(() => {
      recoverableError = false
    }, 3 * 1000)
  })
  .catch((error) => {
    // Without this handler any connect/assert/bind/consume failure would
    // surface only as an unhandled promise rejection.
    console.error('AMQP delayed-message demo failed:', error)
    process.exitCode = 1
  })
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment