Created
September 12, 2017 00:40
-
-
Save IronSavior/2e10422bd34a135091dde2f6dd6eba8f to your computer and use it in GitHub Desktop.
Simple stream demuxer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Stream base classes: the demuxer itself is a Writable and every output channel is a Readable.
const {Readable, Writable} = require('stream');

// Unique property key under which each demuxer instance keeps its internal state, so that state
// cannot collide with (or be reached through) ordinary string-named properties.
const PRIVATE = Symbol('private state accessor');
// Writable stream that routes each chunk through one of its output streams as determined by a user-supplied function.
//
// An output stream is dynamically created whenever the channel selector function returns a new channel ID. The channel
// IDs may be any value which can function as a Map's key. Listeners to the "channel" event will receive a new Readable
// channel output stream when a new channel is created.
//
// Please note that this stream respects backpressure signaled by its outputs. This means that the overall throughput
// rate could suffer if one of the outputs is piped into a slow stream. If any of the outputs are left completely
// un-consumed, all throughput will eventually cease from ALL output channels until the chunks buffered by the choked
// output channel are consumed.
//
// A channel that closes (e.g. is destroyed by its consumer) before the demuxer has finished is forgotten, and any
// pending write that was waiting on it is released. A later chunk carrying the same channel ID creates a fresh channel
// and emits "channel" again. A chunk aimed at an already-destroyed channel is discarded.
class SimpleDemux extends Writable {
  // @param get_channel_id {Function} Synchronous function which receives the chunk and returns its channel id
  // @param opts {Object} Stream options. Passed to the Writable constructor and also reused (with `read` overridden)
  //                      as the options of every channel's Readable.
  constructor( get_channel_id, opts = {} ){
    super(opts);
    const self = this;
    const channels = new Map();
    // `choke` holds [channel, write-callback] while a write is parked waiting for a full channel to drain.
    // `finished` records that 'finish' fired, after which closing channels are expected and must stay listed.
    const priv = this[PRIVATE] = {channels, create_channel, get_channel_id, choke: null, finished: false};

    // Once every write has been processed, signal end-of-stream on all outputs.
    this.once('finish', () => {
      priv.finished = true;
      this.channels.forEach(chan => chan.push(null));
    });
    return;

    // Creates, registers and announces the output stream for a channel ID that has no live channel yet.
    // Throws if a channel is already registered under that ID.
    function create_channel( id ){
      if( channels.has(id) ) throw new Error(`BUG: A channel with ID "${String(id)}" already exists`);
      const chan = new Readable(Object.assign({}, opts, {
        // The consumer wants more data from this channel: resume the write that was parked on it, if any.
        read(){ release(this); }
      }));
      chan.channel_id = id;
      channels.set(id, chan);

      // A channel closed before the demuxer finished will never ask for data again, so it must not be
      // allowed to keep a parked write (and with it the whole demuxer) waiting forever.
      chan.once('close', () => {
        if( priv.finished ) return;
        if( channels.get(id) === chan ) channels.delete(id);
        release(chan);
      });

      self.emit('channel', chan);
      return chan;
    }

    // Completes the parked write, but only when `chan` is the channel that write is waiting on.
    function release( chan ){
      if( !priv.choke ) return;
      const [choked_chan, done] = priv.choke;
      if( chan !== choked_chan ) return;
      priv.choke = null;
      done();
    }
  }

  // Writable implementation: selects the chunk's channel, pushes the chunk into it, and delays the
  // write callback while the receiving channel reports backpressure.
  //
  // @param chunk {*} The value being written
  // @param _ {String} Encoding (unused)
  // @param done {Function} Write callback; receives an Error if the channel selector or channel setup throws
  _write( chunk, _, done ){
    const priv = this[PRIVATE];
    const {
      channels,
      get_channel_id,
      create_channel
    } = priv;

    try{
      const id = get_channel_id(chunk);
      if( channels.has(id) ) return send(channels.get(id));
      const chan = create_channel(id);
      // Delivery to a brand-new channel is deferred by one setImmediate turn. The callback runs outside
      // the surrounding try block, so it needs its own to keep errors flowing to `done`.
      return setImmediate(() => {
        try{
          send(chan);
        }
        catch(e){
          done(e);
        }
      });
    }
    catch(e){
      return done(e);
    }

    function send( chan ){
      // Nobody will ever read from a destroyed channel; discard the chunk rather than wait on it.
      if( chan.destroyed ) return done();
      // The second check covers a channel destroyed while handling this very push.
      if( chan.push(chunk) || chan.destroyed ) return done();
      if( priv.choke ) return done(new Error("BUG: Choking while choked"));
      // Channel is full: park the callback until that channel's read() (or close) releases it.
      priv.choke = [chan, done];
    }
  }

  // @returns {Array} Snapshot of the currently registered channel output streams
  get channels(){
    return Array.from(this[PRIVATE].channels.values());
  }
}
// The demuxer class is this module's sole export.
module.exports = SimpleDemux;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment