Skip to content

Instantly share code, notes, and snippets.

@thlorenz
Last active December 21, 2015 05:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thlorenz/6254995 to your computer and use it in GitHub Desktop.
Save thlorenz/6254995 to your computer and use it in GitHub Desktop.
Sample of inner and outer streams pattern - condensed as much as possible in order to serve as blueprint to generalize and extract module(s).
var from = require('from')
, through = require('through')
;
var outer = from([10, 20, 30, 40]);
var outerStream = through(onouter, onendouter)
, outerPending = 0
, outerEnded;
function onouter (outerVal) {
++outerPending;
var innerQueue = [];
var inner = from([1 + outerVal, 3 + outerVal, 4 + outerVal, 5 + outerVal]);
var innerStream = through(oninner, onendinner)
, innerPending = 0
, innerEnded;
function oninner (innerVal) {
++innerPending;
setTimeout(function () {
innerQueue.push(innerVal * 2);
innerMaybeEnd();
}, 100);
}
function onendinner () {
innerMaybeEnd(true);
}
function innerMaybeEnd (ended) {
if (ended) innerEnded = true; else --innerPending;
if (!innerPending && innerEnded) innerStream.queue(null);
}
return inner
.pipe(innerStream)
.on('end', function () {
outerStream.queue(innerQueue);
outerMaybeEnd();
});
}
function onendouter () {
outerMaybeEnd(true);
}
function outerMaybeEnd (ended) {
if (ended) outerEnded = true; else --outerPending;
if (!outerPending && outerEnded) outerStream.queue(null);
}
outer
.pipe(outerStream)
.pipe(through(console.log));
var from = require('from')
, through = require('through')
;
var outer = from([10, 20, 30, 40]);
var outerStream = through(onouter, onendouter)
, outerPending = 0
, outerEnded;
function onouter (outerVal) {
++outerPending;
var innerQueue = [];
var inner = from([1 + outerVal, 3 + outerVal, 4 + outerVal, 5 + outerVal]);
var innerStream = through(oninner, onendinner)
, innerPending = 0
, innerEnded;
function oninner (innerVal) {
++innerPending;
setTimeout(function () {
innerQueue.push(innerVal * 2);
innerMaybeEnd();
}, 100);
}
function onendinner () {
innerMaybeEnd(true);
}
function innerMaybeEnd (ended) {
if (ended) innerEnded = true; else --innerPending;
if (!innerPending && innerEnded) innerStream.queue(null);
}
return inner
.pipe(innerStream)
.on('end', function () {
// flatten the streamed values by queuing one by one
innerQueue.forEach(function (val) {
outerStream.queue(val);
});
outerMaybeEnd();
});
}
function onendouter () {
outerMaybeEnd(true);
}
function outerMaybeEnd (ended) {
if (ended) outerEnded = true; else --outerPending;
if (!outerPending && outerEnded) outerStream.queue(null);
}
outer
.pipe(outerStream)
.pipe(through(console.log));
var from = require('from')
, through = require('through')
, asyncThru = require('async-through')
;
var outerStream = asyncThru(onouter);
function onouter (outerVal) {
var innerStream = asyncThru(oninner)
function oninner (innerVal) {
setTimeout(function () {
innerStream.queue(innerVal * 2);
}, 20 * outerVal);
}
from([1 + outerVal, 3 + outerVal, 4 + outerVal, 5 + outerVal])
.pipe(innerStream)
.on('data', function (data) { outerStream.queue(data, true); })
.on('end', function () { outerStream.queue(null); } );
}
from([10, 20, 30, 40])
.pipe(outerStream)
.pipe(through(console.log));
var from = require('from')
, through = require('through')
, asyncThru = require('async-through')
;
var outerStream = through(onouter, onendouter)
, outerPending = 0
, outerEnded;
function onouter (outerVal) {
++outerPending;
var innerStream = asyncThru(oninner)
function oninner (innerVal) {
setTimeout(function () {
innerStream.queue(innerVal * 2);
}, 20 * outerVal);
}
from([1 + outerVal, 3 + outerVal, 4 + outerVal, 5 + outerVal])
.pipe(innerStream)
.on('data', function (data) { outerStream.queue(data); })
.on('end', outerMaybeEnd);
}
function onendouter () {
outerMaybeEnd(true);
}
function outerMaybeEnd (ended) {
if (ended) outerEnded = true; else --outerPending;
if (!outerPending && outerEnded) outerStream.queue(null);
}
from([10, 20, 30, 40])
.pipe(outerStream)
.pipe(through(console.log));
var from = require('from')
, through = require('through')
;
var result = [];
var outer = from([10, 20, 30, 40]);
var outerStream = through(onouter, onendouter)
, outerPending = 0
, outerEnded;
function onouter (outerVal) {
++outerPending;
var inner = from([1 + outerVal, 3 + outerVal, 4 + outerVal, 5 + outerVal]);
var innerStream = through(oninner, onendinner)
, innerPending = 0
, innerEnded;
function oninner (innerVal) {
++innerPending;
setTimeout(function () {
result.push(innerVal);
innerMaybeEnd();
}, 100);
}
function onendinner () {
innerMaybeEnd(true);
}
function innerMaybeEnd (ended) {
if (ended) innerEnded = true; else --innerPending;
if (!innerPending && innerEnded) innerStream.queue(null);
}
inner
.pipe(innerStream)
.on('end', outerMaybeEnd);
}
function onendouter () {
outerMaybeEnd(true);
}
function outerMaybeEnd (ended) {
if (ended) outerEnded = true; else --outerPending;
if (!outerPending && outerEnded) outerStream.queue(null);
}
outer
.pipe(outerStream)
.on('end', function () {
console.log('result', result);
});
@thlorenz
Copy link
Author

Seems like two modules could be extracted from this:

  • a counting version of through to allow async ops inside onvalue
  • a nested-streams module that allows passing the outer stream and function makeInner(outerValue) {..} and returns with final result (i.e. allows doing the above with much less code)

Note that the fact that I'm aggregating into closed over result vs emitting values should have nothing to do with the implementation of those modules.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment