Skip to content

Instantly share code, notes, and snippets.

@IronSavior
Created September 12, 2017 00:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save IronSavior/2e10422bd34a135091dde2f6dd6eba8f to your computer and use it in GitHub Desktop.
Save IronSavior/2e10422bd34a135091dde2f6dd6eba8f to your computer and use it in GitHub Desktop.
Simple stream demuxer
const {Readable, Writable} = require('stream');
const PRIVATE = Symbol('private state accessor');
// Writable stream that routes each chunk through one of its output streams as determined by a user-supplied fdunction.
//
// An output stream is dynamically created whenver the channel selector function returns a new channel ID. The channel
// IDs may be any value which can function as a Map's key. Listeners to the "channel" event will receive a new Readable
// channel output stream when a new channel is created.
//
// Please note that this stream respects backpressure signaled by its outputs. This means that the overall throughput
// rate could suffer if one of the outputs is piped into a slow stream. If any of the outputs are left completely
// un-consumed, all throughput will eventually cease from ALL output channels until the chunks buffered by the choked
// output channel is consumed.
class SimpleDemux extends Writable {
// @param get_channel_id {Function} Synchronous function which receives the chunk and returns its channel id
constructor( get_channel_id, opts = {} ){
super(opts);
const self = this;
const channels = new Map();
const priv = this[PRIVATE] = {channels, create_channel, get_channel_id};
// TODO: Remove any streams that are externally ended.
this.once('finish', _ => this.channels.forEach(chan => chan.push(null)));
return;
function create_channel( id ){
if( channels.has(id) ) throw Error(`BUG: A channel with ID "${id}" already exists`);
const chan = new Readable(Object.assign({}, opts, {read}));
chan.channel_id = id;
channels.set(id, chan);
self.emit('channel', chan);
return chan;
function read(){
if( !priv.choke ) return;
const [choked_chan, done] = priv.choke;
if( this !== choked_chan ) return;
delete priv.choke;
done();
}
}
}
_write( chunk, _, done ){
const priv = this[PRIVATE];
const {
channels,
get_channel_id,
create_channel
} = priv;
try{
const id = get_channel_id(chunk);
if( channels.has(id) ) return send(channels.get(id));
const chan = create_channel(id);
return setImmediate(_ => send(chan));
}
catch(e){
return done(e);
}
function send( chan ){
if( chan.push(chunk) ) return done();
if( priv.choke ) return done(Error("BUG: Choking while choked"));
priv.choke = [chan, done];
}
}
get channels(){
return Array.from(this[PRIVATE].channels.values());
}
}
module.exports = SimpleDemux;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment