Skip to content

Instantly share code, notes, and snippets.

@brandonros
Created April 29, 2018 03:10
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save brandonros/1918d1a4cc2e5e4698a76ff099b55636 to your computer and use it in GitHub Desktop.
var FJQ = require('featureless-job-queue');
var now = require('performance-now');
var computeRule = require(process.env.LOYALTY_RULES_ENGINE_PATH + '/common/computeRule.js');
/**
 * Promise adapter around the callback-style `fjq.create`.
 *
 * @param {object} fjq - Queue object exposing `create(job, callback)`.
 * @param {*} job - Job payload, handed to `fjq.create` unchanged.
 * @returns {Promise<*>} Resolves with the callback's second argument;
 *   rejects with the callback's first argument when it is truthy.
 */
async function createJob(fjq, job) {
  return new Promise((resolve, reject) => {
    fjq.create(job, (err, count) => {
      if (err) {
        reject(err);
      } else {
        resolve(count);
      }
    });
  });
}
/**
 * Promise adapter around the callback-style `fjq.clearAll`.
 *
 * @param {object} fjq - Queue object exposing `clearAll(callback)`.
 * @returns {Promise<void>} Resolves with no value once the callback reports
 *   no error; rejects with the callback error otherwise.
 */
async function clearAll(fjq) {
  const settled = new Promise((resolve, reject) => {
    const onCleared = (err) => {
      if (!err) {
        resolve();
        return;
      }
      reject(err);
    };
    fjq.clearAll(onCleared);
  });
  return settled;
}
/**
 * Promise adapter around the callback-style `fjq.length`.
 *
 * @param {object} fjq - Queue object exposing `length(callback)`.
 * @returns {Promise<*>} Resolves with the callback's second argument;
 *   rejects with the callback's first argument when it is truthy.
 */
async function length(fjq) {
  return new Promise((resolve, reject) => {
    fjq.length((err, count) => {
      if (!err) {
        resolve(count);
        return;
      }
      reject(err);
    });
  });
}
/**
 * Start consuming jobs from one queue and print throughput statistics once
 * per second.
 *
 * A queue client is created for the Redis key `fjq:jobs:<queueIndex>` on the
 * local Redis URL, a counting worker is registered with it, and a repeating
 * timer logs progress. Once the number of handled jobs reaches the target the
 * timer terminates the process.
 *
 * The function contains no `await`, so the returned promise settles as soon
 * as the worker and the timer have been set up, not when the queue is empty.
 *
 * @param {string|number} queueIndex - Suffix appended to the `fjq:jobs:` key.
 * @returns {Promise<void>} Resolves after setup; rejects if setup throws.
 */
async function drainQueue(queueIndex) {
  const concurrency = 100;
  const targetJobCount = 2000000;
  const reportIntervalMs = 1000;

  const fjq = new FJQ({
    redisUrl: 'redis://127.0.0.1:6379',
    redisKey: `fjq:jobs:${queueIndex}`
  });

  let jobsHandled = 0;
  // Timestamp of the first handled job; stays undefined while nothing has run.
  let start;
  let highestTps = 0;

  const workerFunction = function(job, cb) {
    if (start === undefined) {
      start = now();
    }
    // The rule computation is disabled, so the worker only counts jobs.
    //var result = computeRule(job.input, job.rule);
    jobsHandled += 1;
    cb();
  };

  fjq.process(workerFunction, concurrency);

  setInterval(function() {
    // Until the first job has been handled there is no start timestamp;
    // report zeros instead of letting `now() - undefined` produce NaN.
    const elapsed = start === undefined ? 0 : now() - start;
    // `elapsed` is treated as milliseconds, hence the scale to per-second.
    const tps = elapsed > 0 ? (jobsHandled / elapsed) * 1000 : 0;
    if (tps > highestTps) {
      highestTps = tps;
    }
    console.log(new Date(), elapsed, jobsHandled, tps, highestTps);
    // `>=` rather than `===`: the counter is only sampled once per interval,
    // so it can move past the exact target between two ticks.
    if (jobsHandled >= targetJobCount) {
      // NOTE(review): exit status 1 is kept from the original even though
      // this is the normal completion path — confirm whether callers rely on it.
      process.exit(1);
    }
  }, reportIntervalMs);
  /*fjq.shutdown(function() {
  process.exit(0);
  });*/
}
// Entry point: reads the queue index from the first command-line argument
// and starts draining that queue.
(async function() {
  const queueIndex = process.argv[2];
  try {
    await drainQueue(queueIndex);
  } catch (err) {
    // Report setup failures explicitly instead of leaving an unhandled
    // promise rejection, and signal the failure through the exit status.
    console.error(err);
    process.exit(1);
  }
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment