Last active
August 29, 2015 14:00
-
-
Save creationix/11155884 to your computer and use it in GitHub Desktop.
Some experimental stream interfaces.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// readable.queue -> array of items ready to consume
//
// readable.wait() => more - wait for more data to consume
// The `=>` in these docs means the call "returns" a promise for `more` in the form of a continuable function; passing a callback as the last argument is also accepted.
/**
 * ES6 generator that collects a stream into an array.
 *
 * Repeatedly yields the value returned by `stream.wait()`. Whatever the
 * driver sends back into the generator is treated as the "more" flag:
 * while it is truthy the generator waits again, and once it is falsy the
 * generator finishes with `stream.queue` as its return value.
 *
 * NOTE(review): presumably driven by a coroutine runner that resolves the
 * yielded continuable and resumes the generator with its result - confirm
 * against the runner in use.
 *
 * @param {{queue: Array, wait: Function}} stream - readable stream
 * @returns {Generator} generator whose final value is `stream.queue`
 */
function* read(stream) {
  let hasMore;
  do {
    hasMore = yield stream.wait();
  } while (hasMore);
  return stream.queue;
}
/**
 * ES5 version: collects a stream into an array using callbacks.
 *
 * Calls `stream.wait(onReady)` until the stream reports that no more data
 * is coming, then hands `stream.queue` to `callback`.
 *
 * @param {{queue: Array, wait: Function}} stream - readable stream whose
 *   `wait(cb)` invokes `cb(err, more)`
 * @param {function(?Error, Array=)} callback - error-first callback; called
 *   with `(err)` on failure or `(null, stream.queue)` on completion
 */
function read(stream, callback) {
  stream.wait(onReady);

  function onReady(err, more) {
    if (err) return callback(err);
    if (more) return stream.wait(onReady);
    // Error-first convention: the first slot is reserved for the error, so
    // the collected queue must go second (it used to be passed as `err`).
    callback(null, stream.queue);
  }
}
/**
 * Template for the plain synchronous transform interface that most
 * protocol parsers are written against.
 *
 * @param {function(*)} emit - called by the transform to output an item
 * @returns {function(*)} handler that receives one input item at a time
 */
function transform(emit) {
  // Per-transform state is declared here, private to this closure.
  return function (item) {
    // Handle `item`, calling `emit` zero or more times.
    return undefined;
  };
}
/**
 * Takes a readable stream and a sync transform and returns a new readable
 * stream (same `queue` / `wait` shape) with the transform applied.
 *
 * Each `wait` call waits on the upstream once, drains `stream.queue`
 * through the transform, and reports whether more output may follow.
 * The transform signals end-of-stream by emitting `undefined`; it is
 * flushed with a no-argument call when the upstream reports it has ended.
 *
 * @param {{queue: Array, wait: Function}} stream - upstream readable whose
 *   `wait(cb)` invokes `cb(err, isMore)`
 * @param {function(function(*)): function(*)} transform - factory that
 *   receives `emit` and returns the per-item handler
 * @returns {{queue: Array, wait: function(function(?Error, boolean=))}}
 *   transformed stream; `wait` uses an error-first callback
 */
function applyTransform(stream, transform) {
  var more = true;
  var queue = [];
  // Named `feed` rather than `process` so the Node global is not shadowed.
  var feed = transform(function (item) {
    if (item === undefined) more = false;
    else queue.push(item);
  });

  return {
    queue: queue,
    wait: wait
  };

  function wait(callback) {
    stream.wait(onData);

    function onData(err, isMore) {
      if (err) return callback(err);
      // Wrap the handler so forEach's extra (index, array) arguments do not
      // leak into a transform that accepts optional parameters.
      stream.queue.splice(0).forEach(function (item) {
        feed(item);
      });
      if (!isMore) {
        feed(); // flush: tell the transform the input has ended
        // Upstream is finished, so nothing more can ever be produced, even
        // if the transform did not emit the `undefined` end marker itself.
        more = false;
      }
      // Error-first convention: the flag goes in the second slot (it used
      // to be passed in the error position).
      callback(null, more);
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// read() -> value, or undefined at end of stream
// read.readable - read-only flag telling whether there is something to read
// read.done - read-only flag telling whether the stream is finished
// read.wait(onReadable) - user-set callback, notified the next time the stream is readable
// write() -> true means write more, false means pause
// write.flush(onWritable) - user-set callback
/**
 * Reading a stream using ES6 generators.
 *
 * Drains the stream with `stream.read()` while `stream.readable` is truthy,
 * finishes once `stream.done` is truthy, and otherwise yields `stream.wait`
 * before trying again. The items read are discarded here; this is the
 * skeleton of a consumer loop.
 *
 * NOTE(review): `stream.wait` is yielded as an unbound function reference;
 * presumably it does not depend on `this` - confirm against the stream
 * implementation.
 *
 * @param {{readable: boolean, done: boolean, read: Function, wait: Function}} stream
 * @returns {Generator} generator that yields `stream.wait` between drains
 */
function* read(stream) {
  for (;;) {
    // Pull everything that is available right now.
    while (stream.readable) {
      stream.read();
    }
    if (stream.done) return;
    yield stream.wait;
  }
}
/**
 * Reading a stream using ES5 callbacks.
 *
 * Drains the stream with `stream.read()` while `stream.readable` is truthy.
 * When `stream.done` is truthy it calls `callback()` with no arguments;
 * otherwise it registers itself with `stream.wait` and drains again when
 * notified. The items read are discarded here; this is the skeleton of a
 * consumer loop.
 *
 * @param {{readable: boolean, done: boolean, read: Function, wait: Function}} stream
 * @param {function()} callback - invoked once the stream is finished
 */
function read(stream, callback) {
  function drain() {
    // Pull everything that is available right now.
    while (stream.readable) {
      stream.read();
    }
    if (!stream.done) {
      // Not finished yet: come back when the stream is readable again.
      stream.wait(drain);
      return;
    }
    return callback();
  }

  drain();
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment