Last active
December 23, 2015 05:29
-
-
Save creationix/6587060 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module.exports = cat; | |
var slice = [].slice; | |
function cat(/* inputs */) { | |
var inputs = slice.call(arguments); | |
var current; | |
var output = { pull: pull, abort: abort }; | |
return output; | |
function pull() { | |
if (!current) return shift(); | |
return current.pull(); | |
} | |
function shift() { | |
current = inputs.shift(); | |
if (!current) return output.onend(); | |
current.onend = shift; | |
current.ondata = output.ondata; | |
current.onerror = output.onerror; | |
return current.pull(); | |
} | |
function abort() { | |
inputs.length = 0; | |
if (!current) return; | |
current.abort(); | |
current = null; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module.exports = function (stream, onItem, callback) { | |
var sync; | |
stream.onend = stream.onerror = callback; | |
stream.ondata = function (item) { | |
onItem(item); | |
if (!sync) read(); | |
}; | |
return read(); | |
function read() { | |
sync = true; | |
while (sync = stream.pull()); | |
console.log(); | |
} | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module.exports = function (size) { | |
// Input is binary chunks. | |
// Output is fixed-size `size`-byte binary chunks | |
return function (emit) { | |
var tmp = null; | |
var offset = 0; | |
return function (chunk) { | |
if (chunk === undefined) { | |
if (offset) { | |
emit(tmp.slice(0, offset)); | |
tmp = null; | |
offset = 0; | |
} | |
return emit(); | |
} | |
var length = chunk.length; | |
var start = 0; | |
if (offset) { | |
var more = Math.min(size - offset, length); | |
chunk.copy(tmp, offset, 0, more); | |
offset += more; | |
if (offset < size) return; | |
emit(tmp); | |
tmp = null; | |
offset = 0; | |
start += more; | |
} | |
while (length - start >= size) { | |
emit(chunk.slice(start, start + size)); | |
start += size; | |
} | |
if (start < length) { | |
tmp = new Buffer(size); | |
chunk.copy(tmp, 0, start); | |
offset = length - start; | |
} | |
}; | |
}; | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
exports.deframe = deframe; | |
// Input is binary chunks. | |
// Output is lines as binary (including the tailing newline). | |
function deframe(emit) { | |
var parts = [], length = 0; | |
return function (chunk) { | |
if (chunk === undefined) { | |
flush(); | |
return emit(); | |
} | |
var start = 0; | |
var length = chunk.length; | |
for (var i = 0; i < length; ++i) { | |
if (chunk[i] !== 0x0a) continue; | |
push(chunk.slice(start, i + 1)); | |
start = i + 1; | |
flush(); | |
} | |
if (start < length) { | |
push(chunk.slice(start)); | |
} | |
}; | |
function push(part) { | |
length += part.length; | |
parts.push(part); | |
} | |
function flush() { | |
if (!length) return; | |
var output = Buffer.concat(parts, length); | |
length = 0; | |
parts.length = 0; | |
emit(output); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module.exports = function (map) { | |
return function (input) { | |
var first = true; | |
var output = { pull: pull, abort: input.abort }; | |
return output; | |
function pull() { | |
if (first) { | |
input.ondata = ondata; | |
input.onend = output.onend; | |
input.onerror = output.onend; | |
first = false; | |
} | |
return input.pull(); | |
} | |
function ondata(item) { | |
var out; | |
try { | |
out = map(item); | |
} | |
catch (err) { | |
return output.onerror(err); | |
} | |
output.ondata(out); | |
} | |
}; | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module.exports = function (transform) { | |
return function (input) { | |
var queue, done = false, waiting = false, write; | |
var output = { pull: pull, abort: abort }; | |
return output; | |
function setup() { | |
input.ondata = input.onend = onitem; | |
write = transform(push); | |
input.onerror = output.onerror; | |
queue = []; | |
} | |
function pull() { | |
if (done) return false; | |
if (!queue) setup(); | |
read(); | |
if (queue.length) return shift(); | |
waiting = true; | |
} | |
function read() { | |
while (!done && !queue.length && input.pull()); | |
} | |
function onitem(item) { | |
if (done) return; | |
write(item); | |
if (waiting) { | |
read(); | |
if (queue.length) return shift(); | |
} | |
} | |
function shift() { | |
waiting = false; | |
var next = queue.shift(); | |
if (next === undefined) { | |
done = true; | |
output.onend(); | |
return false; | |
} | |
output.ondata(next); | |
return true; | |
} | |
function push(item) { | |
if (done) return; | |
queue.push(item); | |
} | |
function abort() { | |
done = true; | |
queue = null; | |
input.abort(); | |
} | |
}; | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var fs = require('fs'); | |
module.exports = readStream; | |
var chunkSize = 40 * 1024; | |
function readStream(path) { | |
var fd, buffer, done = false; | |
var output = { pull: pull, abort: abort }; | |
return output; | |
function pull() { | |
if (!fd) return fs.open(path, "r", onopen); | |
fs.read(fd, buffer, 0, chunkSize, null, onread); | |
} | |
function abort() { | |
done = true; | |
if (fd) { | |
fs.close(fd); | |
fd = null; | |
} | |
} | |
function onopen(err, fileDescriptor) { | |
if (done) { | |
if (fd) fs.close(fd); | |
return; | |
} | |
if (err) return output.onerror(err); | |
buffer = new Buffer(chunkSize); | |
fd = fileDescriptor; | |
return pull(); | |
} | |
function onread(err, bytesRead) { | |
if (done) return; | |
if (err) return output.onerror(err); | |
if (bytesRead === 0) { | |
done = true; | |
fs.close(fd); | |
output.onend(); | |
return; | |
} | |
var chunk; | |
if (bytesRead !== chunkSize) { | |
chunk = buffer.slice(0, bytesRead); | |
} | |
else { | |
chunk = buffer; | |
} | |
buffer = new Buffer(chunkSize); | |
output.ondata(chunk); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var readStream = require('./read-stream.js'); | |
var pushTransform = require('./push-transform.js'); | |
var cat = require('./cat.js'); | |
var deframe = pushTransform(require('./line-frame.js').deframe); | |
var consume = require('./consume.js'); | |
var input = deframe(cat( | |
readStream("cat.js"), | |
readStream("read-stream.js"), | |
readStream("push-transform.js"), | |
readStream("line-frame.js"), | |
readStream("consume.js") | |
)); | |
consume(input, console.log, function (err) { | |
if (err) throw err; | |
console.log("Done"); | |
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var readStream = require('./read-stream.js'); | |
var fixed = require('./fixed.js'); | |
var pushTransform = require('./push-transform.js'); | |
var mapTransform = require('./map-transform.js'); | |
var chunk8 = pushTransform(fixed(8)); | |
var parse = mapTransform(parseJoy); | |
var consume = require('./consume.js'); | |
var input = parse(chunk8(readStream("/dev/input/js0"))); | |
consume(input, console.log, function (err) { | |
if (err) throw err; | |
console.log("Done"); | |
}); | |
function parseJoy(buffer) { | |
var event = { | |
time: buffer.readUInt32LE(0), | |
value: buffer.readInt16LE(4), | |
number: buffer[7], | |
}; | |
var type = buffer[6]; | |
if (type & 0x80) event.init = true; | |
if (type & 0x01) event.type = "button"; | |
if (type & 0x02) event.type = "axis"; | |
return event; | |
} |
hmm. okay, I have been very reluctant to replace pull/min-streams, but this is looking promising.
hmm, I guess in the case where you need a particular cb to get the ondata, you can reassign it!
So, what about a min2 -> min adapter?
It's definitely important to call abort on all the cat inputs. if those streams had already been passed to an async
transform, or if they were also cat streams, then they need to know if they will never be read from.
function toMin(min2) {
var _cb
min2.onData = function (data) { _cb(null, data) }
min2.onEnd = function () { _cb() } //or _cb(true) for pull-streams
min2.onError = function (err) { _cb(err) }
return function (abort, cb) {
if(abort) min2.abort()
else {
_cb = cb
min2.pull()
}
}
}
Hmm, pretty simple!
I assume that a writer is still implemented as a reader(readable)
hmm, what about doing "setup" like this:
min2.pipe = function (dest) {
dest.setup(min2)
}
Having .setup
separate from "first pull" is cleaner. I just fear we have too much surface area. Is my fear unfounded maybe?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
My thinking was that I would require in the spec that sources not need any abort notification till after the first
.pull
call. But now after thinking about it, I don't see any harm is assuming people might have setup state and making cat call abort on all the pending streams.Yes.
Yes a million times. This had been a huge pain working with simple-streams in js-git. There are so many cases where I need to store the callback in some global (within the closure) and then retrieve it later when needed. With this new model, the 3 named callbacks are always available. Also I can report errors right away without having to queue them
So far I don't think so. The only complexity I've pushed onto the consumer is given them a more powerful way to consume the sync or async streams without blowing the stack. It's a fair amount of burden on the middle layers to make sure
.pull
returns the right value too.That's no different than node streams attaching event listeners to the stream they are consuming. At least this way there is less danger of leaking since there can only be one active listener at t time. If you want to tee, for example, you'll need a special explicit connector for that.
As you've noticed, I always don't touch the input stream till the first time
.pull
is called. Also I don't assume my output object has it's listeners attached till that point as well. This will be part of the contract in the spec. All consumers must attach all listeners before calling.pull
. And all producers must not do anything with it till the first time pull is called. I guess if we wanted to get really explicit, we could have a.setup
function as well as the.pull
, but that's probably overkill.