Skip to content

Instantly share code, notes, and snippets.

@materkel
Last active February 14, 2023 00:11
Show Gist options
  • Save materkel/f46fdf266f35d8c525aea16719f837ac to your computer and use it in GitHub Desktop.
Save materkel/f46fdf266f35d8c525aea16719f837ac to your computer and use it in GitHub Desktop.
Scheduling messages with RabbitMQ, using the rabbitmq_delayed_message_exchange plugin and amqplib in NodeJS
/**
* Install and enable the rabbitmq_delayed_message_exchange plugin as described by Alvaro Videla in this blogpost:
* https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/
*/
const amqp = require('amqplib');
const exchange = 'yourExchangeName';
const queue = 'yourQueueName';
const queueBinding = 'yourQueueBindingName';
// Message consumer
amqp
.connect('amqp://localhost')
.then(conn => conn.createChannel())
.then(ch => {
// Assert a x-delayed-message Exchange. The type of the exchange is specified in the arguments as "x-delayed-type"
ch.assertExchange(exchange, 'x-delayed-message', { durable: true, arguments: { 'x-delayed-type': 'direct' } });
return ch
.assertQueue(queue, { durable: true })
.then(() => ch.bindQueue(queue, exchange, queueBinding))
.then(() => {
ch.consume(queue, (msg) => {
// Handle delayed message
// ...
}, { noAck: true });
});
});
// Publish message
amqp
.connect('amqp://localhost')
.then(conn => conn.createChannel())
.then(ch => {
// Publish message with a delay of 500 ms
const headers = { 'x-delay': 500 };
ch.publish(exchange, queueBinding, new Buffer('hello world'), { headers });
});
@dagenius007
Copy link

Thanks for this

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment