Skip to content

Instantly share code, notes, and snippets.

@Kuirak
Last active August 29, 2015 14:01
Show Gist options
  • Save Kuirak/5ded9c36be65bd3c9b51 to your computer and use it in GitHub Desktop.
Save Kuirak/5ded9c36be65bd3c9b51 to your computer and use it in GitHub Desktop.
'use strict';
/**
* Created by Jonas Kugelmann on 09.05.2014.
*/
var stream = require('stream')
,util =require('util');
// Demo fixtures: two equally long word lists, lower-case and upper-case.
var input1 = [
    'one',
    'two',
    'three',
    'four',
    'five'
];
var input2 = [
    'ONE',
    'TWO',
    'THREE',
    'FOUR',
    'FIVE'
];
util.inherits(InputStream, stream.Readable);

/**
 * Readable object-mode stream that emits the items of an array, one item per
 * chunk, and then signals end-of-stream.
 *
 * Items are taken from the END of the array first, so the last item of
 * `input` is the first chunk emitted.
 *
 * @constructor
 * @param {Array} [input] - Items to emit; defaults to an empty array. The
 *   array is copied, so the caller's array is left untouched.
 */
function InputStream(input) {
    stream.Readable.call(this, {objectMode: true});
    // Work on a private copy: _read() consumes the array destructively.
    this.input = (input || []).slice();
}

/**
 * Supplies the next chunk on demand: pushes one remaining item, or `null`
 * once everything has been emitted so that 'end' fires and consumers
 * (pipes, async iterators, 'end' listeners) can finish.
 */
InputStream.prototype._read = function () {
    if (this.input.length === 0) {
        // Without this EOF marker the stream would stay open forever.
        this.push(null);
        return;
    }
    this.push(this.input.pop());
};
// One source stream per fixture array; these are the two inputs that get combined below.
var inputStream1 = new InputStream(input1);
var inputStream2 = new InputStream(input2);
/**
 * Writable object-mode sink: every chunk written to it is printed with
 * console.log.
 *
 * @constructor
 */
function ConsoleStream() {
    var options = {objectMode: true};
    stream.Writable.call(this, options);
}
util.inherits(ConsoleStream, stream.Writable);

/**
 * Logs one chunk and immediately acknowledges it.
 *
 * @param {*} chunk - Value to print.
 * @param {string} encoding - Unused.
 * @param {Function} done - Called once the chunk has been logged.
 */
ConsoleStream.prototype._write = function (chunk, encoding, done) {
    console.log(chunk);
    done();
};
// Final sink of the demo pipeline: logs every chunk it receives.
var consoleStream = new ConsoleStream();
/**
 * Object-mode transform that turns each incoming collection into a single
 * string by appending its items in iteration order.
 *
 * @constructor
 */
function ConcatProcessor() {
    var options = {objectMode: true};
    stream.Transform.call(this, options);
}
util.inherits(ConcatProcessor, stream.Transform);

/**
 * Concatenates the items of one chunk and emits the resulting string.
 *
 * @param {Array} chunk - Collection exposing forEach; each item is appended
 *   with `+=`, so non-string items are coerced to strings.
 * @param {string} encoding - Unused.
 * @param {Function} done - Transform callback; receives the joined string.
 */
ConcatProcessor.prototype._transform = function (chunk, encoding, done) {
    var joined = '';
    chunk.forEach(function (part) {
        joined += part;
    });
    // Passing the value to the callback pushes it downstream and completes
    // the transform in one step.
    done(null, joined);
};
// Transform stage of the demo: joins the items of each array chunk into one string.
var concatProcessor = new ConcatProcessor();
function CombineStreams(){
var data=[];
var args = Array.prototype.slice.call(arguments);
this.target =function(target){
args.forEach(function(stream){
//TODO check if readable stream
stream.on('data',function(chunk){
stream.pause();
data.push(chunk);
if(data.length ===args.length){
target.write(data);
data =[];
args.forEach(function(stream){
stream.resume();
})
}
})
});
return target;
}
}
// Demo pipeline: pair up one chunk from each input, concatenate each pair into a string, and log it.
new CombineStreams(inputStream1,inputStream2).target(concatProcessor).pipe(consoleStream);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment