Skip to content

Instantly share code, notes, and snippets.

@creationix
Last active August 29, 2015 14:00
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save creationix/11155884 to your computer and use it in GitHub Desktop.
Save creationix/11155884 to your computer and use it in GitHub Desktop.
Some experimental stream interfaces.
// readable.queue -> array of items to consume
//
// readable.wait() => more - wait for more data to consume
// The => in my docs means "returns" a promise for more in the form of a continuable function, but callback-last style is also accepted.
// ES6 function to convert a stream to an array
function* read(stream) {
while (yield stream.wait());
return stream.queue;
}
// ES5 version
function read(stream, callback) {
stream.wait(onReady);
function onReady(err, more) {
if (err) return callback(err);
if (more) return stream.wait(onReady);
callback(stream.queue);
}
}
// This is the normal sync transform interface that most protocol parsers are written in.
function transform(emit) {
// Store state here
return function (item) {
// Do something, calling emit 0 or more times
}
}
// This takes a readable stream and a sync transform and returns a new stream with the transform applied.
function applyTransform(stream, transform) {
var more = true;
var queue = [];
var process = transform(function (item) {
if (item === undefined) more = false;
else queue.push(item);
});
return {
queue: queue,
wait: wait
};
function wait(callback) {
stream.wait(onData);
function onData(err, isMore) {
if (err) return callback(err);
stream.queue.splice(0).forEach(process);
if (!isMore) process();
callback(more);
}
}
}
// read() -> value, undefined for end of stream
// read.readable - read-only flag telling if there is something to read
// read.done - read-only flag telling if the stream is finished
// read.wait(onReadable) - user set callback to be notified the next time the stream is readable
// write() -> true means write more, false means pause
// write.flush(onWritable) - user set callback
// Reading a stream using ES6 generators
function* read(stream) {
while (1) {
while (stream.readable) {
var item = stream.read();
}
if (stream.done) break;
yield stream.wait;
}
}
// Reading a stream using ES5 callbacks
function read(stream, callback) {
(function consume() {
while (stream.readable) {
var item = stream.read();
}
if (stream.done) return callback();
stream.wait(consume);
}());
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment