Skip to content

Instantly share code, notes, and snippets.

@carlhoerberg
Created May 13, 2015 14:45
Show Gist options
  • Save carlhoerberg/006b01ac17a0a94859ba to your computer and use it in GitHub Desktop.
Save carlhoerberg/006b01ac17a0a94859ba to your computer and use it in GitHub Desktop.
How to build reconnect logic for amqplib
var amqp = require('amqplib/callback_api');
// If the connection is closed or fails to be established at all, we will reconnect.
var amqpConn = null;
// True while a connect attempt is in flight, so overlapping start() calls
// (e.g. two reconnect timers firing close together) open only one connection.
var amqpConnecting = false;

/**
 * Open the AMQP connection and keep it alive.
 *
 * Connects to process.env.CLOUDAMQP_URL with a 60 second heartbeat. On a failed
 * attempt, or when an established connection closes, another attempt is
 * scheduled after one second. On success the connection is stored in
 * `amqpConn` and `whenConnected()` is invoked.
 */
function start() {
  if (amqpConnecting) return;
  amqpConnecting = true;

  // The URL may already carry a query string; in that case the heartbeat
  // option has to be appended with "&" instead of starting a second "?".
  var url = String(process.env.CLOUDAMQP_URL);
  var separator = url.indexOf("?") === -1 ? "?" : "&";

  try {
    amqp.connect(url + separator + "heartbeat=60", function(err, conn) {
      amqpConnecting = false;
      if (err) {
        console.error("[AMQP]", err.message);
        return setTimeout(start, 1000);
      }
      conn.on("error", function(err) {
        // "Connection closing" is the expected noise of our own close() calls.
        if (err.message !== "Connection closing") {
          console.error("[AMQP] conn error", err.message);
        }
      });
      conn.on("close", function() {
        console.error("[AMQP] reconnecting");
        return setTimeout(start, 1000);
      });
      console.log("[AMQP] connected");
      amqpConn = conn;
      whenConnected();
    });
  } catch (e) {
    // Do not leave the guard stuck if connect() throws synchronously.
    amqpConnecting = false;
    throw e;
  }
}
/**
 * Hook run each time a connection has been established: boots the publisher
 * first, then the worker, both on the current connection.
 */
function whenConnected() {
  [startPublisher, startWorker].forEach(function(boot) {
    boot();
  });
}
// Confirm channel used by publish(); null until the first channel is open.
var pubChannel = null;
// Messages ([exchange, routingKey, content]) waiting for a usable channel.
var offlinePubQueue = [];

/**
 * Open a confirm channel on the current connection, make it the publishing
 * channel and resend everything that was queued while offline.
 *
 * If the channel cannot be created the connection is closed via closeOnErr(),
 * which triggers the reconnect logic.
 */
function startPublisher() {
  amqpConn.createConfirmChannel(function(err, ch) {
    if (closeOnErr(err)) return;
    ch.on("error", function(err) {
      console.error("[AMQP] channel error", err.message);
    });
    ch.on("close", function() {
      console.log("[AMQP] channel closed");
    });
    pubChannel = ch;

    // Drain a snapshot of the queue. publish() pushes a message back onto
    // offlinePubQueue when it fails synchronously; looping on the live queue
    // would pick that same message up again and spin forever.
    var pending = offlinePubQueue.splice(0, offlinePubQueue.length);
    pending.forEach(function(m) {
      publish(m[0], m[1], m[2]);
    });
  });
}
/**
 * Publish a persistent message; messages are queued internally while the
 * connection is down and resent once a new publisher channel is open.
 *
 * @param {string} exchange - target exchange ("" for the default exchange)
 * @param {string} routingKey - routing key / queue name
 * @param {Buffer} content - message body
 */
function publish(exchange, routingKey, content) {
  // Pin the channel used for this publish: by the time the confirm callback
  // runs, pubChannel may already point at a channel on a newer connection.
  var ch = pubChannel;
  try {
    ch.publish(exchange, routingKey, content, { persistent: true },
      function(err, ok) {
        if (!err) return;
        console.error("[AMQP] publish", err);
        offlinePubQueue.push([exchange, routingKey, content]);
        try {
          ch.connection.close();
        } catch (closeErr) {
          // close() throws once the connection is already closing/closed,
          // which is the normal case when several confirms fail together.
          console.error("[AMQP] publish", closeErr.message);
        }
      });
  } catch (e) {
    // No channel yet (ch is null) or the channel refused the publish.
    console.error("[AMQP] publish", e.message);
    offlinePubQueue.push([exchange, routingKey, content]);
  }
}
/**
 * A worker that acks messages only if processed successfully.
 *
 * Opens a channel on the current connection, asserts the durable "jobs" queue
 * and consumes from it with a prefetch of 10. Each message is handed to
 * work(); it is acked when work reports success and requeued otherwise.
 */
function startWorker() {
  // Remember which connection this worker belongs to. work() may finish after
  // that connection died and a new one was opened; an ack failure on the old
  // channel must not tear down the new, healthy connection.
  var conn = amqpConn;
  var connClosed = false;
  conn.once("close", function() {
    connClosed = true;
  });

  conn.createChannel(function(err, ch) {
    if (closeOnErr(err)) return;
    ch.on("error", function(err) {
      console.error("[AMQP] channel error", err.message);
    });
    ch.on("close", function() {
      console.log("[AMQP] channel closed");
    });

    // Force a reconnect only when the failure concerns the live connection.
    function recover(e) {
      if (connClosed || conn !== amqpConn) {
        // The unacked message is redelivered by the broker on the new channel.
        console.error("[AMQP] ignoring error from stale channel", e.message);
        return;
      }
      closeOnErr(e);
    }

    function processMsg(msg) {
      // amqplib invokes the consumer with null when the broker cancels it.
      if (msg === null) {
        recover(new Error("Consumer cancelled by server"));
        return;
      }
      work(msg, function(ok) {
        try {
          if (ok)
            ch.ack(msg);
          else
            ch.reject(msg, true);
        } catch (e) {
          recover(e);
        }
      });
    }

    ch.prefetch(10);
    ch.assertQueue("jobs", { durable: true }, function(err, _ok) {
      if (closeOnErr(err)) return;
      ch.consume("jobs", processMsg, { noAck: false });
      console.log("Worker is started");
    });
  });
}
/**
 * Demo job handler: logs the message body and immediately reports success.
 *
 * @param {{content: Buffer}} msg - delivered message
 * @param {function(boolean)} cb - called with true when the job succeeded
 */
function work(msg, cb) {
  var body = msg.content.toString();
  console.log("Got msg", body);
  cb(true);
}
/**
 * If `err` is set, log it and close the current connection so the reconnect
 * logic takes over.
 *
 * @param {*} err - error value from an amqplib callback or a caught exception
 * @returns {boolean} true when an error was handled, false when `err` is falsy
 */
function closeOnErr(err) {
  if (!err) return false;
  console.error("[AMQP] error", err);
  try {
    amqpConn.close();
  } catch (closeErr) {
    // close() throws when the connection is already closing or closed (e.g.
    // publisher and worker both failed); a reconnect is under way then.
    console.error("[AMQP] error", closeErr.message);
  }
  return true;
}
// Demo producer: publish one job per second. Buffer.from() replaces the
// deprecated `new Buffer()` constructor (Node.js DEP0005).
setInterval(function() {
  publish("", "jobs", Buffer.from("work work work"));
}, 1000);

// Kick off the first connection attempt; start() reschedules itself on
// failure or when the connection closes.
start();
@varunnayal
Copy link

I tried running the above snippet after splitting it into two separate files, worker.js and producer.js.
The notable change I made in worker.js is replacing cb(true) with setTimeout(function() { cb(true) }, 12000), to mimic some time-consuming processing.
Also, producer.js was changed to publish just one message.
The problem appears when the RabbitMQ server restarts after the worker has received a message but before the worker has sent the ack for it: the worker reconnects to the server and receives the same message again, while the ack for the original delivery is still waiting to be sent.

Here is the snippet of test files:
worker.js

/**
* worker.js
*/
var amqp = require('amqplib/callback_api');
process.env.CLOUDAMQP_URL = 'amqp://localhost';

// If the connection is closed or fails to be established at all, we will reconnect.
var amqpConn = null;
// True while a connect attempt is in flight, so overlapping start() calls
// open only one connection.
var amqpConnecting = false;

/**
 * Open the AMQP connection and keep it alive.
 *
 * Connects to process.env.CLOUDAMQP_URL with a 60 second heartbeat. On a failed
 * attempt, or when an established connection closes, another attempt is
 * scheduled after seven seconds. On success the connection is stored in
 * `amqpConn` and `whenConnected()` is invoked.
 */
function start() {
  if (amqpConnecting) return;
  amqpConnecting = true;

  // Append the heartbeat with "&" when the URL already has a query string.
  var url = String(process.env.CLOUDAMQP_URL);
  var separator = url.indexOf("?") === -1 ? "?" : "&";

  try {
    amqp.connect(url + separator + "heartbeat=60", function(err, conn) {
      amqpConnecting = false;
      if (err) {
        console.error("[AMQP]", err.message);
        return setTimeout(start, 7000);
      }
      conn.on("error", function(err) {
        // "Connection closing" is the expected noise of our own close() calls.
        if (err.message !== "Connection closing") {
          console.error("[AMQP] conn error", err.message);
        }
      });
      conn.on("close", function() {
        console.error("[AMQP] reconnecting");
        return setTimeout(start, 7000);
      });

      console.log("[AMQP] connected");
      amqpConn = conn;

      whenConnected();
    });
  } catch (e) {
    // Do not leave the guard stuck if connect() throws synchronously.
    amqpConnecting = false;
    throw e;
  }
}

/**
 * Hook run each time a connection has been established; this process only
 * runs the worker.
 */
function whenConnected() {
  [startWorker].forEach(function(boot) {
    boot();
  });
}

/**
 * A worker that acks messages only if processed successfully.
 *
 * Opens a channel on the current connection, asserts the durable "jobs" queue
 * and consumes from it with a prefetch of 10. Each message is handed to
 * work(); it is acked when work reports success and requeued otherwise.
 */
function startWorker() {
  // Remember which connection this worker belongs to. work() may finish after
  // that connection died and a new one was opened; an ack failure on the old
  // channel must not tear down the new, healthy connection.
  var conn = amqpConn;
  var connClosed = false;
  conn.once("close", function() {
    connClosed = true;
  });

  conn.createChannel(function(err, ch) {
    if (closeOnErr(err)) return;
    ch.on("error", function(err) {
      console.error("[AMQP] channel error", err.message);
    });
    ch.on("close", function() {
      console.log("[AMQP] channel closed");
    });

    // Force a reconnect only when the failure concerns the live connection.
    function recover(e) {
      if (connClosed || conn !== amqpConn) {
        // The unacked message is redelivered by the broker on the new channel.
        console.error("[AMQP] ignoring error from stale channel", e.message);
        return;
      }
      closeOnErr(e);
    }

    function processMsg(msg) {
      // amqplib invokes the consumer with null when the broker cancels it.
      if (msg === null) {
        recover(new Error("Consumer cancelled by server"));
        return;
      }
      var incomingDate = (new Date()).toISOString();
      console.log("Msg [deliveryTag=" + msg.fields.deliveryTag + "] arrived at " + incomingDate);
      work(msg, function(ok) {
        console.log("Sending Ack for msg at time " + incomingDate);
        try {
          if (ok)
            ch.ack(msg);
          else
            ch.reject(msg, true);
        } catch (e) {
          recover(e);
        }
      });
    }

    ch.prefetch(10);
    ch.assertQueue("jobs", { durable: true }, function(err, _ok) {
      if (closeOnErr(err)) return;
      ch.consume("jobs", processMsg, { noAck: false });
      console.log("Worker is started");
    });
  });
}

/**
 * Simulated slow job: logs the message body, then reports success after
 * WORK_WAIT_TIME milliseconds (12000 when the variable is unset or empty).
 *
 * @param {{content: Buffer}} msg - delivered message
 * @param {function(boolean)} cb - called with true once the delay elapsed
 */
function work(msg, cb) {
  console.log("Got msg", msg.content.toString());
  var delayMs = process.env.WORK_WAIT_TIME || 12000;
  setTimeout(function() {
    cb(true);
  }, delayMs);
}

/**
 * If `err` is set, log it and close the current connection so the reconnect
 * logic takes over.
 *
 * @param {*} err - error value from an amqplib callback or a caught exception
 * @returns {boolean} true when an error was handled, false when `err` is falsy
 */
function closeOnErr(err) {
  if (!err) return false;
  console.error("[AMQP] error", err);
  try {
    amqpConn.close();
  } catch (closeErr) {
    // close() throws when the connection is already closing or closed; a
    // reconnect is under way in that case, so just record it.
    console.error("[AMQP] error", closeErr.message);
  }
  return true;
}

// Kick off the first connection attempt; start() reschedules itself when the
// attempt fails or the connection closes.
start();

producer.js

/**
* producer.js
**/
var amqp = require('amqplib/callback_api');
process.env.CLOUDAMQP_URL = 'amqp://localhost';

// If the connection is closed or fails to be established at all, we will reconnect.
var amqpConn = null;
// True while a connect attempt is in flight, so overlapping start() calls
// open only one connection.
var amqpConnecting = false;

/**
 * Open the AMQP connection and keep it alive.
 *
 * Connects to process.env.CLOUDAMQP_URL with a 60 second heartbeat. On a failed
 * attempt, or when an established connection closes, another attempt is
 * scheduled after seven seconds. On success the connection is stored in
 * `amqpConn` and `whenConnected()` is invoked.
 */
function start() {
  if (amqpConnecting) return;
  amqpConnecting = true;

  // Append the heartbeat with "&" when the URL already has a query string.
  var url = String(process.env.CLOUDAMQP_URL);
  var separator = url.indexOf("?") === -1 ? "?" : "&";

  try {
    amqp.connect(url + separator + "heartbeat=60", function(err, conn) {
      amqpConnecting = false;
      if (err) {
        console.error("[AMQP]", err.message);
        return setTimeout(start, 7000);
      }
      conn.on("error", function(err) {
        // "Connection closing" is the expected noise of our own close() calls.
        if (err.message !== "Connection closing") {
          console.error("[AMQP] conn error", err.message);
        }
      });
      conn.on("close", function() {
        console.error("[AMQP] reconnecting");
        return setTimeout(start, 7000);
      });

      console.log("[AMQP] connected");
      amqpConn = conn;

      whenConnected();
    });
  } catch (e) {
    // Do not leave the guard stuck if connect() throws synchronously.
    amqpConnecting = false;
    throw e;
  }
}

/**
 * Hook run each time a connection has been established; this process only
 * runs the publisher.
 */
function whenConnected() {
  [startPublisher].forEach(function(boot) {
    boot();
  });
}

// Confirm channel used by publish(); null until the first channel is open.
var pubChannel = null;
// Messages ([exchange, routingKey, content]) waiting for a usable channel.
var offlinePubQueue = [];

/**
 * Open a confirm channel on the current connection, make it the publishing
 * channel and resend everything that was queued while offline.
 *
 * If the channel cannot be created the connection is closed via closeOnErr(),
 * which triggers the reconnect logic.
 */
function startPublisher() {
  amqpConn.createConfirmChannel(function(err, ch) {
    if (closeOnErr(err)) return;
    ch.on("error", function(err) {
      console.error("[AMQP] channel error", err.message);
    });
    ch.on("close", function() {
      console.log("[AMQP] channel closed");
    });

    pubChannel = ch;

    // Drain a snapshot of the queue. publish() pushes a message back onto
    // offlinePubQueue when it fails synchronously; looping on the live queue
    // would pick that same message up again and spin forever.
    var pending = offlinePubQueue.splice(0, offlinePubQueue.length);
    pending.forEach(function(m) {
      console.log('M = ', m);
      publish(m[0], m[1], m[2]);
    });
  });
}

/**
 * Publish a persistent message; messages are queued internally while the
 * connection is down and resent once a new publisher channel is open.
 *
 * @param {string} exchange - target exchange ("" for the default exchange)
 * @param {string} routingKey - routing key / queue name
 * @param {Buffer} content - message body
 */
function publish(exchange, routingKey, content) {
  // Pin the channel used for this publish: by the time the confirm callback
  // runs, pubChannel may already point at a channel on a newer connection.
  var ch = pubChannel;
  try {
    ch.publish(exchange, routingKey, content, { persistent: true },
      function(err, ok) {
        if (!err) return;
        console.error("[AMQP] publish", err);
        offlinePubQueue.push([exchange, routingKey, content]);
        try {
          ch.connection.close();
        } catch (closeErr) {
          // close() throws once the connection is already closing/closed,
          // which is the normal case when several confirms fail together.
          console.error("[AMQP] publish", closeErr.message);
        }
      });
  } catch (e) {
    // No channel yet (ch is null) or the channel refused the publish.
    console.error("[AMQP] publish", e.message);
    offlinePubQueue.push([exchange, routingKey, content]);
  }
}

/**
 * If `err` is set, log it and close the current connection so the reconnect
 * logic takes over.
 *
 * @param {*} err - error value from an amqplib callback or a caught exception
 * @returns {boolean} true when an error was handled, false when `err` is falsy
 */
function closeOnErr(err) {
  if (!err) return false;
  console.error("[AMQP] error", err);
  try {
    amqpConn.close();
  } catch (closeErr) {
    // close() throws when the connection is already closing or closed; a
    // reconnect is under way in that case, so just record it.
    console.error("[AMQP] error", closeErr.message);
  }
  return true;
}

// Publish a single test message three seconds after startup. Buffer.from()
// replaces the deprecated `new Buffer()` constructor (Node.js DEP0005).
setTimeout(function() {
  publish("", "jobs", Buffer.from("work work work"));
}, 3000);

// Kick off the first connection attempt.
start();

Here is the log received from worker.js:

[AMQP] connected
Worker is started
### Message Received ###
Msg [deliveryTag=1] arrived at 2017-07-13T16:17:37.331Z
Got msg work work work
### RABBITMQ server restarted before we could send ack ###
[AMQP] channel closed
[AMQP] reconnecting

### Server Connected ###
[AMQP] connected
Worker is started

### Same message received again ###
Msg [deliveryTag=1] arrived at 2017-07-13T16:17:46.285Z
Got msg work work work

### Error while sending ack now (For message received before reconnecting ###
Sending Ack for msg at time 2017-07-13T16:17:37.331Z
[AMQP] error { IllegalOperationError: Channel closed
    at Channel.<anonymous> (/test/node_modules/amqplib/lib/channel.js:149:11)
    at Channel.ack (/test/node_modules/amqplib/lib/callback_model.js:234:8)
    at /test/worker.js:57:16
    at Timeout.setTimeout (/test/worker.js:70:20)
    at ontimeout (timers.js:380:14)
    at tryOnTimeout (timers.js:244:5)
    at Timer.listOnTimeout (timers.js:214:5)
  message: 'Channel closed',
  stack: 'IllegalOperationError: Channel closed\n    at Channel.<anonymous> (/test/node_modules/amqplib/lib/channel.js:149:11)\n    at Channel.ack (/test/node_modules/amqplib/lib/callback_model.js:234:8)\n    at /test/worker.js:57:16\n    at Timeout.setTimeout (/test/worker.js:70:20)\n    at ontimeout (timers.js:380:14)\n    at tryOnTimeout (timers.js:244:5)\n    at Timer.listOnTimeout (timers.js:214:5)',
  stackAtStateChange: 'Stack capture: Connection closed: 320 (CONNECTION-FORCED) with message "CONNECTION_FORCED - broker forced connection closure with reason \'shutdown\'"\n    at Object.accept (/test/node_modules/amqplib/lib/connection.js:89:15)\n    at Connection.mainAccept [as accept] (/test/node_modules/amqplib/lib/connection.js:63:33)\n    at Socket.go (/test/node_modules/amqplib/lib/connection.js:476:48)\n    at emitNone (events.js:86:13)\n    at Socket.emit (events.js:185:7)\n    at emitReadable_ (_stream_readable.js:432:10)\n    at emitReadable (_stream_readable.js:426:7)\n    at readableAddChunk (_stream_readable.js:187:13)\n    at Socket.Readable.push (_stream_readable.js:134:10)\n    at TCP.onread (net.js:551:20)' }
[AMQP] channel closed
[AMQP] reconnecting

Any ideas on how can we handle this scenario?

@srkimir
Copy link

srkimir commented Oct 24, 2017

What if "error" or "close" is emitted on the channel object (not on the connection), from the worker's perspective? Is there no need to reconnect in that case?

@kiwenlau
Copy link

kiwenlau commented Apr 13, 2018

Following is my code with Async/Await:

const amqp = require("amqplib");

var connection;

/**
 * Connect to RabbitMQ and reconnect after 10 seconds whenever the connection
 * attempt fails or an established connection closes.
 */
async function connectRabbitMQ()
{
    try
    {
        connection = await amqp.connect("amqp://localhost");
        console.log("connect to RabbitMQ success!");

        // amqplib emits "close" after "error", so only the "close" handler
        // schedules the reconnect; doing it in both would open two
        // connections per failure. The listener itself must stay: an "error"
        // event without a listener is thrown by the EventEmitter.
        // `logger` is not defined anywhere in this snippet, so console is used.
        connection.on("error", function(err)
        {
            console.error(err);
        });

        connection.on("close", function()
        {
            console.error("connection to RabbitQM closed!");
            setTimeout(connectRabbitMQ, 10000);
        });
    }
    catch (err)
    {
        console.error(err);
        setTimeout(connectRabbitMQ, 10000);
    }
}

// Initial connection attempt; connectRabbitMQ() schedules its own retries.
connectRabbitMQ();

By stopping RabbitMQ container, I can reproduce "close" event, but how can I reproduce "error" event on connection?

@OsoianMarcel
Copy link

@pitops
Copy link

pitops commented Jan 17, 2019

@varunnayal I have same issue, any workarounds?

@JuergenSimon
Copy link

JuergenSimon commented Apr 1, 2019

@kiwenlau: await connection.connection._events.error('foobar');

@johanrhodin
Copy link

We should update the use of Buffer():
(node:25702) [DEP0005] DeprecationWarning: Buffer() is deprecated due to security and usability issues. Please use the Buffer.alloc(), Buffer.allocUnsafe(), or Buffer.from() methods instead.

@teamdraftbox
Copy link

@johanrhodin please use this code instead of the deprecated one (note that Buffer.from is called without `new`):
publish("", "jobs", Buffer.from("work work work"));

@icinemagr
Copy link

Thank you for your code logic. In fact, when I try to restart RabbitMQ, because it took more than 1s to connect to the MQ, 2 or more connections were created. So I just want to add a bit of code to make it perfect.

var isConnecting = false;
function start() {
    if (isConnecting) return;
    isConnecting = true;
    amqp.connect(process.env.CLOUDAMQP_URL + "?heartbeat=60", function(err, conn) {
        isConnecting = false;
...
}

Exactly what I did! I thought no one else had noticed it.

@deleteeeeeeeeeeeed
Copy link

How can I set a rate limit?
I mean, the consumer should read just 4 messages per minute.

@jonahmolinski
Copy link

@Bahramibh

You can use channel.prefetch to limit the number of messages that the consumer has received but not yet acknowledged, i.e. only process up to 4 messages at a time: channel.prefetch(4)

@MaksAnn
Copy link

MaksAnn commented Nov 5, 2022

Following is my code with Async/Await:

const amqp = require("amqplib");

var connection;

/**
 * Connect to RabbitMQ and reconnect after 10 seconds whenever the connection
 * attempt fails or an established connection closes.
 */
async function connectRabbitMQ()
{
    try
    {
        connection = await amqp.connect("amqp://localhost");
        console.log("connect to RabbitMQ success!");

        // amqplib emits "close" after "error", so only the "close" handler
        // schedules the reconnect; doing it in both would open two
        // connections per failure. The listener itself must stay: an "error"
        // event without a listener is thrown by the EventEmitter.
        // `logger` is not defined anywhere in this snippet, so console is used.
        connection.on("error", function(err)
        {
            console.error(err);
        });

        connection.on("close", function()
        {
            console.error("connection to RabbitQM closed!");
            setTimeout(connectRabbitMQ, 10000);
        });
    }
    catch (err)
    {
        console.error(err);
        setTimeout(connectRabbitMQ, 10000);
    }
}

// Initial connection attempt; connectRabbitMQ() schedules its own retries.
connectRabbitMQ();

By stopping RabbitMQ container, I can reproduce "close" event, but how can I reproduce "error" event on connection?

One way is to just turn on a VPN - it should break the connection.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment