Last active
May 13, 2016 08:20
-
-
Save GavinDmello/d614940e0f89383f3f471b90ec121360 to your computer and use it in GitHub Desktop.
This is the test client that we are trying to run.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Sender client
/*
 The sample command to run is this:
   node --expose-gc filename.js 1 2 20000
 So,
   process.argv[2] = lower limit of CPU (first worker id, inclusive).
   process.argv[3] = upper limit of CPU (last worker id, exclusive).
   process.argv[4] = number of messages each worker publishes.

 Flow: the master forks one worker per id and tells it to connect. Every
 worker answers { done: "done" } once its MQTT connection is up; when all
 workers have answered, the master broadcasts { start: "start" } and each
 worker publishes its messages, flagging the last one with `end`.
*/
var cluster = require('cluster');

var lowerLimitCPUS = Number(process.argv[2]);
var higherLimit = Number(process.argv[3]);
var COUNT = Number(process.argv[4]);

// One worker is forked per id in [lowerLimitCPUS, higherLimit).
var WORKER_TOTAL = higherLimit - lowerLimitCPUS;

// Histogram bucket labels and the inclusive upper bound (ms) of every bucket
// except the open-ended last one.
var LATENCY_BUCKETS = [
    '0-2', '3-5', '6-20', '21-50', '51-100', '101-200', '201-300', '301-400',
    '401-500', '501-600', '601-700', '701-800', '801-900', '901-1000', '1001+'
];
var BUCKET_UPPER_BOUNDS = [2, 5, 20, 50, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000];

// Workers are started with the same argv, so this check passes or fails
// identically in the master and in every worker.
if (!(WORKER_TOTAL > 0) || !(COUNT >= 1)) {
    console.error('Usage: node --expose-gc filename.js <lowerLimit> <upperLimit> <messageCount>');
    process.exit(1);
}

/** Returns a fresh histogram with every bucket set to 0. */
function createLatencyTable() {
    var table = {};
    LATENCY_BUCKETS.forEach(function(bucket) {
        table[bucket] = 0;
    });
    return table;
}

/**
 * Maps a duration in milliseconds to its bucket label.
 * Returns null for negative or non-numeric durations, which are not counted.
 */
function bucketFor(ms) {
    if (!(ms >= 0)) {
        return null;
    }
    for (var b = 0; b < BUCKET_UPPER_BOUNDS.length; b++) {
        if (ms <= BUCKET_UPPER_BOUNDS[b]) {
            return LATENCY_BUCKETS[b];
        }
    }
    return LATENCY_BUCKETS[LATENCY_BUCKETS.length - 1];
}

/**
 * Sums the per-worker histograms in `reports` and returns the share of each
 * bucket as a percentage of all counted messages (all zeros when nothing was
 * counted, to avoid dividing by zero).
 */
function toPercentages(reports) {
    var totals = createLatencyTable();
    var totalMessages = 0;
    reports.forEach(function(report) {
        LATENCY_BUCKETS.forEach(function(bucket) {
            var n = Number(report[bucket]) || 0;
            totals[bucket] += n;
            totalMessages += n;
        });
    });
    var percentages = createLatencyTable();
    if (totalMessages > 0) {
        LATENCY_BUCKETS.forEach(function(bucket) {
            percentages[bucket] = (totals[bucket] * 100) / totalMessages;
        });
    }
    return percentages;
}

/** Master process: forks the workers and coordinates connect/start. */
function runMaster() {
    var data = [];
    var workers = [];
    var workerCount = 0;

    cluster.on('message', function(worker, message) {
        // Node >= 6 emits (worker, message, handle); older versions emitted
        // only (message, handle).
        var msg = arguments.length === 2 ? worker : message;

        if (msg.done) {
            workerCount++;
            // Start publishing only once every worker is connected.
            if (workerCount === WORKER_TOTAL) {
                for (var w = lowerLimitCPUS; w < higherLimit; w++) {
                    workers[w].send({ start: "start" });
                }
            }
            return;
        }

        // Anything that is not a "done" notification is treated as a latency
        // histogram reported by a worker.
        // NOTE(review): the workers in this file only ever send "done", so
        // this path is currently not exercised.
        data.push(msg);
        console.log(msg);
        console.log("============ CHILD -> " + data.length)
        if (data.length === WORKER_TOTAL) {
            console.log("============ ");
            console.log("============ TOTAL");
            console.log(toPercentages(data)); // In percentage
        }
    });

    for (var i = lowerLimitCPUS; i < higherLimit; i++) {
        console.log('forking');
        workers[i] = cluster.fork();
        workers[i].send({ connect: i });
    }
}

/** Worker process: connects to the broker, then publishes COUNT messages. */
function runWorker() {
    var mqtt = require('mqtt');
    var hosts = {
        SG: "52.33.239.94",
        US: "52.76.233.197",
        LOCAL: '127.0.0.1',
        TEST: "test.mosquitto.org"
    };
    // NOTE(review): `host` is resolved from the HOST environment variable but
    // the connect options below still use placeholder values - fill them in
    // (or wire `host` in) before running.
    var host = hosts[process.env.HOST] || hosts.LOCAL;

    var mqttClient = null;
    var id;
    var sequence = 1;                    // number of the next message to publish
    var latency = createLatencyTable();  // connect-time histogram

    /**
     * Publishes one message. A truthy `end` marks it as the final message and
     * stops the chain; otherwise the next message is scheduled until COUNT
     * messages have been published.
     */
    function send(end) {
        var payload = JSON.stringify({
            messageType: 2,
            count: sequence,
            timestamp: Date.now(),
            receiverId: (8000000000 + id).toString(),
            message: 'dummy message ' + sequence,
            end: end || false
        });
        mqttClient.publish("/publish-private", payload, { qos: 1, retain: false }, function(err) {
            if (err) {
                console.error('publish failed:', err);
            }
        });
        if (end) {
            return;
        }
        console.log(sequence, COUNT, "asdjfhsadfsadfsadf");
        if (++sequence <= COUNT) {
            if (sequence === COUNT) {
                console.log("came in here")
                send('end');
            } else {
                console.log("dsfdsafasdf");
                // setImmediate (not process.nextTick) so socket I/O gets a
                // turn between publishes instead of the whole batch being
                // queued first.
                setImmediate(send);
            }
        }
    }

    /** Opens the MQTT connection for this worker and reports back when ready. */
    function connect(workerId) {
        var connectStartedAt = Date.now();
        id = workerId;
        mqttClient = mqtt.connect({
            port: "your port here",
            host: "your host here",
            clean: true,
            clientId: (9000000000 + workerId).toString(),
            password: workerId + '-' + workerId
        });
        mqttClient.setMaxListeners(0);
        mqttClient.on('connect', function(connack) {
            var bucket = bucketFor(Date.now() - connectStartedAt);
            if (bucket) {
                latency[bucket]++;
            }
            process.send({ done: "done" });
        });
        // Registered once per connection: any incoming message closes it.
        mqttClient.on('message', function(topic, message) {
            mqttClient.end();
        });
    }

    process.on('message', function(msg) {
        if (msg.start) {
            if (!mqttClient) {
                console.error('received "start" before "connect"; ignoring');
                return;
            }
            if (COUNT === 1) {
                console.log("here");
                send('end');
            } else {
                send();
            }
        } else if (typeof msg.connect === 'number') {
            // typeof check (not truthiness) so that worker id 0 is accepted.
            connect(msg.connect);
        }
    });
}

if (cluster.isMaster) {
    runMaster();
} else {
    runWorker();
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Receiver client
/*
 The sample command to run is this:
   node --expose-gc filename.js 1 2
 So,
   process.argv[2] = lower limit of CPU (first worker id, inclusive).
   process.argv[3] = upper limit of CPU (last worker id, exclusive).

 Flow: the master forks one worker per id. Each worker connects to the broker
 and classifies the delay of every message it receives into a histogram; when
 a message flagged `end` arrives, the worker reports its histogram (plus the
 elapsed time of the round as `totalDelay`) to the master. Once every worker
 has reported, the master prints a summary and appends it to result.txt.
*/

// global.gc only exists when node is started with --expose-gc.
if (typeof global.gc === 'function') {
    global.gc();
}

var fs = require('fs');
var cluster = require('cluster');
var Stats = require('fast-stats').Stats;

var lowerLimitCPUS = Number(process.argv[2]);
var higherLimit = Number(process.argv[3]);

// One worker is forked per id in [lowerLimitCPUS, higherLimit).
var WORKER_TOTAL = higherLimit - lowerLimitCPUS;
// Receiver client ids are this base plus the worker id.
var CLIENT_ID_BASE = 8000000000;

// Histogram bucket labels and the inclusive upper bound (ms) of every bucket
// except the open-ended last one.
var LATENCY_BUCKETS = [
    '0-2', '3-5', '6-20', '21-50', '51-100', '101-200', '201-300', '301-400',
    '401-500', '501-600', '601-700', '701-800', '801-900', '901-1000', '1001+'
];
var BUCKET_UPPER_BOUNDS = [2, 5, 20, 50, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000];

// Workers are started with the same argv, so this check passes or fails
// identically in the master and in every worker.
if (!(WORKER_TOTAL > 0)) {
    console.error('Usage: node --expose-gc filename.js <lowerLimit> <upperLimit>');
    process.exit(1);
}

/** Returns a fresh histogram with every bucket set to 0. */
function createLatencyTable() {
    var table = {};
    LATENCY_BUCKETS.forEach(function(bucket) {
        table[bucket] = 0;
    });
    return table;
}

/**
 * Maps a delay in milliseconds to its bucket label.
 * Returns null for negative or non-numeric delays, which are not counted.
 */
function bucketFor(ms) {
    if (!(ms >= 0)) {
        return null;
    }
    for (var b = 0; b < BUCKET_UPPER_BOUNDS.length; b++) {
        if (ms <= BUCKET_UPPER_BOUNDS[b]) {
            return LATENCY_BUCKETS[b];
        }
    }
    return LATENCY_BUCKETS[LATENCY_BUCKETS.length - 1];
}

/**
 * Aggregates one round of worker reports.
 * Returns { totalMessages, percentages, slowestDelay } where `percentages`
 * is the share of each bucket (all zeros when nothing was counted) and
 * `slowestDelay` is the largest `totalDelay` reported in the round.
 */
function summarize(reports) {
    var totals = createLatencyTable();
    var totalMessages = 0;
    var slowestDelay = 0;
    reports.forEach(function(report) {
        LATENCY_BUCKETS.forEach(function(bucket) {
            var n = Number(report[bucket]) || 0;
            totals[bucket] += n;
            totalMessages += n;
        });
        slowestDelay = Math.max(slowestDelay, Number(report.totalDelay) || 0);
    });
    var percentages = createLatencyTable();
    if (totalMessages > 0) {
        LATENCY_BUCKETS.forEach(function(bucket) {
            percentages[bucket] = (totals[bucket] * 100) / totalMessages;
        });
    }
    return {
        totalMessages: totalMessages,
        percentages: percentages,
        slowestDelay: slowestDelay
    };
}

/** Master process: forks the workers and summarizes their reports. */
function runMaster() {
    var data = [];      // every report received so far, in arrival order
    var statData = [];  // totalDelay of every report received so far
    var workers = [];

    cluster.on('message', function(worker, message) {
        // Node >= 6 emits (worker, message, handle); older versions emitted
        // only (message, handle).
        var msg = arguments.length === 2 ? worker : message;

        statData.push(msg.totalDelay);
        data.push(msg);
        console.log("================>", msg)
        console.log("============ CHILD -> " + data.length)
        console.log(data.length, (higherLimit - lowerLimitCPUS));
        if (data.length % WORKER_TOTAL !== 0) {
            return;
        }

        // Summarize only the reports of the round that just completed, using
        // fresh accumulators, so earlier rounds are never counted twice.
        var summary = summarize(data.slice(-WORKER_TOTAL));

        var stats = new Stats().push(statData);
        console.log(statData);
        var range = stats.range();
        console.log("95th percentile ", stats.percentile(95))
        console.log("Minimum latency", range[0]);
        console.log("Maximum latency", range[1]);
        console.log("============ ");
        console.log("total_messages", summary.totalMessages);
        // Throughput is measured against the slowest worker of the round
        // rather than whichever worker happened to report last.
        console.log("Speed", summary.slowestDelay > 0 ? summary.totalMessages / (summary.slowestDelay / 1000) : 0)
        console.log("============ TOTAL");
        console.log(summary.percentages);

        var date = new Date();
        // Trailing newline keeps successive entries on separate lines.
        fs.appendFileSync('result.txt', "===================================" + "\n" + date.toString() + "\n" + "Latency:" + JSON.stringify(summary.percentages) + "\n" + "No of clients :" + WORKER_TOTAL + "\n" + "Total no of messages :" + summary.totalMessages + "\n" + "====================================" + "\n");
    });

    for (var i = lowerLimitCPUS; i < higherLimit; i++) {
        workers[i] = cluster.fork();
        workers[i].send({ id: i })
    }
}

/**
 * Connects one receiver to the broker and reports a latency histogram to the
 * master each time a message flagged `end` is received.
 */
function startReceiver(mqtt, workerId) {
    var mqttClient = mqtt.connect({
        port: "your port here",
        host: "Your host here",
        clean: false,
        clientId: (CLIENT_ID_BASE + workerId).toString(),
        password: workerId + '-' + workerId
    });
    mqttClient.setMaxListeners(0);

    var seq = 0;        // 0 until the first message of the current round is seen
    var startTime = 0;  // sender timestamp of the first message of the round
    var latency = createLatencyTable();

    /** Sends the round's histogram to the master and resets round state. */
    function end() {
        process.send(latency);
        // Fresh table so the next report does not repeat this round's counts.
        latency = createLatencyTable();
        startTime = 0;
        seq = 0;
    }

    mqttClient.on('message', function(topic, payload, packet) {
        console.log("message received", payload.toString())
        var message;
        try {
            message = JSON.parse(payload);
        } catch (err) {
            console.error('ignoring malformed message:', err.message);
            return;
        }

        // Must be set before totalDelay is computed, otherwise a round that
        // consists of a single `end` message has no start time.
        if (seq === 0) {
            startTime = message.timestamp;
            seq++;
        }

        var now = Date.now();
        if (message.end) {
            latency.totalDelay = now - startTime;
        }
        console.log(now, message.timestamp);
        var delay = now - message.timestamp;
        console.log('delay: ', delay, ' ', message.count)

        // Negative delays (e.g. clock skew between hosts) fall in no bucket.
        var bucket = bucketFor(delay);
        if (bucket) {
            latency[bucket]++;
        }

        if (message.end) {
            end();
        }
    });
}

/** Worker process: waits for its id from the master, then starts receiving. */
function runWorker() {
    var mqtt = require('mqtt');
    var hosts = {
        LOCAL: '127.0.0.1',
        TEST: "test.mosquitto.org"
    };
    // NOTE(review): `host` is resolved from the HOST environment variable but
    // the connect options in startReceiver still use placeholder values -
    // fill them in (or wire `host` in) before running.
    var host = hosts[process.env.HOST] || hosts.LOCAL;

    process.on('message', function(msg) {
        // typeof check (not truthiness) so that worker id 0 is accepted.
        if (typeof msg.id === 'number') {
            startReceiver(mqtt, msg.id);
        }
    });
}

if (cluster.isMaster) {
    runMaster();
} else {
    runWorker();
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment