@EvanOxfeld
Last active October 13, 2017 17:42
A node.js stream that stops piping when a pattern is reached

The new Node.js Streams2 API is great at buffering, handles back-pressure for you in many cases, and exposes a nice read() method for reading a fixed number of bytes. However, if you'd like to read from a stream, or pipe a stream into a destination stream, only until a pattern occurs, you're back to rolling your own buffering and back-pressure support. UntilStream aims to solve both use cases.

Examples are below; more details can be found at EvanOxfeld/until-stream.

Quick Examples

Pipe to a destination stream until the pattern is reached

UntilStream unpipes and ends the destination stream when the pattern is reached. The next call to read() returns exactly the pattern. Back-pressure is handled for you.

var UntilStream = require('until-stream');
var streamBuffers = require("stream-buffers");

var us = new UntilStream({ pattern: 'World'});

var sourceStream = new streamBuffers.ReadableStreamBuffer();
sourceStream.put("Hello World");
var writableStream = new streamBuffers.WritableStreamBuffer();

sourceStream.pipe(us).pipe(writableStream);

writableStream.once('close', function () {
  //writableStream contains all data before the pattern occurs
  var str = writableStream.getContentsAsString('utf8'); // 'Hello '
  //Now the next call to read() returns the pattern
  var data = us.read(); // 'World'
});

read() returns chunks up to the pattern

Calls to read() return chunks of data until the pattern is reached. The next call to read() returns exactly the pattern.

var UntilStream = require('until-stream');
var streamBuffers = require("stream-buffers");

var us = new UntilStream({ pattern: 'jumps'});

var sourceStream = new streamBuffers.ReadableStreamBuffer({ chunkSize: 8 });
sourceStream.put("The quick brown fox jumps over the lazy dog");

sourceStream.pipe(us);

us.on('readable', function() {
  if (us.read() === 'jumps') {
    console.log('Pattern reached!');
  }
});

How UntilStream Works

Most of the magic is in UntilStream.read(), which overrides stream.Readable's read() method. Essentially it calls stream.Readable.read() and tries to detect the pattern within the current chunk. If there's no match, read() returns the chunk minus its last pattern.length - 1 bytes, which are held back for the next call to read() in case the pattern straddles a chunk boundary.
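The chunk-splitting logic described above can be sketched as a pure helper. The function name and the [emit, retain] return shape are illustrative, not the actual until-stream internals; the same code works on strings and Buffers, since both support indexOf and slice:

```javascript
// Given a chunk and a pattern, return [emit, retain]: the bytes that are
// safe to hand to the reader now, and the tail to hold back.
function splitChunk(chunk, pattern) {
  var idx = chunk.indexOf(pattern);
  if (idx !== -1) {
    // Pattern found: emit everything before it, retain from the pattern on,
    // so the next read() returns exactly the pattern.
    return [chunk.slice(0, idx), chunk.slice(idx)];
  }
  // No match: hold back the last pattern.length - 1 bytes, since a match
  // could begin at the end of this chunk and finish in the next one.
  var keep = Math.min(pattern.length - 1, chunk.length);
  return [chunk.slice(0, chunk.length - keep), chunk.slice(chunk.length - keep)];
}

console.log(splitChunk('Hello World', 'World')); // [ 'Hello ', 'World' ]
console.log(splitChunk('The quick', 'jumps'));   // [ 'The q', 'uick' ]
```

Holding back exactly pattern.length - 1 bytes is the key invariant: any shorter tail could let a match that straddles two chunks slip through.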

Next Steps

  • Increase the options and improve the API to handle other use cases in this domain
    • An auto-pause option might be useful so that the destination stream does not end when the first instance of the pattern is detected.
  • UntilStream emerged from node-unzip. Some zip files store compressed data of an unknown length followed by a binary signature indicating the end of the compressed data. UntilStream will replace the similar mechanism implemented in node-unzip.
  • Split out UntilStream into UntilStream.Readable and UntilStream.PassThrough, echoing the Streams2 API. Also allow for the same options as in the Streams2 API.
  • Add support for piping an UntilStream to multiple destination streams.
  • More testing, e.g. ensuring that 'readable' and 'drain' events are emitted correctly.
@EvanOxfeld (Author)

The current pipe() implementation writes incoming data to the destination stream until the pattern is reached, then unpipes and ends the destination stream. At that point, the 0th index of the stream's internal buffer is at the first byte of the pattern. Having used and discussed the API, there's too much stateful behavior going on here. Rather than unpipe and end the destination stream, it would be better to just unpipe.

@EvanOxfeld (Author)

Alternatively, based on conversations with @wanderview, UntilStream could be an implementation of a more generic MatchStream, where MatchStream calls a function for each buffer:

function (buf, matched, pattern) {
  if (!matched) {
    return this.push(buf);
  }
  this.push(buf);
  return this.push(null); // end the stream
}

Or if you want to implement a split stream where the individual buffers get buffered up until a match is found:

var bufArr = [];
function (buf, matched, pattern) {
  if (!matched) {
    return bufArr.push(buf);
  }
  bufArr.push(buf);
  this.emit('match', Buffer.concat(bufArr), matched);
  bufArr = []; // reset so the next split starts fresh
}

@EvanOxfeld (Author)

@wanderview I created a really naive MatchStream implementation here. For the unzip use case where the stream stops flowing on a match, my _transform function really needs to peek at the source stream before calling read(). Otherwise MatchStream ends up with additional bytes in its _readableState buffer that are wiped out when I reach the pattern and push null.

What do you think?
