Last active
December 21, 2015 05:09
-
-
Save thlorenz/6254995 to your computer and use it in GitHub Desktop.
Sample of the inner and outer streams pattern - condensed as much as possible in order to serve as a blueprint to generalize and extract module(s).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Sample 1: for every value arriving on the outer stream an inner stream is
// created; the doubled inner values are collected into an array and that
// array is queued on the outer stream once the inner stream has ended.
var from = require('from');
var through = require('through');

// Offsets added to each outer value to build the inner stream's input.
var INNER_OFFSETS = [1, 3, 4, 5];

var outerSource = from([10, 20, 30, 40]);
var outerStream = through(onouter, onendouter);
var outerPending = 0;   // inner streams started but not yet ended
var outerEnded = false; // set once the outer source has ended

/**
 * Write handler of the outer stream: starts one inner stream per outer value.
 *
 * @param {number} outerVal value received from the outer source
 * @returns the piped inner stream (with its 'end' listener attached)
 */
function onouter(outerVal) {
  outerPending += 1;

  var doubled = [];
  var innerSource = from(INNER_OFFSETS.map(function (offset) {
    return offset + outerVal;
  }));
  var innerStream = through(oninner, onendinner);
  var innerPending = 0;   // timers scheduled but not yet fired
  var innerEnded = false; // set once the inner source has ended

  // Each inner value is processed asynchronously (100ms timer).
  function oninner(innerVal) {
    innerPending += 1;
    setTimeout(function () {
      doubled.push(innerVal * 2);
      innerMaybeEnd();
    }, 100);
  }

  function onendinner() {
    innerMaybeEnd(true);
  }

  // Called with `true` when the inner source ended, without arguments when
  // one timer completed. Ends the inner stream only after both the source
  // ended and no timers are outstanding.
  function innerMaybeEnd(ended) {
    if (ended) {
      innerEnded = true;
    } else {
      innerPending -= 1;
    }
    if (innerEnded && innerPending === 0) {
      innerStream.queue(null);
    }
  }

  var piped = innerSource.pipe(innerStream);
  return piped.on('end', function () {
    // One array per outer value is pushed downstream.
    outerStream.queue(doubled);
    outerMaybeEnd();
  });
}

function onendouter() {
  outerMaybeEnd(true);
}

// Same bookkeeping as innerMaybeEnd, one level up: `true` marks the outer
// source as ended, no argument marks one inner stream as finished.
function outerMaybeEnd(ended) {
  if (ended) {
    outerEnded = true;
  } else {
    outerPending -= 1;
  }
  if (outerEnded && outerPending === 0) {
    outerStream.queue(null);
  }
}

outerSource
  .pipe(outerStream)
  .pipe(through(console.log));
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Sample 2: like the collecting variant, but when an inner stream ends its
// collected values are queued on the outer stream one at a time (flattened)
// instead of as a single array.
var from = require('from');
var through = require('through');

var outerSource = from([10, 20, 30, 40]);
var outerStream = through(onouter, onendouter);
var outerPending = 0;   // inner streams started but not yet ended
var outerEnded = false; // set once the outer source has ended

/**
 * Write handler of the outer stream: starts one inner stream per outer value.
 *
 * @param {number} outerVal value received from the outer source
 * @returns the piped inner stream (with its 'end' listener attached)
 */
function onouter(outerVal) {
  outerPending += 1;

  var collected = [];
  var innerValues = [1, 3, 4, 5].map(function (offset) {
    return offset + outerVal;
  });
  var innerStream = through(oninner, onendinner);
  var innerPending = 0;   // timers scheduled but not yet fired
  var innerEnded = false; // set once the inner source has ended

  // Doubles the value after a 100ms delay and records it.
  function oninner(innerVal) {
    innerPending += 1;
    setTimeout(function () {
      collected.push(innerVal * 2);
      innerMaybeEnd();
    }, 100);
  }

  function onendinner() {
    innerMaybeEnd(true);
  }

  // `true` => inner source ended; no argument => one timer completed.
  function innerMaybeEnd(ended) {
    if (ended) {
      innerEnded = true;
    } else {
      innerPending -= 1;
    }
    if (innerEnded && innerPending === 0) {
      innerStream.queue(null);
    }
  }

  return from(innerValues)
    .pipe(innerStream)
    .on('end', function () {
      // Flatten: push every collected value downstream individually.
      for (var i = 0; i < collected.length; i += 1) {
        outerStream.queue(collected[i]);
      }
      outerMaybeEnd();
    });
}

function onendouter() {
  outerMaybeEnd(true);
}

// `true` => outer source ended; no argument => one inner stream finished.
// The outer stream is ended once both conditions are met.
function outerMaybeEnd(ended) {
  if (ended) {
    outerEnded = true;
  } else {
    outerPending -= 1;
  }
  if (outerEnded && outerPending === 0) {
    outerStream.queue(null);
  }
}

outerSource
  .pipe(outerStream)
  .pipe(through(console.log));
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Sample 3: the same nested pattern with both levels built on
// `async-through`, so no manual pending/ended counters appear here.
// NOTE(review): presumably async-through tracks outstanding work itself and
// the second argument of `queue(data, true)` keeps the current outer value
// "open" until `queue(null)` is called - confirm against its README.
var from = require('from');
var through = require('through');
var asyncThru = require('async-through');

var outerStream = asyncThru(onouter);

/**
 * Handler of the outer stream: runs an inner async stream for one outer value
 * and forwards each inner result to the outer stream.
 *
 * @param {number} outerVal value received from the outer source
 */
function onouter(outerVal) {
  var innerStream = asyncThru(oninner);
  // The delay scales with the outer value (200ms, 400ms, ... for 10, 20, ...).
  var delayMs = 20 * outerVal;
  var innerValues = [1, 3, 4, 5].map(function (offset) {
    return offset + outerVal;
  });

  // Queues the doubled inner value once the timer fires.
  function oninner(innerVal) {
    setTimeout(function () {
      innerStream.queue(innerVal * 2);
    }, delayMs);
  }

  function forwardData(data) {
    outerStream.queue(data, true);
  }

  function signalDone() {
    outerStream.queue(null);
  }

  from(innerValues)
    .pipe(innerStream)
    .on('data', forwardData)
    .on('end', signalDone);
}

from([10, 20, 30, 40])
  .pipe(outerStream)
  .pipe(through(console.log));
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Sample 4: mixed variant - the outer stream is a plain `through` with manual
// pending/ended bookkeeping, while each inner stream uses `async-through`.
var from = require('from');
var through = require('through');
var asyncThru = require('async-through');

var outerStream = through(onouter, onendouter);
var outerPending = 0;   // inner streams started but not yet ended
var outerEnded = false; // set once the outer source has ended

/**
 * Write handler of the outer stream: starts an inner stream for one outer
 * value and forwards every inner result straight to the outer stream.
 *
 * @param {number} outerVal value received from the outer source
 */
function onouter(outerVal) {
  outerPending += 1;

  var innerStream = asyncThru(oninner);
  var delayMs = 20 * outerVal;
  var innerValues = [1, 3, 4, 5].map(function (offset) {
    return offset + outerVal;
  });

  // Queues the doubled inner value once the timer fires.
  function oninner(innerVal) {
    setTimeout(function () {
      innerStream.queue(innerVal * 2);
    }, delayMs);
  }

  function forwardData(data) {
    outerStream.queue(data);
  }

  from(innerValues)
    .pipe(innerStream)
    .on('data', forwardData)
    // Registered directly as the listener, so it is invoked with whatever
    // arguments the 'end' event carries.
    .on('end', outerMaybeEnd);
}

function onendouter() {
  outerMaybeEnd(true);
}

// A truthy argument marks the outer source as ended; otherwise one inner
// stream is counted as finished. The outer stream is ended once the source
// has ended and no inner streams remain.
function outerMaybeEnd(ended) {
  if (ended) {
    outerEnded = true;
  } else {
    outerPending -= 1;
  }
  if (outerEnded && outerPending === 0) {
    outerStream.queue(null);
  }
}

from([10, 20, 30, 40])
  .pipe(outerStream)
  .pipe(through(console.log));
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Sample 5: instead of emitting values downstream, every inner value is
// aggregated into the closed-over `result` array, which is logged once the
// outer stream emits 'end'.
var from = require('from');
var through = require('through');

var result = [];

var outerSource = from([10, 20, 30, 40]);
var outerStream = through(onouter, onendouter);
var outerPending = 0;   // inner streams started but not yet ended
var outerEnded = false; // set once the outer source has ended

/**
 * Write handler of the outer stream: starts one inner stream per outer value.
 * Nothing is queued on the outer stream here; results go into `result`.
 *
 * @param {number} outerVal value received from the outer source
 */
function onouter(outerVal) {
  outerPending += 1;

  var innerValues = [1, 3, 4, 5].map(function (offset) {
    return offset + outerVal;
  });
  var innerStream = through(oninner, onendinner);
  var innerPending = 0;   // timers scheduled but not yet fired
  var innerEnded = false; // set once the inner source has ended

  // Records the (unmodified) inner value after a 100ms delay.
  function oninner(innerVal) {
    innerPending += 1;
    setTimeout(function () {
      result.push(innerVal);
      innerMaybeEnd();
    }, 100);
  }

  function onendinner() {
    innerMaybeEnd(true);
  }

  // `true` => inner source ended; no argument => one timer completed.
  function innerMaybeEnd(ended) {
    if (ended) {
      innerEnded = true;
    } else {
      innerPending -= 1;
    }
    if (innerEnded && innerPending === 0) {
      innerStream.queue(null);
    }
  }

  from(innerValues)
    .pipe(innerStream)
    // Registered directly as the listener, so it is invoked with whatever
    // arguments the 'end' event carries.
    .on('end', outerMaybeEnd);
}

function onendouter() {
  outerMaybeEnd(true);
}

// A truthy argument marks the outer source as ended; otherwise one inner
// stream is counted as finished.
function outerMaybeEnd(ended) {
  if (ended) {
    outerEnded = true;
  } else {
    outerPending -= 1;
  }
  if (outerEnded && outerPending === 0) {
    outerStream.queue(null);
  }
}

function reportResult() {
  console.log('result', result);
}

outerSource
  .pipe(outerStream)
  .on('end', reportResult);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Seems like two modules could be extracted from this:
onvalue
function makeInner(outerValue) {..}
and returns with the final result (i.e. allows doing the above with much less code). Note that the fact that I'm aggregating into a closed-over `result` vs. emitting values should have nothing to do with the implementation of those modules.