Skip to content

Instantly share code, notes, and snippets.

@joyrexus
Last active December 29, 2015 02:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joyrexus/7603860 to your computer and use it in GitHub Desktop.
Save joyrexus/7603860 to your computer and use it in GitHub Desktop.
Quick demo of queue.js

Queue.js is a minimalist async helper library by Mike Bostock.

Here's a quick demonstration of how to use it. We're just going to load two files and sum their corresponding lines. (You might use nested callbacks for this, but that gets hairy if you're doing something along these lines with more than two files.)


Suppose we have two files, A.TXT and B.TXT, each containing a list of numbers, one number per line. Our task is to sum the corresponding lines of each file.

  A  B  SUM

  1  2  3
  2  3  5
  3  4  7
  4  5  9

We're going to read the files in parallel and execute a done function once they're done reading in order to do something with both files, viz., sum their corresponding lines:

queue(2)
  .defer(read, 'A.TXT')
  .defer(read, 'B.TXT')
  .awaitAll(done)

demo.coffee

Alright, let's walk through demo.coffee.

Load our requirements:

fs = require 'fs'
queue = require 'queue'
{deepEqual} = require 'assert'

Our zip method for zipping arrays. The args being passed should be the arrays to be zipped together.

zip = (args...) ->
  sizes = (a.length for a in args)
  min = Math.min(sizes...)
  (arr[i] for arr in args) for i in [0...min]

Our read method for reading in a file:

read = (name, done) -> 
  callback = (err, data) -> 
    result = (parseInt(d) for d in data.split('\n') when d)
    done(err, result)
  fs.readFile name, 'utf8', callback

Notice how read takes two arguments: the name of file to read and a done callback passed by queue. We can pass to done whatever result we want here. Since we're using awaitAll, the result gets added to the set of results colleted from each deferred function.

Now for our concluding callback, which will get executed after each deferred function is done. It gets passed the accumulated results collected from each deferred function.

done = (err, results) -> 
  sums = ((pair.reduce (a, b) -> a + b) for pair in (zip results...))
  expected = [3, 5, 7, 9]
  deepEqual sums, expected

Finally ...

queue(2)
  .defer(read, 'A.TXT')
  .defer(read, 'B.TXT')
  .awaitAll(done)

See also

Mike's ...

Analyzing queue.js

fs = require 'fs'
queue = require 'queue'
zip = (args...) ->
sizes = (a.length for a in args)
min = Math.min(sizes...)
(arr[i] for arr in args) for i in [0...min]
read = (name, done) ->
callback = (err, data) ->
result = (parseInt(d) for d in data.split('\n') when d)
done(err, result)
fs.readFile name, 'utf8', callback
done = (err, results) ->
for pair in (zip results...)
console.log pair.reduce (a, b) -> a + b
queue(2)
.defer(read, 'A.TXT')
.defer(read, 'B.TXT')
.awaitAll(done)
(function() {
if (typeof module === "undefined") self.queue = queue;
else module.exports = queue;
queue.version = "1.0.4";
var slice = [].slice;
function queue(parallelism) {
var q,
tasks = [],
started = 0, // number of tasks that have been started (and perhaps finished)
active = 0, // number of tasks currently being executed (started but not finished)
remaining = 0, // number of tasks not yet finished
popping, // inside a synchronous task callback?
error = null,
await = noop,
all;
if (!parallelism) parallelism = Infinity;
function pop() {
while (popping = started < tasks.length && active < parallelism) {
var i = started++,
t = tasks[i],
a = slice.call(t, 1);
a.push(callback(i));
++active;
t[0].apply(null, a);
}
}
function callback(i) {
return function(e, r) {
--active;
if (error != null) return;
if (e != null) {
error = e; // ignore new tasks and squelch active callbacks
started = remaining = NaN; // stop queued tasks from starting
notify();
} else {
tasks[i] = r;
if (--remaining) popping || pop();
else notify();
}
};
}
function notify() {
if (error != null) await(error);
else if (all) await(error, tasks);
else await.apply(null, [error].concat(tasks));
}
return q = {
defer: function() {
if (!error) {
tasks.push(arguments);
++remaining;
pop();
}
return q;
},
await: function(f) {
await = f;
all = false;
if (!remaining) notify();
return q;
},
awaitAll: function(f) {
await = f;
all = true;
if (!remaining) notify();
return q;
}
};
}
function noop() {}
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment