The new nodejs Streams2 API is great at buffering, handles back-pressure for you in many cases, and exposes a nice read() method that allows reading a fixed length number of bytes. However, if you'd like to read from a stream or pipe a stream into a destination stream until a pattern, you're back to some level of rolling your own buffering and back-pressure support. UntilStream aims to solve both use cases.
Examples below, more details can be found on EvanOxfeld/until-stream.
UntilStream unpipes and ends the destination stream when the pattern is reached. The next call to read() returns exactly the pattern. Back-pressure is handled for you.
var UntilStream = require('until-stream');
var streamBuffers = require("stream-buffers");
var us = new UntilStream({ pattern: 'World'});
var sourceStream = new streamBuffers.ReadableStreamBuffer();
sourceStream.put("Hello World");
var writableStream = new streamBuffers.WritableStreamBuffer();
sourceStream.pipe(us).pipe(writableStream);
writableStream.once('close', function () {
//writeableStream contains all data before the pattern occurs
var str = writableStream.getContentsAsString('utf8'); // 'Hello '
//Now the next call to read() returns the pattern
var data = us.read(); // 'World'
});
Calls to read() return chunks of data until the pattern is reached. The next call to read() returns exactly the pattern.
var UntilStream = require('until-stream');
var streamBuffers = require("stream-buffers");
var us = new UntilStream({ pattern: 'jumps'});
var sourceStream = new streamBuffers.ReadableStreamBuffer({ chunkSize: 8 });
sourceStream.put("The quick brown fox jumps over the lazy dog");
sourceStream.pipe(us);
us.on('readable', function() {
if (us.read() === 'jumps') {
console.log('Pattern reached!');
}
});
Most of the magic is in UntilStream.read() which overrides stream.Readable's read() method. Essentially I call stream.Readable.read() and try to detect the pattern within the current chunk. If there's no pattern, read() returns the chunk but slices off pattern.length - 1 for the next call to read().
- Increase the options and improve the API to handle other use cases in this domain
- An auto-pause option might be useful so that the destination stream's does not end when the first instance of the pattern is detected.
- UntilStream emerged from node-unzip. Some zip files store compressed data of an unknown length followed by a binary signature indicating the end of the compressed data. UntilStream will replace the similar mechanism implemented in node-unzip.
- Split out UntilStream into UntilStream.Readable and UntilStream.PassThrough, echoing the Streams2 API. Also allow for the same options as in the Streams2 API.
- Add support for piping a UntilStream to multiple destination streams.
- More testing, i.e. ensure that 'readable' and 'drain' events are emitted correctly.
The current pipe() implementation writes incoming data to the destination stream until the pattern is reached, unpipes, and ends the destination stream. At this point, the 0th index of the stream's internal buffer is at the first byte of the pattern. In using and discussing the API, there's too much stateful stuff going on here. Rather than unpipe and end the destination stream, it would be better to just unpipe.