Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
How to build reconnect logic for amqplib
var amqp = require('amqplib/callback_api');
// if the connection is closed or fails to be established at all, we will reconnect
var amqpConn = null;
function start() {
amqp.connect(process.env.CLOUDAMQP_URL + "?heartbeat=60", function(err, conn) {
if (err) {
console.error("[AMQP]", err.message);
return setTimeout(start, 1000);
}
conn.on("error", function(err) {
if (err.message !== "Connection closing") {
console.error("[AMQP] conn error", err.message);
}
});
conn.on("close", function() {
console.error("[AMQP] reconnecting");
return setTimeout(start, 1000);
});
console.log("[AMQP] connected");
amqpConn = conn;
whenConnected();
});
}
function whenConnected() {
startPublisher();
startWorker();
}
var pubChannel = null;
var offlinePubQueue = [];
function startPublisher() {
amqpConn.createConfirmChannel(function(err, ch) {
if (closeOnErr(err)) return;
ch.on("error", function(err) {
console.error("[AMQP] channel error", err.message);
});
ch.on("close", function() {
console.log("[AMQP] channel closed");
});
pubChannel = ch;
while (true) {
var m = offlinePubQueue.shift();
if (!m) break;
publish(m[0], m[1], m[2]);
}
});
}
// method to publish a message, will queue messages internally if the connection is down and resend later
function publish(exchange, routingKey, content) {
try {
pubChannel.publish(exchange, routingKey, content, { persistent: true },
function(err, ok) {
if (err) {
console.error("[AMQP] publish", err);
offlinePubQueue.push([exchange, routingKey, content]);
pubChannel.connection.close();
}
});
} catch (e) {
console.error("[AMQP] publish", e.message);
offlinePubQueue.push([exchange, routingKey, content]);
}
}
// A worker that acks messages only if processed succesfully
function startWorker() {
amqpConn.createChannel(function(err, ch) {
if (closeOnErr(err)) return;
ch.on("error", function(err) {
console.error("[AMQP] channel error", err.message);
});
ch.on("close", function() {
console.log("[AMQP] channel closed");
});
ch.prefetch(10);
ch.assertQueue("jobs", { durable: true }, function(err, _ok) {
if (closeOnErr(err)) return;
ch.consume("jobs", processMsg, { noAck: false });
console.log("Worker is started");
});
function processMsg(msg) {
work(msg, function(ok) {
try {
if (ok)
ch.ack(msg);
else
ch.reject(msg, true);
} catch (e) {
closeOnErr(e);
}
});
}
});
}
function work(msg, cb) {
console.log("Got msg", msg.content.toString());
cb(true);
}
function closeOnErr(err) {
if (!err) return false;
console.error("[AMQP] error", err);
amqpConn.close();
return true;
}
setInterval(function() {
publish("", "jobs", new Buffer("work work work"));
}, 1000);
start();
@haroonification

This comment has been minimized.

Show comment Hide comment
@haroonification

haroonification Sep 26, 2016

Getting the following error after installing npm install amqplib and running your file with node reconnect.js..
Any help would be appreciated. Thanks

D:\node\node_modules\amqplib\lib\connect.js:158
throw new Error("Expected amqp: or amqps: as the protocol; got " + protocol)
;
Error: Expected amqp: or amqps: as the protocol; got null
at connect (D:\node\node_modules\amqplib\lib\connect.js:158:11)
at Object.connect (D:\node\node_modules\amqplib\callback_api.js:14:3)
at start (D:\node\rabbit-email-testing\2\reconnect.js:6:8)
at Object. (D:\node\rabbit-email-testing\2\reconnect.js:119:1)
at Module._compile (module.js:409:26)
at Object.Module._extensions..js (module.js:416:10)
at Module.load (module.js:343:32)
at Function.Module._load (module.js:300:12)
at Function.Module.runMain (module.js:441:10)
at startup (node.js:139:18)

Getting the following error after installing npm install amqplib and running your file with node reconnect.js..
Any help would be appreciated. Thanks

D:\node\node_modules\amqplib\lib\connect.js:158
throw new Error("Expected amqp: or amqps: as the protocol; got " + protocol)
;
Error: Expected amqp: or amqps: as the protocol; got null
at connect (D:\node\node_modules\amqplib\lib\connect.js:158:11)
at Object.connect (D:\node\node_modules\amqplib\callback_api.js:14:3)
at start (D:\node\rabbit-email-testing\2\reconnect.js:6:8)
at Object. (D:\node\rabbit-email-testing\2\reconnect.js:119:1)
at Module._compile (module.js:409:26)
at Object.Module._extensions..js (module.js:416:10)
at Module.load (module.js:343:32)
at Function.Module._load (module.js:300:12)
at Function.Module.runMain (module.js:441:10)
at startup (node.js:139:18)

@aschmid

This comment has been minimized.

Show comment Hide comment
@aschmid

aschmid Sep 28, 2016

this because you didn't set your environment variable for CLOUDAMQP_URL
on line 6:amqp.connect(process.env.CLOUDAMQP_URL + "?heartbeat=60", function(err, conn) ...

aschmid commented Sep 28, 2016

this because you didn't set your environment variable for CLOUDAMQP_URL
on line 6:amqp.connect(process.env.CLOUDAMQP_URL + "?heartbeat=60", function(err, conn) ...

@huydq5000

This comment has been minimized.

Show comment Hide comment
@huydq5000

huydq5000 Mar 8, 2017

Thank you for your code logic. In fact, when I try to restart RabbitMQ, because it took more than 1s to connect to the MQ, 2 or more connections were created. So I just want to add a bit of code to make it perfect.

var isConnecting = false;
function start() {
    if (isConnecting) return;
    isConnecting = true;**
    amqp.connect(process.env.CLOUDAMQP_URL + "?heartbeat=60", function(err, conn) {
        isConnecting = false;**
...
}

huydq5000 commented Mar 8, 2017

Thank you for your code logic. In fact, when I try to restart RabbitMQ, because it took more than 1s to connect to the MQ, 2 or more connections were created. So I just want to add a bit of code to make it perfect.

var isConnecting = false;
function start() {
    if (isConnecting) return;
    isConnecting = true;**
    amqp.connect(process.env.CLOUDAMQP_URL + "?heartbeat=60", function(err, conn) {
        isConnecting = false;**
...
}
@varunnayal

This comment has been minimized.

Show comment Hide comment
@varunnayal

varunnayal Jul 13, 2017

I tried running above snippet by dividing them in two separate files in worker.js and producer.js.
Notable changes I've done in worker.js is to change cb(true) to setTimeout(function() { cb(true) }, 12000), to mimic a some time consuming processing.
Also producer.js changed to publish just one message at a time.
Now problem comes when rabbitmq server restarts after worker receives the message and before worker sends the ack for the message it reconnects to the server. After reconnecting worker receives the same message from server while current message is being prepared to send ack.

Here is the snippet of test files:
worker.js

/**
* worker.js
*/
var amqp = require('amqplib/callback_api');
process.env.CLOUDAMQP_URL = 'amqp://localhost';

// if the connection is closed or fails to be established at all, we will reconnect
var amqpConn = null;
function start() {
  amqp.connect(process.env.CLOUDAMQP_URL + "?heartbeat=60", function(err, conn) {
    if (err) {
      console.error("[AMQP]", err.message);
      return setTimeout(start, 7000);
    }
    conn.on("error", function(err) {
      if (err.message !== "Connection closing") {
        console.error("[AMQP] conn error", err.message);
      }
    });
    conn.on("close", function() {
      console.error("[AMQP] reconnecting");
      return setTimeout(start, 7000);
    });

    console.log("[AMQP] connected");
    amqpConn = conn;

    whenConnected();
  });
}

function whenConnected() {
  startWorker();
}

// A worker that acks messages only if processed succesfully
function startWorker() {
  amqpConn.createChannel(function(err, ch) {
    if (closeOnErr(err)) return;
    ch.on("error", function(err) {
      console.error("[AMQP] channel error", err.message);
    });
    ch.on("close", function() {
      console.log("[AMQP] channel closed");
    });
    ch.prefetch(10);
    ch.assertQueue("jobs", { durable: true }, function(err, _ok) {
      if (closeOnErr(err)) return;
      ch.consume("jobs", processMsg, { noAck: false });
      console.log("Worker is started");
    });

    function processMsg(msg) {
      var incomingDate = (new Date()).toISOString();
      console.log("Msg [deliveryTag=" + msg.fields.deliveryTag + "] arrived at " + incomingDate);
      work(msg, function(ok) {
        console.log("Sending Ack for msg at time " + incomingDate);
        try {
          if (ok)
            ch.ack(msg);
          else
            ch.reject(msg, true);
        } catch (e) {
          closeOnErr(e);
        }
      });
    }
  });
}

function work(msg, cb) {
  console.log("Got msg", msg.content.toString());
  setTimeout(() => cb(true), process.env.WORK_WAIT_TIME || 12000);
}

function closeOnErr(err) {
  if (!err) return false;
  console.error("[AMQP] error", err);
  amqpConn.close();
  return true;
}

start();

producer.js

/**
* producer.js
**/
var amqp = require('amqplib/callback_api');
process.env.CLOUDAMQP_URL = 'amqp://localhost';

// if the connection is closed or fails to be established at all, we will reconnect
var amqpConn = null;
function start() {
  amqp.connect(process.env.CLOUDAMQP_URL + "?heartbeat=60", function(err, conn) {
    if (err) {
      console.error("[AMQP]", err.message);
      return setTimeout(start, 7000);
    }
    conn.on("error", function(err) {
      if (err.message !== "Connection closing") {
        console.error("[AMQP] conn error", err.message);
      }
    });
    conn.on("close", function() {
      console.error("[AMQP] reconnecting");
      return setTimeout(start, 7000);
    });

    console.log("[AMQP] connected");
    amqpConn = conn;

    whenConnected();
  });
}

function whenConnected() {
  startPublisher();
}

var pubChannel = null;
var offlinePubQueue = [];
function startPublisher() {
  amqpConn.createConfirmChannel(function(err, ch) {
    if (closeOnErr(err)) return;
    ch.on("error", function(err) {
      console.error("[AMQP] channel error", err.message);
    });
    ch.on("close", function() {
      console.log("[AMQP] channel closed");
    });

    pubChannel = ch;
    while (true) {
      var m = offlinePubQueue.shift();
			console.log('M = ', m);
      if (!m) break;
      publish(m[0], m[1], m[2]);
    }
  });
}

// method to publish a message, will queue messages internally if the connection is down and resend later
function publish(exchange, routingKey, content) {
  try {
    pubChannel.publish(exchange, routingKey, content, { persistent: true },
                       function(err, ok) {
                         if (err) {
                           console.error("[AMQP] publish", err);
                           offlinePubQueue.push([exchange, routingKey, content]);
                           pubChannel.connection.close();
                         }
                       });
  } catch (e) {
    console.error("[AMQP] publish", e.message);
    offlinePubQueue.push([exchange, routingKey, content]);
  }
}

function closeOnErr(err) {
  if (!err) return false;
  console.error("[AMQP] error", err);
  amqpConn.close();
  return true;
}

setTimeout(function() {
  publish("", "jobs", new Buffer("work work work"));
}, 3000);

start();

Here is the log received from worker.js:

[AMQP] connected
Worker is started
### Message Received ###
Msg [deliveryTag=1] arrived at 2017-07-13T16:17:37.331Z
Got msg work work work
### RABBITMQ server restarted before we could send ack ###
[AMQP] channel closed
[AMQP] reconnecting

### Server Connected ###
[AMQP] connected
Worker is started

### Same message received again ###
Msg [deliveryTag=1] arrived at 2017-07-13T16:17:46.285Z
Got msg work work work

### Error while sending ack now (For message received before reconnecting ###
Sending Ack for msg at time 2017-07-13T16:17:37.331Z
[AMQP] error { IllegalOperationError: Channel closed
    at Channel.<anonymous> (/test/node_modules/amqplib/lib/channel.js:149:11)
    at Channel.ack (/test/node_modules/amqplib/lib/callback_model.js:234:8)
    at /test/worker.js:57:16
    at Timeout.setTimeout (/test/worker.js:70:20)
    at ontimeout (timers.js:380:14)
    at tryOnTimeout (timers.js:244:5)
    at Timer.listOnTimeout (timers.js:214:5)
  message: 'Channel closed',
  stack: 'IllegalOperationError: Channel closed\n    at Channel.<anonymous> (/test/node_modules/amqplib/lib/channel.js:149:11)\n    at Channel.ack (/test/node_modules/amqplib/lib/callback_model.js:234:8)\n    at /test/worker.js:57:16\n    at Timeout.setTimeout (/test/worker.js:70:20)\n    at ontimeout (timers.js:380:14)\n    at tryOnTimeout (timers.js:244:5)\n    at Timer.listOnTimeout (timers.js:214:5)',
  stackAtStateChange: 'Stack capture: Connection closed: 320 (CONNECTION-FORCED) with message "CONNECTION_FORCED - broker forced connection closure with reason \'shutdown\'"\n    at Object.accept (/test/node_modules/amqplib/lib/connection.js:89:15)\n    at Connection.mainAccept [as accept] (/test/node_modules/amqplib/lib/connection.js:63:33)\n    at Socket.go (/test/node_modules/amqplib/lib/connection.js:476:48)\n    at emitNone (events.js:86:13)\n    at Socket.emit (events.js:185:7)\n    at emitReadable_ (_stream_readable.js:432:10)\n    at emitReadable (_stream_readable.js:426:7)\n    at readableAddChunk (_stream_readable.js:187:13)\n    at Socket.Readable.push (_stream_readable.js:134:10)\n    at TCP.onread (net.js:551:20)' }
[AMQP] channel closed
[AMQP] reconnecting

Any ideas on how can we handle this scenario?

I tried running above snippet by dividing them in two separate files in worker.js and producer.js.
Notable changes I've done in worker.js is to change cb(true) to setTimeout(function() { cb(true) }, 12000), to mimic a some time consuming processing.
Also producer.js changed to publish just one message at a time.
Now problem comes when rabbitmq server restarts after worker receives the message and before worker sends the ack for the message it reconnects to the server. After reconnecting worker receives the same message from server while current message is being prepared to send ack.

Here is the snippet of test files:
worker.js

/**
* worker.js
*/
var amqp = require('amqplib/callback_api');
process.env.CLOUDAMQP_URL = 'amqp://localhost';

// if the connection is closed or fails to be established at all, we will reconnect
var amqpConn = null;
function start() {
  amqp.connect(process.env.CLOUDAMQP_URL + "?heartbeat=60", function(err, conn) {
    if (err) {
      console.error("[AMQP]", err.message);
      return setTimeout(start, 7000);
    }
    conn.on("error", function(err) {
      if (err.message !== "Connection closing") {
        console.error("[AMQP] conn error", err.message);
      }
    });
    conn.on("close", function() {
      console.error("[AMQP] reconnecting");
      return setTimeout(start, 7000);
    });

    console.log("[AMQP] connected");
    amqpConn = conn;

    whenConnected();
  });
}

function whenConnected() {
  startWorker();
}

// A worker that acks messages only if processed succesfully
function startWorker() {
  amqpConn.createChannel(function(err, ch) {
    if (closeOnErr(err)) return;
    ch.on("error", function(err) {
      console.error("[AMQP] channel error", err.message);
    });
    ch.on("close", function() {
      console.log("[AMQP] channel closed");
    });
    ch.prefetch(10);
    ch.assertQueue("jobs", { durable: true }, function(err, _ok) {
      if (closeOnErr(err)) return;
      ch.consume("jobs", processMsg, { noAck: false });
      console.log("Worker is started");
    });

    function processMsg(msg) {
      var incomingDate = (new Date()).toISOString();
      console.log("Msg [deliveryTag=" + msg.fields.deliveryTag + "] arrived at " + incomingDate);
      work(msg, function(ok) {
        console.log("Sending Ack for msg at time " + incomingDate);
        try {
          if (ok)
            ch.ack(msg);
          else
            ch.reject(msg, true);
        } catch (e) {
          closeOnErr(e);
        }
      });
    }
  });
}

function work(msg, cb) {
  console.log("Got msg", msg.content.toString());
  setTimeout(() => cb(true), process.env.WORK_WAIT_TIME || 12000);
}

function closeOnErr(err) {
  if (!err) return false;
  console.error("[AMQP] error", err);
  amqpConn.close();
  return true;
}

start();

producer.js

/**
* producer.js
**/
var amqp = require('amqplib/callback_api');
process.env.CLOUDAMQP_URL = 'amqp://localhost';

// if the connection is closed or fails to be established at all, we will reconnect
var amqpConn = null;
function start() {
  amqp.connect(process.env.CLOUDAMQP_URL + "?heartbeat=60", function(err, conn) {
    if (err) {
      console.error("[AMQP]", err.message);
      return setTimeout(start, 7000);
    }
    conn.on("error", function(err) {
      if (err.message !== "Connection closing") {
        console.error("[AMQP] conn error", err.message);
      }
    });
    conn.on("close", function() {
      console.error("[AMQP] reconnecting");
      return setTimeout(start, 7000);
    });

    console.log("[AMQP] connected");
    amqpConn = conn;

    whenConnected();
  });
}

function whenConnected() {
  startPublisher();
}

var pubChannel = null;
var offlinePubQueue = [];
function startPublisher() {
  amqpConn.createConfirmChannel(function(err, ch) {
    if (closeOnErr(err)) return;
    ch.on("error", function(err) {
      console.error("[AMQP] channel error", err.message);
    });
    ch.on("close", function() {
      console.log("[AMQP] channel closed");
    });

    pubChannel = ch;
    while (true) {
      var m = offlinePubQueue.shift();
			console.log('M = ', m);
      if (!m) break;
      publish(m[0], m[1], m[2]);
    }
  });
}

// method to publish a message, will queue messages internally if the connection is down and resend later
function publish(exchange, routingKey, content) {
  try {
    pubChannel.publish(exchange, routingKey, content, { persistent: true },
                       function(err, ok) {
                         if (err) {
                           console.error("[AMQP] publish", err);
                           offlinePubQueue.push([exchange, routingKey, content]);
                           pubChannel.connection.close();
                         }
                       });
  } catch (e) {
    console.error("[AMQP] publish", e.message);
    offlinePubQueue.push([exchange, routingKey, content]);
  }
}

function closeOnErr(err) {
  if (!err) return false;
  console.error("[AMQP] error", err);
  amqpConn.close();
  return true;
}

setTimeout(function() {
  publish("", "jobs", new Buffer("work work work"));
}, 3000);

start();

Here is the log received from worker.js:

[AMQP] connected
Worker is started
### Message Received ###
Msg [deliveryTag=1] arrived at 2017-07-13T16:17:37.331Z
Got msg work work work
### RABBITMQ server restarted before we could send ack ###
[AMQP] channel closed
[AMQP] reconnecting

### Server Connected ###
[AMQP] connected
Worker is started

### Same message received again ###
Msg [deliveryTag=1] arrived at 2017-07-13T16:17:46.285Z
Got msg work work work

### Error while sending ack now (For message received before reconnecting ###
Sending Ack for msg at time 2017-07-13T16:17:37.331Z
[AMQP] error { IllegalOperationError: Channel closed
    at Channel.<anonymous> (/test/node_modules/amqplib/lib/channel.js:149:11)
    at Channel.ack (/test/node_modules/amqplib/lib/callback_model.js:234:8)
    at /test/worker.js:57:16
    at Timeout.setTimeout (/test/worker.js:70:20)
    at ontimeout (timers.js:380:14)
    at tryOnTimeout (timers.js:244:5)
    at Timer.listOnTimeout (timers.js:214:5)
  message: 'Channel closed',
  stack: 'IllegalOperationError: Channel closed\n    at Channel.<anonymous> (/test/node_modules/amqplib/lib/channel.js:149:11)\n    at Channel.ack (/test/node_modules/amqplib/lib/callback_model.js:234:8)\n    at /test/worker.js:57:16\n    at Timeout.setTimeout (/test/worker.js:70:20)\n    at ontimeout (timers.js:380:14)\n    at tryOnTimeout (timers.js:244:5)\n    at Timer.listOnTimeout (timers.js:214:5)',
  stackAtStateChange: 'Stack capture: Connection closed: 320 (CONNECTION-FORCED) with message "CONNECTION_FORCED - broker forced connection closure with reason \'shutdown\'"\n    at Object.accept (/test/node_modules/amqplib/lib/connection.js:89:15)\n    at Connection.mainAccept [as accept] (/test/node_modules/amqplib/lib/connection.js:63:33)\n    at Socket.go (/test/node_modules/amqplib/lib/connection.js:476:48)\n    at emitNone (events.js:86:13)\n    at Socket.emit (events.js:185:7)\n    at emitReadable_ (_stream_readable.js:432:10)\n    at emitReadable (_stream_readable.js:426:7)\n    at readableAddChunk (_stream_readable.js:187:13)\n    at Socket.Readable.push (_stream_readable.js:134:10)\n    at TCP.onread (net.js:551:20)' }
[AMQP] channel closed
[AMQP] reconnecting

Any ideas on how can we handle this scenario?

@srkimir

This comment has been minimized.

Show comment Hide comment
@srkimir

srkimir Oct 24, 2017

What if error or close are emitted on channel(not on connection) object from worker perspective? No need to reconnect?

srkimir commented Oct 24, 2017

What if error or close are emitted on channel(not on connection) object from worker perspective? No need to reconnect?

@kiwenlau

This comment has been minimized.

Show comment Hide comment
@kiwenlau

kiwenlau Apr 13, 2018

Following is my code with Async/Await:

const amqp = require("amqplib");

var connection;

async function connectRabbitMQ()
{
    try
    {
        connection = await amqp.connect("amqp://localhost");
        console.log("connect to RabbitMQ success!");

        // How to reproduce "error" event?
        connection.on("error", function(err)
        {
            logger.error(err);
            setTimeout(connectRabbitMQ, 10000);
        });

        connection.on("close", function()
        {
            logger.error("connection to RabbitQM closed!");
            setTimeout(connectRabbitMQ, 10000);
        });

    }
    catch (err)
    {
        console.error(err);
        setTimeout(connectRabbitMQ, 10000);
    }


}

connectRabbitMQ();

By stopping RabbitMQ container, I can reproduce "close" event, but how can I reproduce "error" event on connection?

kiwenlau commented Apr 13, 2018

Following is my code with Async/Await:

const amqp = require("amqplib");

var connection;

async function connectRabbitMQ()
{
    try
    {
        connection = await amqp.connect("amqp://localhost");
        console.log("connect to RabbitMQ success!");

        // How to reproduce "error" event?
        connection.on("error", function(err)
        {
            logger.error(err);
            setTimeout(connectRabbitMQ, 10000);
        });

        connection.on("close", function()
        {
            logger.error("connection to RabbitQM closed!");
            setTimeout(connectRabbitMQ, 10000);
        });

    }
    catch (err)
    {
        console.error(err);
        setTimeout(connectRabbitMQ, 10000);
    }


}

connectRabbitMQ();

By stopping RabbitMQ container, I can reproduce "close" event, but how can I reproduce "error" event on connection?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment