Skip to content

Instantly share code, notes, and snippets.

@materkel
Last active February 14, 2023 00:11
Show Gist options
  • Save materkel/f46fdf266f35d8c525aea16719f837ac to your computer and use it in GitHub Desktop.
Save materkel/f46fdf266f35d8c525aea16719f837ac to your computer and use it in GitHub Desktop.
Scheduling messages with RabbitMQ, using the rabbitmq_delayed_message_exchange plugin and amqplib in NodeJS
/**
* Install and enable the rabbitmq_delayed_message_exchange plugin as described by Alvaro Videla in this blogpost:
* https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/
*/
const amqp = require('amqplib');
// Name of the exchange; it is asserted with type 'x-delayed-message' by the consumer and published to by the publisher.
const exchange = 'yourExchangeName';
// Name of the durable queue the consumer asserts and consumes from.
const queue = 'yourQueueName';
// Key used to bind the queue to the exchange; the publisher uses the same value as its routing key.
const queueBinding = 'yourQueueBindingName';
// Message consumer
// Connects, declares the delayed exchange, the queue and the binding in sequence,
// then starts consuming. Every step is chained so that a failure anywhere
// (e.g. the delayed-message plugin is not enabled) reaches the final .catch
// instead of becoming an unhandled rejection.
amqp
  .connect('amqp://localhost')
  .then(conn => conn.createChannel())
  .then(ch => {
    // Assert a x-delayed-message Exchange. The type of the exchange is specified in the arguments as "x-delayed-type"
    return ch
      .assertExchange(exchange, 'x-delayed-message', { durable: true, arguments: { 'x-delayed-type': 'direct' } })
      .then(() => ch.assertQueue(queue, { durable: true }))
      .then(() => ch.bindQueue(queue, exchange, queueBinding))
      .then(() => {
        // noAck: true means messages are not acknowledged by this consumer.
        return ch.consume(queue, (msg) => {
          // Handle delayed message
          // ...
        }, { noAck: true });
      });
  })
  .catch(err => {
    // Surface setup failures instead of leaving the rejection unhandled.
    console.error('Consumer setup failed:', err);
  });
// Publish message
// Opens its own connection and channel, then publishes one message to the
// delayed exchange. The 'x-delay' header carries the delay in milliseconds.
amqp
  .connect('amqp://localhost')
  .then(conn => conn.createChannel())
  .then(ch => {
    // Publish message with a delay of 500 ms
    const headers = { 'x-delay': 500 };
    // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
    return ch.publish(exchange, queueBinding, Buffer.from('hello world'), { headers });
  })
  .catch(err => {
    // Surface connection/channel failures instead of leaving the rejection unhandled.
    console.error('Publishing failed:', err);
  });
@zbobyuan
Copy link

awesome.

@stefanwalther
Copy link

Is there a way with RabbitMQ not only to delay messages, but also to schedule them, e.g. once a day?

@vermaslal
Copy link

Nice example.

@drymek
Copy link

drymek commented Jan 26, 2017

Thanks for that!

@tuanh118
Copy link

Thank you for the example. However, I got the following error trying to run your code:

Error: Connection closed: 503 (COMMAND-INVALID) with message "COMMAND_INVALID - unknown exchange type 'x-delayed-message'"
    at Object.accept (/opt/app/node_modules/amqplib/lib/connection.js:90:15)
    at Connection.mainAccept [as accept] (/opt/app/node_modules/amqplib/lib/connection.js:63:33)
    at Socket.go (/opt/app/node_modules/amqplib/lib/connection.js:476:48)
    at emitNone (events.js:86:13)
    at Socket.emit (events.js:185:7)
    at emitReadable_ (_stream_readable.js:432:10)
    at emitReadable (_stream_readable.js:426:7)
    at readableAddChunk (_stream_readable.js:187:13)
    at Socket.Readable.push (_stream_readable.js:134:10)
    at TCP.onread (net.js:551:20)

@serhatates
Copy link

serhatates commented Mar 8, 2017

Callback API version of the code:

// Name of the exchange; consumer() asserts it with type 'x-delayed-message' and publisher() publishes to it.
const exchange = 'yourExchangeName';
// Name of the durable queue that consumer() asserts and consumes from.
const queue = 'yourQueueName';
// Key used to bind the queue to the exchange; publisher() uses the same value as its routing key.
const queueBinding = 'yourQueueBindingName';

// Open one connection with the callback flavour of amqplib and hand it to
// both the consumer and the publisher. A connection error aborts via bail().
const amqpCallbackApi = require('amqplib/callback_api');

amqpCallbackApi.connect('amqp://localhost', (err, conn) => {
    if (err !== null && err !== undefined) {
        bail(err);
    }
    consumer(conn);
    publisher(conn);
});

/**
 * Report an error on stderr and terminate the process with exit code 1.
 *
 * @param {*} err - The value to print with console.error before exiting.
 */
function bail(err) {
    const failureExitCode = 1;
    console.error(err);
    process.exit(failureExitCode);
}

// Message consumer
/**
 * Open a channel on the given connection, declare the delayed exchange,
 * the queue and the binding, then log and acknowledge every message received.
 *
 * @param {object} conn - Connection object exposing createChannel(callback).
 */
function consumer(conn) {
    const exchangeOptions = { durable: true, arguments: { 'x-delayed-type': 'direct' } };

    conn.createChannel((err, ch) => {
        if (err != null) bail(err);

        ch.assertExchange(exchange, 'x-delayed-message', exchangeOptions);
        ch.assertQueue(queue, { durable: true });
        ch.bindQueue(queue, exchange, queueBinding);

        ch.consume(queue, (msg) => {
            // The callback may be invoked with null instead of a message; skip that case.
            if (msg === null) {
                return;
            }
            console.log(msg.content.toString());
            ch.ack(msg);
        });
    });
}

// Publish message
/**
 * Open a channel on the given connection and publish a single message to the
 * delayed exchange with an 'x-delay' header of 10000 (milliseconds).
 *
 * @param {object} conn - Connection object exposing createChannel(callback).
 */
function publisher(conn) {
    conn.createChannel(on_open);
    function on_open(err, ch) {
        if (err != null) bail(err);
        const headers = { 'x-delay': 10000 };
        // Buffer.from replaces the deprecated `new Buffer(string)` constructor (DEP0005).
        ch.publish(exchange, queueBinding, Buffer.from('hello 10sn from past'), { headers });
    }
}

@Raidus
Copy link

Raidus commented Sep 14, 2018

Is there a way with RabbitMQ not only to delay messages, but also to schedule them, e.g. once a day?

This could easily be done with cron jobs, either with crontab or with modules like https://github.com/kelektiv/node-cron

@dagenius007
Copy link

Thanks for this

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment