Skip to content

Instantly share code, notes, and snippets.

@emilbayes
Created March 16, 2015 09:42
Show Gist options
  • Save emilbayes/b1d9c8cf07022e7ebb36 to your computer and use it in GitHub Desktop.
Save emilbayes/b1d9c8cf07022e7ebb36 to your computer and use it in GitHub Desktop.
I need callbacks on _read
var stream = require('stream');
var outer = 0, inner = 0;
var collectorStream = stream.Readable({
highWaterMark: 1,
objectMode: true,
read: function() {
var self = this;
var id1 = outer++;
setTimeout(function() {
var id2 = inner++;
setTimeout(function() {
if(id2 === 10) return self.push(null);
//Concat in here
self.push({outer: id1, inner: id2});
}, 100*Math.random())
}, 50*Math.random())
}
});
var fanoutStream = stream.Transform({
readableObjectMode: true,
writableObjectMode: true,
allowHalfOpen: true,
highWaterMark: 16,
transform: function(itemSet, enc, cb) {
for (var i = 0; i < 10; i++)
this.push(itemSet);
cb();
}
});
collectorStream.pipe(fanoutStream).on('data', console.log.bind(console));
var stream = require('stream');
var outer = 0, inner = 0;
var spaghettiStream = new stream.Readable({
highWaterMark: 1,
objectMode: true,
read: function() {
var self = this;
var id1 = outer++;
setTimeout(function() {
var id2 = inner++;
setTimeout(function() {
for (var i = 0; i < 3; i++)
self.push({hello: true, outer: id1, inner: id2, i: i});
}, 50*Math.random());
}, 100*Math.random());
}
});
spaghettiStream.on('data', console.log.bind(console));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment