Skip to content

Instantly share code, notes, and snippets.

@vqvu

vqvu/merge.js

Last active Aug 29, 2015
Embed
What would you like to do?
merge using iterative pull
/**
 * Merges a stream of source streams into a single stream using iterative
 * pulls.
 *
 * `this` is expected to emit source streams. Every value pulled from a
 * source is pushed downstream as soon as it arrives, errors from `this` or
 * from any source are forwarded, and the merged stream ends once `this` and
 * every source it produced have emitted `nil`.
 *
 * NOTE(review): `_`, `nil` and `pull` are assumed to be the Highland
 * generator constructor, end-of-stream marker and pull method -- confirm
 * against the enclosing module.
 *
 * @returns {Stream} a new stream created with `_(generator)`.
 */
Stream.prototype.merge = function () {
    var self = this;

    // Streams that have not ended yet. Contains `self` only once one of its
    // pulls turned out to be asynchronous (see getSourcesSync).
    var srcs = [];

    // Streams with no pull outstanding; each is pulled on the next pass.
    var srcsNeedPull = [];

    // True until the generator has run once.
    var first = true;

    // True while the generator is parked with every live source mid-pull.
    // The first pull callback to fire clears it and restarts the generator.
    var waiting = false;

    /**
     * Builds the pull callback used for `src` in the main merge loop.
     *
     * @param {Function} push - generator push callback.
     * @param {Function} next - generator next callback.
     * @param {Stream} src - the stream being pulled (may be `self`).
     * @returns {Function} a node-style `(err, x)` callback.
     */
    function srcPullHandler(push, next, src) {
        return function (err, x) {
            // `err` must be checked before `x` is touched: an error callback
            // carries no value, so dereferencing `x` first would throw.
            if (err) {
                push(err);
                srcsNeedPull.push(src);
            }
            else if (x === nil) {
                // The source ended; stop tracking it.
                srcs = srcs.filter(function (s) {
                    return s !== src;
                });
            }
            else if (src === self) {
                // `self` produced a new source: track it, schedule its first
                // pull, and put `self` at the front of the next pass.
                srcs.push(x);
                srcsNeedPull.push(x);
                srcsNeedPull.unshift(self);
            }
            else {
                push(null, x);
                srcsNeedPull.push(src);
            }

            if (waiting) {
                waiting = false;
                next();
            }
        };
    }

    /**
     * Pulls once from every stream queued in `srcsNeedPull`, then asks the
     * generator to run again.
     */
    function pullFromAllSources(push, next) {
        // Swap the queue out first so that handlers which fire synchronously
        // enqueue into the list for the *next* pass.
        var batch = srcsNeedPull;
        srcsNeedPull = [];
        batch.forEach(function (src) {
            src.pull(srcPullHandler(push, next, src));
        });
        next();
    }

    /**
     * Pulls as many sources as possible from `self` synchronously.
     *
     * Stops when `self` ends or when a pull does not call back before
     * `pull` returns. In the latter case `self` is recorded as a live
     * source and the late callback is routed through srcPullHandler.
     */
    function getSourcesSync(push, next) {
        // True from just before each pull until its callback has run.
        var pending;
        var done = false;

        while (!done) {
            pending = true;
            self.pull(function (err, x) {
                pending = false;
                if (done) {
                    // The loop already gave up on this pull, so it was
                    // asynchronous. Handle it like any other async pull.
                    srcPullHandler(push, next, self)(err, x);
                }
                else if (err) {
                    push(err);
                }
                else if (x === nil) {
                    done = true;
                }
                else {
                    srcs.push(x);
                    srcsNeedPull.push(x);
                }
            });

            // Callback has not fired yet: the pull is asynchronous. Record
            // `self` as a source (its pull is outstanding, so it is NOT
            // queued in srcsNeedPull) and stop looping.
            if (pending) {
                done = true;
                srcs.unshift(self);
            }
        }
    }

    return _(function (push, next) {
        if (first) {
            first = false;
            getSourcesSync(push, next);
        }

        if (srcs.length === 0) {
            // Nothing left to merge.
            push(null, nil);
        }
        else if (srcsNeedPull.length) {
            pullFromAllSources(push, next);
        }
        else {
            // Every live source has a pull in flight; wait for a callback.
            waiting = true;
        }
    });
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment