Skip to content

Instantly share code, notes, and snippets.

@apparentlymart
Last active October 19, 2016 04:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save apparentlymart/c63df5bba1eefe0960b06857ea54595b to your computer and use it in GitHub Desktop.
Save apparentlymart/c63df5bba1eefe0960b06857ea54595b to your computer and use it in GitHub Desktop.
Highland Diverge Stream, pull edition
var _ = require('highland');
// Demo input: the fourteen letters 'a' through 'n', one item per letter.
const items = 'abcdefghijklmn'.split('');

// Wrap the array in a highland stream; any other source stream would do.
const inStream = _(items);

// Fan the single input out into four pull-driven streams.
const divergedStreams = inStream.through(diverge(4));

// Each diverged stream is an ordinary highland stream, so any operator can
// be applied per-branch. This demo batches the items and then pushes each
// batch through an artificially slow stage to simulate a bottleneck.
const resultStreams = divergedStreams.map((stream) => {
  const batched = stream.batchWithTimeOrCount(100, 5);
  return batched.through(slowStream());
});

// Recombine the branches into a single stream of results.
const results = resultStreams.merge();

// The consumer is free to do whatever it likes; here we just print.
results.each((item) => {
  console.log(item);
});
/**
 * Builds a transform (suitable for `stream.through(...)`) that splits one
 * source stream into `n` pull-driven output streams.
 *
 * Every output stream is a highland generator stream. Whenever an output is
 * asked for a value, it queues its own `[push, next]` pair on an internal
 * `requests` stream. The requests are serviced one at a time: each one pulls
 * a single value from the source and hands it to the output that asked, so
 * every source value is delivered to exactly one output.
 *
 * Once the source yields `_.nil`, the requesting output is ended and every
 * later request is answered with `_.nil` as well, so all `n` outputs end.
 *
 * @param {number} n - Number of output streams to create.
 * @returns {function(Stream): Stream} Function taking the source stream and
 *   returning a highland stream that emits the `n` output streams.
 */
function diverge(n) {
  return function (inStream) {
    // Queue of outstanding [push, next] pairs from the output generators.
    const requests = _();
    // Becomes true once the source has ended.
    let closed = false;
    // Number of output streams that have not been sent `_.nil` yet.
    let remainingOut = n;

    // Records that one output has been ended; when the last one is ended
    // the request queue itself is closed.
    function outputEnded() {
      closed = true;
      remainingOut--;
      if (remainingOut === 0) {
        // Close the requests stream once we've closed out
        // all of our output streams, for good measure.
        requests.write(_.nil);
      }
    }

    // Generator shared by all outputs: it never produces data itself, it
    // only registers the caller's callbacks for the request loop below.
    function generator(push, next) {
      requests.write([push, next]);
    }

    requests.flatMap(function (request) {
      const push = request[0];
      const next = request[1];

      if (closed) {
        // Source already ended: just end this output too.
        outputEnded();
        push(null, _.nil);
        return _([]);
      }

      // Inner stream used purely as a completion signal: flatMap does not
      // move on to the next queued request until this stream ends.
      const ret = _();
      inStream.pull(function (err, x) {
        if (x === _.nil) {
          outputEnded();
          push(null, x);
          // The completion signal must be ended on this path too;
          // otherwise the request loop stalls here and the requests queued
          // by the other outputs are never answered, so they never end.
          ret.write(_.nil);
          return;
        }
        push(err, x);
        next();
        ret.write(_.nil);
      });
      return ret;
    }).done(function () {});

    const streams = [];
    for (let i = 0; i < n; i++) {
      streams.push(_(generator));
    }
    return _(streams);
  };
}
/**
 * Creates a transform that simulates a slow processing stage.
 *
 * Each incoming value is turned into a promise-backed stream that resolves
 * to `{ value: <incoming value> }` after a randomised delay of 1000ms plus a
 * jitter drawn from [-500, 500).
 *
 * @returns {function} A partially-applied `flatMap`, for use with `through`.
 */
function slowStream() {
  const wrapAfterDelay = (input) => {
    const pending = new Promise((resolve) => {
      const jitter = (Math.random() * 1000) - 500;
      const delayMs = 1000 + jitter;
      setTimeout(() => {
        resolve({ value: input });
      }, delayMs);
    });
    return _(pending);
  };

  return _.flatMap(wrapAfterDelay);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment