Node streams - a fractal of weird design

and a potential refactor that could fix that

This is a list of issues and confusions I've encountered with node streams, and things I wish were designed and implemented differently. If promise-streams ever moves to a design that doesn't build on top of node streams, this is a list of mistakes to avoid.

  1. enc parameter - Why is encoding always passed together with the data? It should be a separate concern, and it doesn't even make sense in objectMode.
  2. eventemitter base - There is no reason to use an uncomposable grab-bag of stringy events as a base.
  3. relying on nextTick etc. for execution order - This is very unreliable and imposes all sorts of unpredictable rules on implementers, none of which are documented anywhere.
  4. no error propagation - pipe() does not forward errors from the source to the target, so every stream in a chain has to handle its own (see the sketch after this list).
  5. attaching an error event listener changes the behaviour of the producer. This effectively couples the consumer with the producer and with all other consumers. The producer should not have to be aware of its listeners. More importantly, a consumer should not affect the consumption of other listeners: by adding an error listener it fundamentally changes the way the stream behaves for everyone.
  6. buffering included - Perhaps a buffer should be a separate stream?
  7. objectMode - There should be no difference between object and non-object mode. There are two reasons objectMode exists: buffering and encoding. If the buffer is a separate stream and encoding concerns are implemented as separate decoder streams, there is no difference left between objectMode and non-objectMode. Moving the buffer out makes this possible, because encoding only has to be handled when joining multiple data packets into one.
  8. semantics of errors - What is an error? Should it stop the stream? Should it be possible to continue the stream after an error is handled?
  9. Errors in the target stream cause it to unpipe from the source, but the target stream will not end. Adding an automatic end event seems like a good idea. This should be possible if multicast is extracted to be a separate stream.
  10. Combining readable and writable. I'm not sure writable streams should be a thing at all - they're a different API. But I'm definitely sure that combining the readable and writable APIs into a single object sucks. The worst part is that it mixes read and write error events into a single place, which virtually guarantees that you can't forward errors.
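
To make issue 4 concrete, here's a minimal sketch using plain node core streams. The error emitted on the source never reaches the piped target; every stream in a chain needs its own error listener:

var stream = require('stream');

var source = new stream.Readable({ read: function () {} });
var target = new stream.Writable({
  write: function (chunk, enc, cb) { cb(); }
});

source.pipe(target);

// never fires: pipe() does not carry errors from source to target
target.on('error', function (err) {
  console.log('target never sees this');
});

// without this listener, the unhandled 'error' crashes the process
source.on('error', function (err) {
  console.log('handled at the source:', err.message);
});

source.emit('error', new Error('boom'));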

Stuff that's good:

  1. backpressure + multiple consumers - I think it's rare to get both in a single package. It might be better to have a separate "multicaster" stream that enables this, rather than it being allowed all the time. As it is now, it only works if multiple consumers are being pushed the data (they use the data event), but not if they're pulling the data (they use the readable event and the read method) - the sketch below shows the asymmetry.
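
A minimal illustration of that asymmetry, again with plain node core streams:

var stream = require('stream');

// push style: every 'data' listener receives every chunk
var pushed = new stream.Readable({ read: function () {} });
pushed.on('data', function (chunk) { console.log('A:', chunk.toString()); });
pushed.on('data', function (chunk) { console.log('B:', chunk.toString()); });
pushed.push('hello'); // both A and B log 'hello'

// pull style: read() removes the chunk from the internal buffer, so only
// the first consumer to call read() gets it
var pulled = new stream.Readable({ read: function () {} });
pulled.push('hello');
console.log(pulled.read()); // <Buffer 68 65 6c 6c 6f>
console.log(pulled.read()); // null - a second puller gets nothing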

Potential solution

The solution here would be to extract the buffering/encoding logic into separate transform streams, and to extract multicast into a separate Multicast stream.

The resulting streams will be pull-only, single-producer, single-consumer.

Stream buffers (decoders)

A stream buffer (decoder) will pull data until it fills up to highWatermark. Depending on the type of buffer it will then do different things when pulled from:

  • ByteBuffer will return the entire combined buffer of the data received thus far.
  • UTF8Decoder will return as much of the data as forms a valid UTF-8 string, keeping the rest in the buffer.
  • ObjectBuffer will return a single object of the data thus far.

Then the buffer streams will start pulling again until they're full and so on.
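
Here is a sketch of what the UTF8Decoder could look like if built from today's primitives - the class name and the object-mode output are illustrative, not a finished API. StringDecoder already keeps incomplete multi-byte sequences in an internal buffer and only ever emits complete characters:

var Transform = require('stream').Transform;
var StringDecoder = require('string_decoder').StringDecoder;

class UTF8Decoder extends Transform {
  constructor() {
    super({ readableObjectMode: true }); // downstream sees whole strings
    this.decoder = new StringDecoder('utf8');
  }
  _transform(chunk, enc, cb) {
    var str = this.decoder.write(chunk); // only complete characters come out
    if (str) this.push(str);
    cb();
  }
  _flush(cb) {
    var rest = this.decoder.end(); // drain whatever remains at stream end
    if (rest) this.push(rest);
    cb();
  }
}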

Extracting the buffers will result in all streams working in "objectMode" and eliminate all inconsistencies pertaining to encoding and buffering, decoupling them from the main stream logic.

Multicast stream

To enable multicast piping there will be a separate writable stream, called a multicast stream, capable of generating multiple readable streams. Example API:

var multi = readable.pipe(new Multicast());

multi.fork().pipe(target1);
multi.fork().pipe(target2);
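
A hypothetical sketch of how Multicast could work - fork() hands out independent readables, and _write only signals completion once every fork has drained the current chunk, which gives the "pull the next packet only when all forks are done" behaviour described below:

var stream = require('stream');

class Multicast extends stream.Writable {
  constructor() {
    super({ objectMode: true });
    this.forks = [];
  }
  fork() {
    var out = new stream.PassThrough({ objectMode: true });
    this.forks.push(out);
    return out;
  }
  _write(chunk, enc, cb) {
    var pending = this.forks.length;
    if (pending === 0) return cb();
    this.forks.forEach(function (out) {
      // write() returning false means the fork is backed up - wait for drain
      if (out.write(chunk)) {
        if (--pending === 0) cb();
      } else {
        out.once('drain', function () { if (--pending === 0) cb(); });
      }
    });
  }
  _final(cb) {
    this.forks.forEach(function (out) { out.end(); });
    cb();
  }
}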

Extracting multicast will enable proper error propagation for regular transform streams. Since pipe will only be possible between two streams, the rules can be vastly simplified:

  • An error event propagates forwards, and will unsubscribe (cancel, unpipe) all transform streams backwards.
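
Illustrative only - with exactly one consumer per stream, that rule could be implemented in a few lines (backpressure omitted for brevity):

function pipe(source, target) {
  source.on('data', function (chunk) { target.write(chunk); });
  source.on('end', function () { target.end(); });
  source.on('error', function (err) {
    target.emit('error', err); // propagate forwards
    source.destroy();          // cancel production backwards
  });
  return target;
}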

The complex rules will only remain in Multicast:

  • It will only pull the next packet from the source when all the forks are done pulling the current packet
  • It will propagate errors forwards to all forks
  • It will only propagate unpipes (unsubscribes) to the source if all forks are unpiped (unsubscribed)

Initial proposed error semantics

  • Errors will propagate through piped streams until an error-handling stream is encountered.
  • An error-handling stream can be created by using mapError, flatMapError, or by passing an extra parameter to ps.through that contains an error handler.
  • A stream is "consumed" if it's converted to a promise by awaiting its end event, or if it's otherwise subscribed to.
  • If an error propagates to a consumed stream, it will
    • reject that stream's promise
    • cancel all subscriptions backwards, such that if a subscription count drops to zero, that stream also unsubscribes backwards.
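
Putting it together, hypothetical usage under these semantics - source, parse and sink are placeholders, and the promise() accessor is assumed here, not part of the current promise-streams API:

var result = source
  .pipe(parse)                      // an error here propagates forwards...
  .mapError(function (err) {
    return { failed: err.message }; // ...until a handling stream recovers it
  })
  .pipe(sink)
  .promise();                       // consuming the stream

// an unhandled error reaching a consumed stream rejects the promise
// and cancels all subscriptions backwards
result.then(onDone, onError);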