
@EvanOxfeld
Last active October 13, 2017 17:42
A node.js stream that stops piping when a pattern is reached

The new Node.js Streams2 API is great at buffering, handles back-pressure for you in many cases, and exposes a nice read() method that can read a fixed number of bytes. However, if you'd like to read from a stream, or pipe a stream into a destination stream, until a pattern is reached, you're back to rolling some of your own buffering and back-pressure support. UntilStream aims to solve both use cases.

Examples are below; more details can be found on EvanOxfeld/until-stream.

Quick Examples

Pipe to a destination stream until the pattern is reached

UntilStream unpipes and ends the destination stream when the pattern is reached. The next call to read() returns exactly the pattern. Back-pressure is handled for you.

var UntilStream = require('until-stream');
var streamBuffers = require("stream-buffers");

var us = new UntilStream({ pattern: 'World'});

var sourceStream = new streamBuffers.ReadableStreamBuffer();
sourceStream.put("Hello World");
var writableStream = new streamBuffers.WritableStreamBuffer();

sourceStream.pipe(us).pipe(writableStream);

writableStream.once('close', function () {
  //writableStream contains all data before the pattern occurs
  var str = writableStream.getContentsAsString('utf8'); // 'Hello '
  //Now the next call to read() returns the pattern
  var data = us.read(); // 'World'
});

read() returns chunks up to the pattern

Calls to read() return chunks of data until the pattern is reached. The next call to read() returns exactly the pattern.

var UntilStream = require('until-stream');
var streamBuffers = require("stream-buffers");

var us = new UntilStream({ pattern: 'jumps'});

var sourceStream = new streamBuffers.ReadableStreamBuffer({ chunkSize: 8 });
sourceStream.put("The quick brown fox jumps over the lazy dog");

sourceStream.pipe(us);

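us.on('readable', function() {
  var chunk = us.read();
  // read() can return null, and may return a Buffer rather than a string,
  // so compare against the string form of the chunk
  if (chunk !== null && chunk.toString() === 'jumps') {
    console.log('Pattern reached!');
  }
});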

How UntilStream Works

Most of the work happens in UntilStream.read(), which overrides stream.Readable's read() method. Essentially, I call stream.Readable.read() and try to detect the pattern within the current chunk. If the pattern is not found, read() returns the chunk minus its last pattern.length - 1 bytes, which are held back for the next call to read() so that a pattern split across two chunks can still be detected.
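The hold-back idea can be sketched without any stream machinery. The snippet below is only an illustration under stated assumptions, not the code in until-stream: `createScanner` and `scan` are hypothetical names, and the real read() override may differ in how it buffers and what it returns.

```javascript
// Hypothetical helper, not part of until-stream: scans incoming chunks for a
// pattern and holds back a tail that could be the start of a split match.
function createScanner(pattern) {
  var pat = Buffer.from(pattern);
  var held = Buffer.alloc(0); // bytes carried over between calls

  return function scan(chunk) {
    var data = Buffer.concat([held, Buffer.from(chunk)]);
    var idx = data.indexOf(pat);

    if (idx === -1) {
      // No match: emit everything except the last pat.length - 1 bytes,
      // since those could be the beginning of the pattern.
      var keep = Math.min(pat.length - 1, data.length);
      held = data.subarray(data.length - keep);
      return { matched: false, output: data.subarray(0, data.length - keep) };
    }

    // Match: emit the bytes before the pattern and keep the rest buffered.
    held = data.subarray(idx);
    return { matched: true, output: data.subarray(0, idx) };
  };
}

// 'World' is split across the two chunks below.
var scan = createScanner('World');
var first = scan('Hello Wo');
var second = scan('rld!');

console.log(first.matched, first.output.toString());   // false 'Hell'
console.log(second.matched, second.output.toString()); // true 'o '
```

Without the held-back tail, the first call would have emitted 'Hello Wo' and the split 'World' would never be seen.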

Next Steps

  • Increase the options and improve the API to handle other use cases in this domain
    • An auto-pause option might be useful so that the destination stream does not end when the first instance of the pattern is detected.
  • UntilStream emerged from node-unzip. Some zip files store compressed data of an unknown length followed by a binary signature indicating the end of the compressed data. UntilStream will replace the similar mechanism implemented in node-unzip.
  • Split out UntilStream into UntilStream.Readable and UntilStream.PassThrough, echoing the Streams2 API. Also allow for the same options as in the Streams2 API.
  • Add support for piping an UntilStream to multiple destination streams.
  • More testing, e.g. ensure that 'readable' and 'drain' events are emitted correctly.
@EvanOxfeld
Author

@wanderview I created a really naive MatchStream implementation here. For the unzip use case where the stream stops flowing on a match, my _transform function really needs to peek at the source stream before calling read(). Otherwise MatchStream ends up with additional bytes in its _readableState buffer that are wiped out when I reach the pattern and push null.

What do you think?
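To make the problem concrete, here is a rough sketch of a Transform that ends its readable side at the match. It is not the MatchStream mentioned above: `StopAtPattern` and its `remainder` property are hypothetical names. It assumes that bytes already consumed past the match cannot be handed back to the source, so it stashes them on a property instead of pushing them.

```javascript
var stream = require('stream');
var util = require('util');

// Hypothetical sketch, not the MatchStream referred to in the comment.
function StopAtPattern(pattern) {
  stream.Transform.call(this);
  this._pattern = Buffer.from(pattern);
  this._held = Buffer.alloc(0); // tail that may start a split match
  this._done = false;
  this.remainder = null;        // pattern plus bytes consumed after it
}
util.inherits(StopAtPattern, stream.Transform);

StopAtPattern.prototype._transform = function (chunk, encoding, callback) {
  if (this._done) {
    // Already matched: nothing can be pushed after end, so stash the bytes.
    this.remainder = Buffer.concat([this.remainder, chunk]);
    return callback();
  }
  var data = Buffer.concat([this._held, chunk]);
  var idx = data.indexOf(this._pattern);
  if (idx === -1) {
    var keep = Math.min(this._pattern.length - 1, data.length);
    this._held = data.subarray(data.length - keep);
    this.push(data.subarray(0, data.length - keep));
    return callback();
  }
  this._done = true;
  this._held = Buffer.alloc(0);
  this.remainder = data.subarray(idx);
  this.push(data.subarray(0, idx));
  this.push(null); // end the readable side at the match
  callback();
};

StopAtPattern.prototype._flush = function (callback) {
  // No match by the end of input: release the held-back tail.
  if (!this._done) this.push(this._held);
  callback();
};
```

Being able to peek at the source before reading would avoid the stash altogether, because the surplus bytes would never leave the source stream.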
