Stream tweaks proposal

@dominictarr, July 15, 2012

The stream is a powerful tool, not just for IO but, in the best Unix tradition, for composition of modules. I've been connecting streams into quite long chains.

Some streams I have written do not alter the data in the stream, but affect it in some way, such as buffering while the stream is paused; see pause-stream.
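
A minimal sketch of such a buffering through stream, in the spirit of pause-stream (this is an illustration, not its actual implementation):

var Stream = require('stream')

function pausable () {
  var stream = new Stream()
  var buffer = []
  var paused = false

  stream.readable = stream.writable = true

  stream.write = function (data) {
    if (paused) {
      buffer.push(data) // hold the data, don't alter it
      return false      // signal back-pressure upstream
    }
    stream.emit('data', data)
    return true
  }

  stream.pause = function () { paused = true }

  stream.resume = function () {
    paused = false
    while (buffer.length && !paused)
      stream.emit('data', buffer.shift())
    if (!paused) stream.emit('drain') // let the source resume
  }

  stream.end = function (data) {
    if (data != null) stream.write(data)
    stream.emit('end')
  }

  return stream
}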

Also, I've been writing high-level abstractions that communicate via streams and do some interesting high-level stuff, such as replicating data (snob and crdt) or multiplexing (mux-demux). Another good example is dnode.

These abstractions expose a stream on their interface, and so can be used over any IO channel that supports pipe.
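
A sketch of what that looks like in practice, roughly following the mux-demux README (check it for the exact API), serving the abstraction over a TCP socket:

var net = require('net')
var MuxDemux = require('mux-demux')

net.createServer(function (sock) {
  var mx = MuxDemux(function (stream) {
    stream.pipe(stream) // echo each multiplexed substream back
  })
  // the abstraction is just a duplex stream, so any channel
  // that supports pipe will do -- a TCP socket here
  sock.pipe(mx).pipe(sock)
}).listen(8642)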

I am actively encouraging other abstraction authors to adopt this pattern.

Typically, I'll have these abstractions simply write a stream of regular js objects, and then pipe that through streams that (de)serialize the objects. This simplifies writing the abstractions, because the (de)serializers are reusable. These streams pass the back-pressure straight through from dest back to source. They are traditionally called "filter-streams", but I think "through-stream" is a bit more clear. Node core even has through/filter streams, in zlib.

tcpSock
  .pipe(decoder)
  .pipe(abstractionStream)
  .pipe(encoder)
  .pipe(tcpSock)
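
For example, here is a sketch of what the (de)serializing through streams might look like, assuming newline-delimited JSON as the framing (the names and framing are illustrative, not any particular module):

var Stream = require('stream')

// serialize objects to newline-delimited JSON
function encoder () {
  var s = new Stream()
  s.readable = s.writable = true
  s.write = function (obj) {
    s.emit('data', JSON.stringify(obj) + '\n')
    return true // a real one would also forward back-pressure
  }
  s.end = function () { s.emit('end') }
  return s
}

// parse newline-delimited JSON back into objects
function decoder () {
  var s = new Stream()
  var soFar = ''
  s.readable = s.writable = true
  s.write = function (chunk) {
    var lines = (soFar + chunk).split('\n')
    soFar = lines.pop() // keep the trailing partial line
    lines.forEach(function (line) {
      if (line) s.emit('data', JSON.parse(line))
    })
    return true
  }
  s.end = function () { s.emit('end') }
  return s
}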

Summary

  • io streams
  • middleware streams (e.g. to manage flow control)
  • duplex streams that interface abstractions
  • through/filter streams.

Observations

There are a few small problems with the current pipe implementation.

write() === false & back-pressure

Since back-pressure is only propagated when write() === false, back-pressure gets delayed on long streams.

Readable -> Through1 -> Through2 -> Writable

If Through{1,2} return write() === false when paused, then when Writable pauses it will take three chunks emitted from Readable for the back-pressure to propagate back to Readable: the first chunk will pause Through2, the next will pause Through1, and the third will finally pause Readable.

My suggestion is to reinstate the 'pause' event, which would call pause() on the source, which may then choose to emit 'pause' itself. The stream's implementation would be responsible for emitting 'pause', not pipe as in v0.4. We don't need to bring 'resume' back, as 'drain' already calls resume().
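
Here is a sketch of a through stream under that proposal: when it pauses, the stream itself emits 'pause', and pipe would respond by calling pause() on its source, so back-pressure reaches the head of the chain without costing one chunk per hop:

var Stream = require('stream')

function through () {
  var s = new Stream()
  var paused = false
  s.readable = s.writable = true

  s.write = function (data) {
    s.emit('data', data)
    return !paused
  }
  s.pause = function () {
    if (paused) return
    paused = true
    s.emit('pause') // emitted by the stream's implementation, not by pipe
  }
  s.resume = function () {
    paused = false
    s.emit('drain') // 'drain' already calls resume() upstream
  }
  s.end = function () { s.emit('end') }

  // under the proposal, pipe would also add, roughly:
  //   dest.on('pause', function () { source.pause() })

  return s
}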

Automatic Buffering.

In IRC, isaacs mentioned he was considering a read() method like the one used in Dart.

At first I was against it, because I figured you could just use something like pause-stream. Also, Dart-style buffering with stream.read() ruins piping to multiple destinations, which can be quite useful.

However, now that I've thought about it more, I think it would be okay, as long as pipe supported both patterns. It would only add a few lines, and it would be useful in some situations. I'd want to keep the ('data', chunk) event, because it's useful in other situations.
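
A rough sketch of a pipe that supports both patterns; note that read() and the 'readable' event are hypothetical names here, not an existing node API:

function pipe (source, dest) {
  if (source.read) {
    // pull style: a hypothetical read() that drains source's buffer,
    // with an (equally hypothetical) 'readable' event signalling more data
    var waiting = false
    var flow = function () {
      if (waiting) return // dest is saturated, wait for 'drain'
      var chunk
      while (null !== (chunk = source.read())) {
        if (dest.write(chunk) === false) {
          waiting = true
          return dest.once('drain', function () {
            waiting = false
            flow()
          })
        }
      }
    }
    source.on('readable', flow)
  } else {
    // push style: the classic ('data', chunk) event
    source.on('data', function (chunk) {
      if (dest.write(chunk) === false) source.pause()
    })
    dest.on('drain', function () { source.resume() })
  }
  source.on('end', function () { dest.end() })
  return dest
}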

Don't listen on dest.on('end', cleanup)

https://github.com/joyent/node/blob/master/lib/stream.js#L109

This breaks a duplex stream that may be half-duplex: one that happens to have emitted 'end' but is still writable.

'error' should put the chain into a terminal state.

A stream should always arrive at a terminal state if it stops streaming.

'close' calls dest.destroy(), but if an error occurs, the streaming just stops, and the rest of the chain is left waiting for another write() until it is (hopefully) garbage collected.

I propose that 'error' call destroy() like 'close' does.
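
A user-land approximation that is possible today (a sketch, not the proposed core change itself): wrap pipe so that 'error' puts the pair into a terminal state, like 'close' already does. Note that attaching these 'error' listeners also suppresses pipe's unhandled-error throw:

function pipeSafe (source, dest) {
  source.pipe(dest)
  function destroyBoth () {
    // put both ends of the pair into a terminal state
    if (source.destroy) source.destroy()
    if (dest.destroy) dest.destroy()
  }
  source.on('error', destroyBoth)
  dest.on('error', destroyBoth)
  return dest
}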

Unpipe & make 'error' optional

Pass cleanup to the dest when emitting 'pipe':

https://github.com/joyent/node/blob/master/lib/stream.js#L112

and make the 'error' listener optional, like 'end'

  dest.emit('pipe', source, cleanup);

You could use it like this:

stream.on('pipe', function (source, cleanup) {
  stream.unpipe = cleanup
}).on('error', function () {
  stream.unpipe && stream.unpipe()
  stream.destroy() // this is now the user's responsibility
})

// pipe to a service that *may* fail
readable.pipe(stream, {error: false})
// pipe to something more reliable...
readable.pipe(stream2)

Passing the option {error: false} would prevent https://github.com/joyent/node/blob/master/lib/stream.js#L86, but still set a listener via source.on('error', ...

It's much more useful to be able to pipe a source to multiple sinks than it is to pipe many streams into one, so it should be the dest that controls unpiping.

This is a two-line change to the current Stream#pipe.
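
To make that concrete, here is a compact pipe with the two proposed changes marked "changed:" (the real Stream#pipe has more error- and close-handling than this sketch shows):

function pipe (source, dest, options) {
  function ondata (chunk) {
    if (dest.write(chunk) === false) source.pause()
  }
  function ondrain () { source.resume() }
  function onend () { dest.end() }
  function onerror (err) {
    cleanup()
    // changed: {error: false} opts out of the unhandled-error throw
    if ((!options || options.error !== false) &&
        this.listeners('error').length === 0) throw err
  }
  function cleanup () {
    source.removeListener('data', ondata)
    dest.removeListener('drain', ondrain)
    source.removeListener('end', onend)
    source.removeListener('error', onerror)
    dest.removeListener('error', onerror)
  }
  source.on('data', ondata)
  dest.on('drain', ondrain)
  source.on('end', onend)
  source.on('error', onerror)
  dest.on('error', onerror)

  // changed: pass cleanup so dest can unpipe itself later
  dest.emit('pipe', source, cleanup)
  return dest
}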

Consistent terminal state.

Currently, when source emits 'close' it will call dest.destroy(). Something like this needs to happen in both directions, or else long streams can be left un-GC'd. Also, 'close' must always be emitted.

When a stream terminates, it needs to propagate that message in both directions. Currently, if a stream in the middle is destroyed, its source may not be GC'd (unless the source of the whole chain is destroyed).

Now, it may be possible to dispose of the source of each pipe pair as soon as the dest has become unwritable. Another option is to wait until both the readable and writable sides of dest are finished and dest emits 'close'.

If we were to destroy the head of the chain as soon as a stream becomes unwritable, it would be necessary to emit an 'unwritable' event or some such, and then call a method on the source. Quite complicated. I think waiting for the dest to emit 'close' is much simpler, and it will nearly always happen soon after anyway.

If you really wanted to clear the head, you could do something like the previous example, but call cleanup and then source.destroy().
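
For example, a sketch building on the 'pipe' handler above (the same hypothetical stream):

stream.on('pipe', function (source, cleanup) {
  stream.once('close', function () {
    cleanup()        // unpipe first
    source.destroy() // then release the source so it can be GC'd
  })
})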

Conclusion

I think Stream is already nearly right, and already very useful. Also, because they are not constrained by a static type system, node streams are far more flexible than streams in Dart.

But with a few small tweaks, they can be made even more powerful.

@mhart commented Jul 16, 2012

This looks awesome - I've struggled with some stream inconsistencies (edit: string encoding, for example), so I'd love to see some of this stuff get into 0.9 development - although admittedly I've only had a cursory glance at this.

In any case, a consensus on Stream behaviour that delivers performance and flexibility is always a win in my book.

@sstur commented Jul 17, 2012

Wow. That's very comprehensive and well thought-out. I hope some serious thought goes into some of these issues.

@mikeal commented Jul 26, 2012

I don't think we should do unpipe() on error; we should do it on destroy(), and the pipe logic should take an error from the input and call destroy(error) on itself.

@dominictarr (author) commented
@mikeal I am just suggesting that it is possible to unpipe. I do not think that unpiping should occur by default.

@dominictarr (author) commented
@mikeal also, I agree that we should definitely do destroy() after both 'error' and 'close'. Every part of the pipeline needs to know when the pipeline has terminated; the chain is only as strong as its weakest link.
