Skip to content

Instantly share code, notes, and snippets.

@GavinDmello
Last active May 13, 2016 08:20
Show Gist options
  • Save GavinDmello/d614940e0f89383f3f471b90ec121360 to your computer and use it in GitHub Desktop.
Save GavinDmello/d614940e0f89383f3f471b90ec121360 to your computer and use it in GitHub Desktop.
This is the test client that we are trying to run.
//Sender client
/*
Sample command to run this script:
node --expose-gc filename.js 1 2 20000
where
process.argv[2] = lower limit of CPU (first worker index, inclusive),
process.argv[3] = upper limit of CPU (worker index bound, exclusive),
process.argv[4] = number of messages each worker sends.
*/
// Command-line configuration shared by the master and the worker branch.
var cluster = require('cluster');
// Worker indices run from lowerLimitCPUS (inclusive) to higherLimit (exclusive).
var lowerLimitCPUS = Number(process.argv[2]);
var higherLimit = Number(process.argv[3]);
var CPUS = higherLimit;
// Number of messages each worker publishes. Parsed with Number(), like the two
// limits above, so comparisons against the numeric message counter do not
// depend on string/number coercion.
var COUNT = Number(process.argv[4]);
// Coarse latency histogram kept at module scope. The master branch does not
// read it; it is kept so that anything relying on the global keeps working.
var latency = {
    '0-2': 0,
    '3-5': 0,
    '6-20': 0,
    '21-50': 0,
    '51-100': 0,
    '101-200': 0,
    '201+': 0
};
if (cluster.isMaster) {
// ---- Master-side state ------------------------------------------------------
// Histograms reported by workers (every IPC message that is not a "done" notice).
var data = [];
// Total number of samples across all buckets; filled in when the summary prints.
var total_messages = 0;
// Aggregated histogram. Holds raw counts while summing and percentages once the
// summary has been printed.
var total_latency = {
    '0-2': 0,
    '3-5': 0,
    '6-20': 0,
    '21-50': 0,
    '51-100': 0,
    '101-200': 0,
    '201-300': 0,
    '301-400': 0,
    '401-500': 0,
    '501-600': 0,
    '601-700': 0,
    '701-800': 0,
    '801-900': 0,
    '901-1000': 0,
    '1001+': 0
};
// Forked workers, indexed by worker index.
var workers = [];
// How many workers have sent a "done" notice so far.
var workerCount = 0;

/*
 * Handles one IPC message coming from a worker.
 *
 * - { done: ... }  counts the worker as ready; once every forked worker is
 *                  ready, all of them are told to start publishing.
 * - anything else  is treated as a latency histogram; once one histogram per
 *                  forked worker has arrived, the percentage summary is printed.
 *
 * Node.js >= 6 emits this event as (worker, message, handle); older releases
 * emitted (message, handle). Both shapes are accepted.
 */
cluster.on('message', function(worker, message) {
    // Two arguments means the legacy (message, handle) form.
    var msg = arguments.length === 2 ? worker : message;
    // Workers are forked for indices [lowerLimitCPUS, higherLimit).
    var expectedWorkers = higherLimit - lowerLimitCPUS;
    var i;
    var key;
    var sum;

    if (!msg) {
        return;
    }

    if (msg.done) {
        workerCount++;
        if (workerCount === expectedWorkers) {
            for (i = lowerLimitCPUS; i < higherLimit; i++) {
                workers[i].send({ start: "start" });
            }
        }
        return;
    }

    data.push(msg);
    console.log(msg);
    console.log("============ CHILD -> " + data.length);

    // Wait for one histogram per forked worker. This uses the same worker count
    // as the "start" trigger above rather than the raw upper limit, which only
    // matched when the lower limit was 0.
    if (data.length !== expectedWorkers) {
        return;
    }

    // Sum every bucket across the reports; a missing bucket counts as zero.
    total_messages = 0;
    for (key in total_latency) {
        if (total_latency.hasOwnProperty(key)) {
            sum = 0;
            for (i = 0; i < data.length; i++) {
                sum += Number(data[i][key]) || 0;
            }
            total_latency[key] = sum;
            total_messages += sum;
        }
    }

    console.log("============ ");
    console.log("============ TOTAL");
    // Convert counts to percentages; report 0 instead of NaN when nothing was counted.
    for (key in total_latency) {
        if (total_latency.hasOwnProperty(key)) {
            total_latency[key] = total_messages > 0 ?
                ((total_latency[key] * 100) / total_messages) :
                0;
        }
    }
    console.log(total_latency); // In percentage
});
// Spawn one worker for every index in [lowerLimitCPUS, higherLimit) and hand
// each worker its index through a "connect" message.
var i = lowerLimitCPUS;
while (i < higherLimit) {
    console.log('forking');
    var spawned = cluster.fork();
    workers[i] = spawned;
    spawned.send({ connect: i });
    i++;
}
} else {
var mqttClient;
var id;
var i = 1;
process.on('message', function(msg) {
var hosts = {
SG: "52.33.239.94",
US: "52.76.233.197",
LOCAL: '127.0.0.1',
TEST: "test.mosquitto.org"
};
var currentTime = Date.now();
var host = hosts[process.env.HOST] || hosts.LOCAL;
var mqtt = require('mqtt');
var latency = {
'0-2': 0,
'3-5': 0,
'6-20': 0,
'21-50': 0,
'51-100': 0,
'101-200': 0,
'201-300': 0,
'301-400': 0,
'401-500': 0,
'501-600': 0,
'601-700': 0,
'701-800': 0,
'801-900': 0,
'901-1000': 0,
'1001+': 0
};
if (msg.start) {
if (COUNT == '1') {
console.log("here");
send('end');
} else {
send();
}
} else if (msg.connect) {
id = msg.connect;
mqttClient = mqtt.connect({
port: "your port here",
host: "your host here",
clean: true,
clientId: (9000000000 + msg.connect).toString(),
password: msg.connect + '-' + msg.connect
});
mqttClient.setMaxListeners(0);
mqttClient.on('connect', function(connack) {
var diff = Date.now() - currentTime;
if (diff >= 0 && diff <= 2) {
latency['0-2']++;
} else if (diff >= 3 && diff <= 5) {
latency['3-5']++;
} else if (diff >= 6 && diff <= 20) {
latency['6-20']++;
} else if (diff >= 21 && diff <= 50) {
latency['21-50']++;
} else if (diff >= 51 && diff <= 100) {
latency['51-100']++;
} else if (diff >= 100 && diff <= 200) {
latency['101-200']++;
} else if (diff >= 201 && diff <= 300) {
latency['201-300']++;
} else if (diff >= 301 && diff <= 400) {
latency['301-400']++;
} else if (diff >= 401 && diff <= 500) {
latency['401-500']++;
} else if (diff >= 501 && diff <= 600) {
latency['501-600']++;
} else if (diff >= 601 && diff <= 700) {
latency['601-700']++;
} else if (diff >= 701 && diff <= 800) {
latency['701-800']++;
} else if (diff >= 801 && diff <= 900) {
latency['801-900']++;
} else if (diff >= 901 && diff <= 1000) {
latency['901-1000']++;
} else if (diff >= 1001) {
latency['1001+']++;
}
process.send({ done: "done" });
});
}
function send(end) {
var payload = JSON.stringify({
messageType: 2,
count: i,
timestamp: Date.now(),
receiverId: (8000000000 + id).toString(),
message: 'dummy message ' + i,
end: end || false
});
mqttClient.publish("/publish-private", payload, { qos: 1, retain: false }, function(cb, cb2) {});
if (end) {
return;
}
console.log(i, COUNT, "asdjfhsadfsadfsadf");
if (++i <= COUNT) {
if (i == COUNT) {
console.log("came in here")
send('end');
} else {
console.log("dsfdsafasdf");
// setTimeout(send, 5000);
process.nextTick(send);
}
}
}
mqttClient.on('message', function(topic, message) {
mqttClient.end();
});
});
}
//Receiver client
/*
Sample command to run this script:
node --expose-gc filename.js 1 2
where
process.argv[2] = lower limit of CPU (first worker index, inclusive),
process.argv[3] = upper limit of CPU (worker index bound, exclusive).
*/
// Force a garbage collection before the benchmark starts. `global.gc` only
// exists when node is started with --expose-gc, so skip the call otherwise
// instead of crashing with a TypeError.
if (typeof global.gc === 'function') {
    global.gc();
}
var fs = require('fs');
var cluster = require('cluster');
var Stats = require('fast-stats').Stats;
// Worker indices run from lowerLimitCPUS (inclusive) to higherLimit (exclusive).
var lowerLimitCPUS = Number(process.argv[2]);
var higherLimit = Number(process.argv[3]);
var CPUS = higherLimit;
// Base value of the receiver client ids.
var clientId = 8000000000;
// Forked workers, indexed by worker index.
var workers = [];
// Statistics object built from statData when a report is printed.
var stats;
// One totalDelay value per worker report.
var statData = [];
if (cluster.isMaster) {
// ---- Master-side state ------------------------------------------------------
// Every histogram received from the workers, in arrival order.
var data = [];
// Logged in the report. NOTE(review): nothing in this file ever updates it.
var totsDelay = 0;
// Total number of samples in the most recent report.
var total_messages = 0;
// Aggregated histogram; holds percentages once a report has been printed.
var total_latency = {
    '0-2': 0,
    '3-5': 0,
    '6-20': 0,
    '21-50': 0,
    '51-100': 0,
    '101-200': 0,
    '201-300': 0,
    '301-400': 0,
    '401-500': 0,
    '501-600': 0,
    '601-700': 0,
    '701-800': 0,
    '801-900': 0,
    '901-1000': 0,
    '1001+': 0
};
var startTime;
var first = true;

/*
 * Collects one latency histogram from a worker and, each time a full round
 * (one report per forked worker) has arrived, prints the aggregated statistics
 * and appends them to result.txt.
 *
 * Node.js >= 6 emits this event as (worker, message, handle); older releases
 * emitted (message, handle). Both shapes are accepted.
 */
cluster.on('message', function(worker, message) {
    // Two arguments means the legacy (message, handle) form.
    var msg = arguments.length === 2 ? worker : message;
    // Workers are forked for indices [lowerLimitCPUS, higherLimit).
    var workerTotal = higherLimit - lowerLimitCPUS;
    var latestRound;
    var key;
    var i;
    var sum;

    if (!msg) {
        return;
    }

    statData.push(msg.totalDelay);
    data.push(msg);
    console.log("================>", msg);
    console.log("============ CHILD -> " + data.length);
    console.log(data.length, workerTotal);
    if (data.length % workerTotal !== 0) {
        return;
    }

    // Start every report from fresh sums. The previous report left percentages
    // in total_latency, and adding raw counts on top of those corrupts later
    // rounds. Only the latest round is summed, which is correct as long as each
    // worker's histogram is cumulative across runs.
    // NOTE(review): confirm against the worker code that counters are never reset.
    latestRound = data.slice(data.length - workerTotal);
    total_messages = 0;
    for (key in total_latency) {
        if (total_latency.hasOwnProperty(key)) {
            sum = 0;
            for (i = 0; i < latestRound.length; i++) {
                sum += Number(latestRound[i][key]) || 0;
            }
            total_latency[key] = sum;
            total_messages += sum;
        }
    }

    stats = new Stats().push(statData);
    console.log(statData);
    var range = stats.range();
    console.log("95th percentile ", stats.percentile(95));
    console.log("Minimum latency", range[0]);
    console.log("Maximum latency", range[1]);
    console.log("============ ");
    console.log("total_messages", total_messages);
    console.log(totsDelay);
    // Throughput is based on the totalDelay of the report that completed the round.
    console.log("Speed", total_messages / (msg.totalDelay / 1000));
    console.log("============ TOTAL");
    // Convert counts to percentages; report 0 instead of NaN when nothing was counted.
    for (key in total_latency) {
        if (total_latency.hasOwnProperty(key)) {
            total_latency[key] = total_messages > 0 ?
                ((total_latency[key] * 100) / total_messages) :
                0;
        }
    }
    console.log(total_latency);
    var date = new Date();
    fs.appendFileSync('result.txt', "===================================" + "\n" + date.toString() + "\n" + "Latency:" + JSON.stringify(total_latency) + "\n" + "No of clients :" + CPUS + "\n" + "Total no of messages :" + total_messages + "\n" + "====================================");
});
// Fork one worker for every index in [lowerLimitCPUS, higherLimit) and tell
// each worker its id over IPC.
var i = lowerLimitCPUS;
while (i < higherLimit) {
    var child = cluster.fork();
    workers[i] = child;
    child.send({ id: i });
    i += 1;
}
} else {
process.on('message', function(msg) {
if (msg.id) {
var hosts = {
LOCAL: '127.0.0.1',
TEST: "test.mosquitto.org"
};
var host = hosts[process.env.HOST] || hosts.LOCAL;
var mqtt = require('mqtt');
var mqttClient;
mqttClient = mqtt.connect({
port: "your port here",
host: "Your host here",
clean: false,
clientId: (8000000000 + msg.id).toString(),
password: msg.id + '-' + msg.id
});
mqttClient.setMaxListeners(0);
var count = 0
var testSeq = function() {
var _seq = {};
for (var i = 0; i < 10; i++) {
_seq[i] = true;
}
return _seq;
}();
var seq = 0;
var startTime = 0;
var latency = {
'0-2': 0,
'3-5': 0,
'6-20': 0,
'21-50': 0,
'51-100': 0,
'101-200': 0,
'201-300': 0,
'301-400': 0,
'401-500': 0,
'501-600': 0,
'601-700': 0,
'701-800': 0,
'801-900': 0,
'901-1000': 0,
'1001+': 0
};
var startTime, first = true;
mqttClient.on('message', function(topic, message, packet) {
if (first) {
startTime = message.timestamp;
first = false;
}
console.log("message received", message.toString())
message = JSON.parse(message);
if (message.end) {
latency.totalDelay = Date.now() - startTime;
}
console.log(Date.now(), message.timestamp);
var delay = Date.now() - message.timestamp;
console.log('delay: ', delay, ' ', message.count)
if (seq == 0) {
startTime = message.timestamp;
seq++;
}
for (key in latency) {
if (delay >= key.split('-')[0] && delay <= key.split('-')[1]) {
latency[key]++;
break;
}
if (key.split('+').length > 1) {
if (delay >= key.split('+')[0]) {
latency[key]++;
break;
}
}
}
if (message.end) {
end();
}
});
function end() {
process.send(latency);
startTime = 0;
seq = 0;
}
}
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment