Skip to content

Instantly share code, notes, and snippets.

@boogie
Created May 21, 2017 12:25
Show Gist options
  • Save boogie/dbb8ca7e9f3168347d31cb0f082732ef to your computer and use it in GitHub Desktop.
Save boogie/dbb8ca7e9f3168347d31cb0f082732ef to your computer and use it in GitHub Desktop.
BETTER implementation with a "processing counter": RabbitMQ client closes the connection after 2 secs
#!/usr/bin/env node
'use strict';
const amqp = require('amqplib');
const sleep = async (ms) => new Promise(resolve => setTimeout(resolve, ms));
(async () => {
let processing = 0;
const conn = await amqp.connect('amqp://localhost:9520');
const ch = await conn.createChannel();
await ch.assertQueue('hello', { durable: false });
await ch.prefetch(1);
const consumer = await ch.consume('hello', async (msg) => {
processing++;
console.log(" [x] Received '%s'", msg.content.toString());
await sleep(500);
try {
ch.ack(msg);
processing--;
} catch (err) {
console.log('ch.ack(msg)', err.message);
}
});
const closeChannel = async () => {
if (processing) {
return setTimeout(closeChannel, 100);
}
await ch.close();
await conn.close();
console.log(' [*] Finished');
};
const closeAndCancelChannel = async () => {
await ch.cancel(consumer.consumerTag);
closeChannel();
};
process.once('SIGINT', closeAndCancelChannel);
setTimeout(closeAndCancelChannel, 2000);
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment