The new Node.js Streams2 API is great at buffering, handles back-pressure for you in many cases, and exposes a nice read() method that allows reading a fixed number of bytes. However, if you'd like to read from a stream, or pipe a stream into a destination stream, only until a pattern occurs, you're back to rolling some of your own buffering and back-pressure support. UntilStream aims to solve both use cases.
Examples are below; more details can be found at EvanOxfeld/until-stream.
UntilStream unpipes and ends the destination stream when the pattern is reached. The next call to read() returns exactly the pattern. Back-pressure is handled for you.
var UntilStream = require('until-stream');
var streamBuffers = require("stream-buffers");
var us = new UntilStream({ pattern: 'World'});
var sourceStream = new streamBuffers.ReadableStreamBuffer();
sourceStream.put("Hello World");
var writableStream = new streamBuffers.WritableStreamBuffer();
sourceStream.pipe(us).pipe(writableStream);
writableStream.once('close', function () {
  // writableStream contains all data before the pattern occurs
  var str = writableStream.getContentsAsString('utf8'); // 'Hello '
  // Now the next call to read() returns the pattern
  var data = us.read(); // 'World'
});
Calls to read() return chunks of data until the pattern is reached. The next call to read() returns exactly the pattern.
var UntilStream = require('until-stream');
var streamBuffers = require("stream-buffers");
var us = new UntilStream({ pattern: 'jumps'});
var sourceStream = new streamBuffers.ReadableStreamBuffer({ chunkSize: 8 });
sourceStream.put("The quick brown fox jumps over the lazy dog");
sourceStream.pipe(us);
us.on('readable', function() {
  if (us.read() === 'jumps') {
    console.log('Pattern reached!');
  }
});
Most of the magic is in UntilStream.read(), which overrides stream.Readable's read() method. Essentially I call stream.Readable.read() and try to detect the pattern within the current chunk. If there's no match, read() returns the chunk minus its last pattern.length - 1 bytes, which are held back in case the pattern straddles a chunk boundary.
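The chunk-splitting logic can be sketched as a standalone helper. This is a hypothetical function, not UntilStream's actual code (which does this inside the read() override), but it shows the two cases described above:

```javascript
// Hypothetical helper illustrating the slicing described above. Given the
// current chunk and the pattern, it returns the bytes that are safe to
// emit now and the bytes to hold back for the next read().
function splitChunk(chunk, pattern) {
  var idx = chunk.indexOf(pattern);
  if (idx !== -1) {
    // Pattern found: emit everything before it; the held-back portion
    // starts with exactly the pattern.
    return { emit: chunk.slice(0, idx), holdback: chunk.slice(idx) };
  }
  // No match: hold back the last pattern.length - 1 bytes, since they
  // could be the start of a pattern that completes in the next chunk.
  var cut = Math.max(chunk.length - (pattern.length - 1), 0);
  return { emit: chunk.slice(0, cut), holdback: chunk.slice(cut) };
}

console.log(splitChunk('The quick brown fox jum', 'jumps'));
// emit: 'The quick brown fox', holdback: ' jum'
```

The holdback is what makes patterns spanning chunk boundaries work: on the next read(), the retained tail is prepended to the incoming chunk before searching again.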
- Increase the options and improve the API to handle other use cases in this domain
- An auto-pause option might be useful so that the destination stream does not end when the first instance of the pattern is detected.
- UntilStream emerged from node-unzip. Some zip files store compressed data of an unknown length followed by a binary signature indicating the end of the compressed data. UntilStream will replace the similar mechanism implemented in node-unzip.
- Split out UntilStream into UntilStream.Readable and UntilStream.PassThrough, echoing the Streams2 API. Also allow for the same options as in the Streams2 API.
- Add support for piping an UntilStream to multiple destination streams.
- More testing, e.g. ensuring that 'readable' and 'drain' events are emitted correctly.
@wanderview I created a really naive MatchStream implementation here. For the unzip use case, where the stream stops flowing on a match, my _transform function really needs to peek at the source stream before calling read(). Otherwise MatchStream ends up with extra bytes in its _readableState buffer that are wiped out when I reach the pattern and push(null).
What do you think?