Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
These are benchmark results of an implementation of async.queue based on maintaining an index into the queue as items are processed instead of shifting items off the queue. The new implementation is shown as "indexed async.queue" in the benchmark results, with the old one shown as "upstream async.queue". For larger queues, the performance differ…
diff --git a/lib/async.js b/lib/async.js
index 394c41c..8d91ab5 100644
--- a/lib/async.js
+++ b/lib/async.js
@@ -832,6 +832,120 @@
return q;
};
+ async.indexedQueue = function (worker, concurrency) {
+ if (concurrency === undefined) {
+ concurrency = 1;
+ }
+ function _insert(q, data, pos, callback) {
+ if (!q.started){
+ q.started = true;
+ }
+ if (!_isArray(data)) {
+ data = [data];
+ }
+ if(data.length == 0) {
+ // call drain immediately if there are no tasks
+ return async.setImmediate(function() {
+ if (q.drain) {
+ q.drain();
+ }
+ });
+ }
+ _each(data, function(task) {
+ var item = {
+ data: task,
+ callback: typeof callback === 'function' ? callback : null
+ };
+
+ if (pos) {
+ q.compact();
+ q.tasks.unshift(item);
+ } else {
+ q.tasks.push(item);
+ }
+
+ if (q.saturated && q.length() === q.concurrency) {
+ q.saturated();
+ }
+ async.setImmediate(q.process);
+ });
+ }
+
+ var workers = 0;
+ var q = {
+ tasks: [],
+ concurrency: concurrency,
+ saturated: null,
+ empty: null,
+ drain: null,
+ started: false,
+ paused: false,
+ index: 0,
+ push: function (data, callback) {
+ _insert(q, data, false, callback);
+ },
+ kill: function () {
+ q.drain = null;
+ q.tasks = [];
+ q.index = 0;
+ },
+ unshift: function (data, callback) {
+ _insert(q, data, true, callback);
+ },
+ process: function () {
+ if (!q.paused && workers < q.concurrency && q.length()) {
+ if (q.index > 500) {
+ q.compact();
+ }
+ var task = q.tasks[q.index++];
+ if (q.empty && q.length() === 0) {
+ q.empty();
+ }
+ workers += 1;
+ var next = function () {
+ workers -= 1;
+ if (task.callback) {
+ task.callback.apply(task, arguments);
+ }
+ if (q.drain && q.length() + workers === 0) {
+ q.drain();
+ }
+ q.process();
+ };
+ var cb = only_once(next);
+ worker(task.data, cb);
+ }
+ },
+ length: function () {
+ return q.tasks.length - q.index;
+ },
+ running: function () {
+ return workers;
+ },
+ idle: function() {
+ return q.length() + workers === 0;
+ },
+ pause: function () {
+ if (q.paused === true) { return; }
+ q.paused = true;
+ },
+ resume: function () {
+ if (q.paused === false) { return; }
+ q.paused = false;
+ // Need to call q.process once per concurrent
+ // worker to preserve full concurrency after pause
+ for (var w = 1; w <= q.concurrency; w++) {
+ async.setImmediate(q.process);
+ }
+ },
+ compact: function () {
+ q.tasks.splice(0, q.index);
+ q.index = 0;
+ }
+ };
+ return q;
+ };
+
async.priorityQueue = function (worker, concurrency) {
function _compareTasks(a, b){
[~/src/async(master)]$ node queue_benchmark.js
Benchmarking 100 entries
upstream async.queue: 2 ms ±2% to process 100 entries
indexed async.queue: 1.9 ms ±1.97% to process 100 entries
--------------------------------------------------
Benchmarking 1000 entries
upstream async.queue: 7.9 ms ±1.43% to process 1000 entries
indexed async.queue: 8.3 ms ±1.81% to process 1000 entries
--------------------------------------------------
Benchmarking 10000 entries
upstream async.queue: 78.4 ms ±1.66% to process 10000 entries
indexed async.queue: 79.2 ms ±4.07% to process 10000 entries
--------------------------------------------------
Benchmarking 100000 entries
upstream async.queue: 864.9 ms ±2.9% to process 100000 entries
indexed async.queue: 843.5 ms ±2.48% to process 100000 entries
--------------------------------------------------
Benchmarking 200000 entries
upstream async.queue: 21157.1 ms ±1.79% to process 200000 entries
indexed async.queue: 1665.4 ms ±2.84% to process 200000 entries
--------------------------------------------------
Benchmarking 300000 entries
upstream async.queue: 45245.9 ms ±0.29% to process 300000 entries
indexed async.queue: 2714.3 ms ±2.31% to process 300000 entries
var async = require('./lib/async');
var Benchmark = require('benchmark');
// Build one benchmark step per queue size (smallest first) and hand the
// resulting list of steps to async.series.
async.series([100, 1000, 10000, 100000, 200000, 300000].map(function (size) {
    return runBench(size);
}));
/**
 * Builds a benchmark step that compares `async.queue` against
 * `async.indexedQueue` when pushing `numEntries` tasks through a queue with
 * a concurrency of 1.
 *
 * @param {number} numEntries - number of tasks pushed onto each queue per run.
 * @returns {Function} step taking a `callback` that is invoked once the
 *     benchmark suite reports completion.
 */
function runBench(numEntries) {
    return function (callback) {
        var suite = new Benchmark.Suite();

        // Registers a deferred benchmark that fills a queue built by
        // `queueFn` and resolves when the last pushed task reaches the worker.
        function benchQueue(name, queueFn) {
            suite.add(name, function (deferred) {
                function worker(task, done) {
                    if (task.num === numEntries) {
                        deferred.resolve();
                    }
                    setImmediate(done);
                }

                var q = queueFn(worker, 1);
                for (var i = 1; i <= numEntries; i++) {
                    q.push({num: i});
                }
            }, {defer: true, minSamples: 7});
        }

        benchQueue("upstream async.queue", async.queue);
        benchQueue("indexed async.queue", async.indexedQueue);

        suite.on('cycle', function (event) {
            var target = event.target;
            // stats.mean is multiplied by 1000 to report milliseconds.
            var meanMs = target.stats.mean * 1000;
            // toFixed() returns strings, so they are printed with %s; %d
            // would convert them back to numbers and drop the fixed
            // precision. %% prints a literal percent sign.
            console.log("%s: %s ms \xb1%s%% to process %d entries",
                target.name, meanMs.toFixed(1),
                target.stats.rme.toFixed(2), numEntries);
        });

        suite.on('complete', function () {
            console.log('--------------------------------------------------');
            callback();
        });

        console.log('Benchmarking %d entries', numEntries);
        suite.run();
    };
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment