Skip to content

Instantly share code, notes, and snippets.

@aheckmann
Last active December 30, 2015 02:19
Show Gist options
  • Save aheckmann/7762370 to your computer and use it in GitHub Desktop.
Save aheckmann/7762370 to your computer and use it in GitHub Desktop.
Bottleneck in amqp.js when publishing large numbers of messages using publisher confirms. See README.md.

Run this with: time DEBUG=amqp:* COUNT=30000 PREFETCH=0 BUCKET_SIZE=20000 node index.js

before applying the fix:

real  1m22.216s
user  1m10.973s
sys 0m16.106s

after:

real  0m25.034s
user  0m14.082s
sys 0m15.153s
var amqp = require('amqp');
/**
 * Reads an integer setting from the environment.
 *
 * @param {string} name - environment variable to read
 * @param {number} fallback - value used when the variable is unset, not an
 *   integer, or smaller than `min`
 * @param {number} min - smallest value accepted from the environment
 * @returns {number} the parsed integer, or `fallback`
 */
function envInt (name, fallback, min) {
  var parsed = parseInt(process.env[name], 10);
  if (isNaN(parsed) || parsed < min) return fallback;
  return parsed;
}

// Passed to the subscription as `prefetchCount`. An explicit PREFETCH=0 is
// honoured: the previous `parseInt(...) || 1` silently replaced 0 with 1.
var PREFETCH = envInt('PREFETCH', 1, 0);
// The consumer stops once it receives the message 'testing <COUNT>'.
var COUNT = envInt('COUNT', 30000, 0);
var exchangeName = 'helloo';
var exchangeOpts = {
    type: 'direct'
  , passive: false
  , durable: true
  , autoDelete: false
  , confirm: true
};
/**
 * Connects to the broker, binds `hello-queue` to the exchange under the
 * 'hola' routing key and consumes messages until the one whose `msg` field
 * is 'testing <COUNT>' arrives, at which point the connection is closed.
 *
 * When run as a forked child, `{ ready: true }` is sent to the parent only
 * after the binding has been confirmed and the subscription requested.
 * Signalling earlier would let a producer publish to the direct exchange
 * before the binding exists, and those messages would not reach the queue.
 */
function consume () {
  var debug = require('debug')('amqp:consume');
  var finalBody = 'testing ' + COUNT;

  debug('connecting to rabbit');
  var conn = amqp.createConnection();

  // Trace connection lifecycle events.
  'connect open error'.split(' ').forEach(function (evt) {
    conn.on(evt, function () {
      debug(evt, arguments);
    });
  });

  conn.once('ready', function () {
    debug('ready');

    conn.exchange(exchangeName, exchangeOpts, function (ex) {
      conn.queue('hello-queue', function (q) {
        q.on('queueBindOk', function () {
          debug('queueBindOk');

          var opts = {};
          opts.ack = true;
          opts.prefetchCount = PREFETCH;

          var i = 0;
          q.subscribe(opts, function (msg, headers, info) {
            ++i;

            if (finalBody === msg.msg) {
              debug('closing connection');
              conn.end();
              return;
            }

            if (0 === i % 1000) debug('got messages', i);
            // NOTE(review): per the node-amqp docs, shift() acknowledges the
            // last delivered message so the next one can be sent.
            q.shift();
          });

          // process.send only exists when this file runs as a forked child;
          // guard it so the consumer can also be started on its own.
          if ('function' === typeof process.send) {
            process.send({ ready: true });
          }
        });

        q.bind(ex, 'hola');
        debug('waiting for `queueBindOk`');
      });
    });
  });
}
consume();
var cp = require('child_process');
// Benchmark entry point: start the consumer first and launch the producer
// only once the consumer reports that it is ready.
//
// DEBUG, PREFETCH, COUNT and BUCKET_SIZE are not read here. Forked children
// inherit this process's environment by default, so consumer.js and
// producer.js read those settings themselves.
var consumer = cp.fork(__dirname + '/consumer.js');

consumer.on('message', function (msg) {
  // Anything other than `{ ready: true }` (including an empty message) is
  // unexpected: report it and stop the consumer instead of starting the
  // producer.
  if (!msg || true !== msg.ready) {
    console.log('unknown message', msg);
    consumer.kill();
    return;
  }

  cp.fork(__dirname + '/producer.js');
});

console.log('waiting for consumer ready');
{
"name": "rabbit",
"private": true,
"version": "0.0.0",
"description": "",
"main": "index.js",
"dependencies": {
"amqp": "~0.1.8",
"debug": "~0.7.4"
},
"devDependencies": {},
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "BSD-2-Clause"
}
var amqp = require('amqp');
/**
 * Reads an integer setting from the environment.
 *
 * @param {string} name - environment variable to read
 * @param {number} fallback - value used when the variable is unset, not an
 *   integer, or smaller than `min`
 * @param {number} min - smallest value accepted from the environment
 * @returns {number} the parsed integer, or `fallback`
 */
function envInt (name, fallback, min) {
  var parsed = parseInt(process.env[name], 10);
  if (isNaN(parsed) || parsed < min) return fallback;
  return parsed;
}

// Index of the last message to publish; messages 0..COUNT are sent.
var COUNT = envInt('COUNT', 30000, 0);
// Number of messages published per batch. It must be at least 1: with 0 or a
// non-numeric value the batch start never advances and the publish loop would
// reschedule itself forever.
var BUCKET_SIZE = envInt('BUCKET_SIZE', 30000, 1);
// Delay in milliseconds between batches.
var TIMEOUT = 1;
var exchangeOpts = {
    type: 'direct'
  , passive: false
  , durable: true
  , autoDelete: false
  , confirm: true // causes publish cbs to fire upon confirm
};
var exchangeName = 'helloo';
/**
 * Connects to the broker and publishes the messages 'testing 0' through
 * 'testing <COUNT>' to the exchange under the 'hola' routing key.
 *
 * Messages are sent in batches of BUCKET_SIZE, with the next batch scheduled
 * TIMEOUT milliseconds after the previous one was handed to the exchange.
 * Once every batch has been sent, the connection is closed after a further
 * 500 ms.
 */
function produce () {
  var log = require('debug')('amqp:produce');
  var connection = amqp.createConnection();

  // Trace connection lifecycle events.
  ['connect', 'open', 'error'].forEach(function (eventName) {
    connection.on(eventName, function () {
      log(eventName, arguments);
    });
  });

  connection.once('ready', function () {
    log('producer ready');

    connection.exchange(exchangeName, exchangeOpts, function (exchange) {
      // Messages are numbered 0..COUNT inclusive.
      var total = COUNT + 1;

      // Builds the publish callback for message `index`; only every
      // thousandth message logs its outcome.
      function confirmHandler (index) {
        return function (failed) {
          if (index % 1000 !== 0) return;
          log('exchange published sucessfully?', false === failed);
        };
      }

      function shutdown () {
        log('closing connection');
        connection.end();
      }

      // Publishes batch number `bucket`, then schedules the following one.
      function publishBucket (bucket) {
        log('iteration', bucket, total, BUCKET_SIZE);

        var first = bucket * BUCKET_SIZE;
        if (first >= total) {
          log('finishing up');
          setTimeout(shutdown, 500);
          return;
        }

        var limit = Math.min(total, first + BUCKET_SIZE);
        var index = first;
        while (index < limit) {
          var payload = { msg: 'testing ' + index };
          if (index % 1000 === 0) log('publishing', payload);
          exchange.publish('hola', payload, {}, confirmHandler(index));
          index += 1;
        }

        setTimeout(function () {
          publishBucket(bucket + 1);
        }, TIMEOUT);
      }

      publishBucket(0);
    });
  });
}
produce();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment