Skip to content

Instantly share code, notes, and snippets.

@groundwater
Last active December 23, 2015 02:29
Show Gist options
  • Save groundwater/6567600 to your computer and use it in GitHub Desktop.
Save groundwater/6567600 to your computer and use it in GitHub Desktop.
var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var Transform = require('stream').Transform;
var util = require('util');
// Object-mode transform that appends a line break to every chunk it receives.
function NewLine() {
  var streamOptions = { objectMode: true };
  Transform.call(this, streamOptions);
}
util.inherits(NewLine, Transform);

// Emit the chunk followed by '\n'. The concatenation coerces
// non-string chunks to their string form.
NewLine.prototype._transform = function (chunk, encoding, done) {
  var line = chunk + '\n';
  this.push(line);
  done();
};
//--------------------
//----- Track
//--------------------
// tracks should not be generated manually,
// they are created by Switch objects
// Pass-through object stream that represents one `case` branch of a Switch.
function _Track() {
  Transform.call(this, {
    objectMode: true
  });
}
util.inherits(_Track, Transform);

// Forward every object unchanged.
_Track.prototype._transform = function (chunk, encoding, next) {
  this.push(chunk);
  next();
};

//--------------------
//----- Switch
//--------------------

// Object-mode stream that routes each written object to the track of the
// first registered class it is an `instanceof`. Objects that match no
// registered class are emitted on the switch's own readable side.
function Switch() {
  Transform.call(this, {
    objectMode: true
  });
  // ordered list of { cls, track } pairs; registration order is match order
  this._matchers = [];
}
util.inherits(Switch, Transform);

// Route one object: first matching case wins, otherwise pass it through.
Switch.prototype._transform = function (obj, encoding, next) {
  var matchers = this._matchers;

  // Indexed loop with a declared counter: an undeclared counter would leak a
  // global (and throw in strict mode), and `for...in` would also visit any
  // inherited enumerable keys of the array.
  for (var i = 0; i < matchers.length; i++) {
    var matcher = matchers[i];
    if (obj instanceof matcher.cls) {
      // `next` is handed to the track as its write callback, so this
      // transform step completes only when the track reports the write done.
      matcher.track.write(obj, encoding, next);
      return;
    }
  }

  // no match found
  this.push(obj);
  next();
};

// Register `cls` as a case and return the readable track that will receive
// every object that is an instance of it.
// e.g. switch.case(X).pipe(Y)
Switch.prototype.case = function (cls) {
  var track = new _Track();
  this._matchers.push({
    cls: cls,
    track: track
  });
  return track;
};
//--------------------
//----- Log
//--------------------
// Object-mode transform that serializes every chunk with JSON.stringify.
function Json() {
  var streamOptions = { objectMode: true };
  Transform.call(this, streamOptions);
}
util.inherits(Json, Transform);

// Emit the JSON text of the incoming chunk.
Json.prototype._transform = function (chunk, encoding, done) {
  var serialized = JSON.stringify(chunk);
  this.push(serialized);
  done();
};
// Object-mode sink that prints every chunk to the console, preceded by `prefix`.
function Log(prefix) {
  Writable.call(this, { objectMode: true });
  this.prefix = prefix;
}
util.inherits(Log, Writable);

// Print the prefix (via the "%s" placeholder) followed by the chunk.
Log.prototype._write = function (chunk, encoding, done) {
  var label = this.prefix;
  console.log("%s", label, chunk);
  done();
};
// Readable object stream that emits new instances of the given constructors,
// cycling through `list` in order for as long as it is read from.
//
// list     - array of constructors; each is invoked with `new` and no arguments
// interval - optional delay in milliseconds before each requested object is
//            pushed; defaults to 1000
function Fountain(list, interval) {
  Readable.call(this, {
    objectMode: true
  });
  this.list = list;
  this.interval = interval === undefined ? 1000 : interval;
  // index of the constructor to instantiate on the next read
  this.i = 0;
}
util.inherits(Fountain, Readable);

// Schedule the next instance to be pushed after `interval` milliseconds.
Fountain.prototype._read = function () {
  var self = this;

  // With nothing to cycle through there is no constructor to call (and the
  // modulo below would yield NaN), so end the stream instead.
  if (this.list.length === 0) {
    this.push(null);
    return;
  }

  var Constructor = this.list[this.i];
  this.i = (this.i + 1) % this.list.length;
  setTimeout(function () {
    self.push(new Constructor());
  }, this.interval);
};
//--------------------
//----- Example
//--------------------
// Example payload type; instances carry the message "One".
function One() {
  this.msg = "One";
}
// Example payload type; instances carry the message "Two".
function Two() {
  this.msg = "Two";
}
// Example payload type; instances carry the message "Three".
function Three() {
  this.msg = "Three";
}
var sw = new Switch();

// only objects of type `One` travel down this track: JSON, one per line, to stdout
var oneTrack = sw.case(One);
oneTrack.pipe( new Json() ).pipe( new NewLine() ).pipe( process.stdout );

// only objects of type `Two` travel down this track: JSON, one per line, to stderr
var twoTrack = sw.case(Two);
twoTrack.pipe( new Json() ).pipe( new NewLine() ).pipe( process.stderr );

// emit instances of these types continuously
var source = new Fountain([One, Two, Three]);

// feed the switch; whatever no case claims falls through to the logger,
// which plays the role of the `default` case
source.pipe( sw ).pipe( new Log("Dropped") );

Switch Stream

The switch stream demultiplexes a readable stream across a series of writable streams. All streams operate in object mode, meaning that reads and writes handle whole objects instead of buffers.

Usage

var sw = new Switch();

sw.case(Car).pipe( car_handler );
sw.case(Van).pipe( van_handler );

vehicles.pipe( sw ).pipe( misc_handler );

The Switch stream is both readable and writable. On its readable side it emits only the objects that are not captured by any of the case classes registered with switch.case( class ).

                                              +-------------+
                                     +------> |   stream 1  |
+----------+         +----------+    |        +-------------+
| generate |         |          |    |
|  objects | ------> |  switch  | ---+------> +-------------+
+----------+         |          |    |        |   stream 2  |
                     +----------+    |        +-------------+
                                     |
                                     +------> +-------------+
                                              |   stream 3  |
                                              +-------------+
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment