Skip to content

Instantly share code, notes, and snippets.

@gustavopaes
Created January 28, 2017 20:00
Show Gist options
  • Save gustavopaes/3116a4c4da684b989e0d3dc5f03563ce to your computer and use it in GitHub Desktop.
Save gustavopaes/3116a4c4da684b989e0d3dc5f03563ce to your computer and use it in GitHub Desktop.
simple rabbitmq benchmark
const amqplib = require('amqplib');
const connect = amqplib.connect('amqp://172.17.0.2:5672');
const QUEUE_NAME = 'benchmark';
const TOTAL_MESSAGES = process.argv[2] || 10;
let SEND_MESSAGES = 0;
let RECEIVED_MESSAGES = 0;
let sendingStartTime;
let consumerStartTime;
const log = () => {
process.stdout.write(`Sending messages: ${SEND_MESSAGES}/${TOTAL_MESSAGES} (${(new Date() - sendingStartTime) / 1000}s); Receiving messages: ${RECEIVED_MESSAGES}/${TOTAL_MESSAGES} (${(new Date() - consumerStartTime) / 1000}s)\r`);
}
const getSampleMessage = () => {
return JSON.stringify({
id: new Date().getTime(),
message: 'benchamrk message',
action: 'YOU_CAN_DO_IT'
});
};
const createChannel = conn => {
return conn.createConfirmChannel();
};
const assertQueue = channel => {
return channel.assertQueue(QUEUE_NAME).then(ok => {
return Promise.resolve(channel);
});
};
const consume = channel => {
if(consumerStartTime == undefined) {
consumerStartTime = new Date();
}
channel.prefetch(10);
channel.consume(QUEUE_NAME, message => {
RECEIVED_MESSAGES += 1;
log();
setTimeout(function() {
channel.ack(message);
}, Math.max(300, parseInt(Math.random() * 1000)));
if(RECEIVED_MESSAGES == TOTAL_MESSAGES) {
process.exit(0);
}
});
return Promise.resolve(channel);
};
const publish = channel => {
if(sendingStartTime == undefined) {
sendingStartTime = new Date();
const publishWorkers = 10;
console.log(`Starting ${publishWorkers} publish workers...`);
for(let i = 0; i < publishWorkers; i++) {
publish(channel);
}
return Promise.resolve(channel);
}
if(SEND_MESSAGES >= TOTAL_MESSAGES) {
return Promise.resolve(channel);
}
SEND_MESSAGES += 1;
channel.sendToQueue(QUEUE_NAME, new Buffer(getSampleMessage()), {}, () => {
log();
if(SEND_MESSAGES < TOTAL_MESSAGES) {
publish(channel);
}
});
};
console.log(`Starting test, publishing ${TOTAL_MESSAGES} messages...`);
connect
.then(createChannel)
.then(assertQueue)
.then(publish)
.then(consume)
.catch(err => {
console.error('Error during the test:');
console.error(`\t${err.message}`);
switch(err.code) {
case 'ECONNREFUSED':
console.error(`\tCertificate to up the rabbitmq service at localhost:5672`);
break;
}
console.log('');
process.exit(1);
});
{
"name": "mq-benchmark",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "Gustavo Paes <gustavo.paes@gmail.com>",
"license": "ISC",
"dependencies": {
"amqplib": "^0.5.1"
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment