Last active
December 23, 2015 05:29
-
-
Save creationix/6587060 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module.exports = cat; | |
var slice = [].slice;

/**
 * Concatenate several pull-streams into a single pull-stream.
 *
 * Each argument is a stream object exposing `pull()` and `abort()`. The
 * returned object exposes the same two methods; the consumer is expected to
 * assign `ondata`, `onend` and `onerror` on it before the first `pull()`,
 * because those handlers are copied onto each input at the moment that input
 * becomes the active one.
 *
 * @returns {{pull: Function, abort: Function}} the combined stream
 */
function cat(/* inputs */) {
  // Inputs that have not been started yet, in the order they will be read.
  var pending = slice.call(arguments);
  // The input currently being drained (falsy before the first pull and
  // after an abort).
  var active;

  var output = {
    pull: function () {
      return active ? active.pull() : advance();
    },
    abort: function () {
      // Forget every input that was never started, then stop the live one.
      pending.length = 0;
      if (active) {
        active.abort();
        active = null;
      }
    }
  };
  return output;

  // Switch to the next input; also installed as each input's `onend`, so the
  // end of one input transparently starts the following one.
  function advance() {
    active = pending.shift();
    if (!active) return output.onend();
    active.onend = advance;
    active.ondata = output.ondata;
    active.onerror = output.onerror;
    return active.pull();
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module.exports = function (stream, onItem, callback) { | |
var sync; | |
stream.onend = stream.onerror = callback; | |
stream.ondata = function (item) { | |
onItem(item); | |
if (!sync) read(); | |
}; | |
return read(); | |
function read() { | |
sync = true; | |
while (sync = stream.pull()); | |
console.log(); | |
} | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module.exports = function (size) { | |
// Input is binary chunks. | |
// Output is fixed-size `size`-byte binary chunks | |
return function (emit) { | |
var tmp = null; | |
var offset = 0; | |
return function (chunk) { | |
if (chunk === undefined) { | |
if (offset) { | |
emit(tmp.slice(0, offset)); | |
tmp = null; | |
offset = 0; | |
} | |
return emit(); | |
} | |
var length = chunk.length; | |
var start = 0; | |
if (offset) { | |
var more = Math.min(size - offset, length); | |
chunk.copy(tmp, offset, 0, more); | |
offset += more; | |
if (offset < size) return; | |
emit(tmp); | |
tmp = null; | |
offset = 0; | |
start += more; | |
} | |
while (length - start >= size) { | |
emit(chunk.slice(start, start + size)); | |
start += size; | |
} | |
if (start < length) { | |
tmp = new Buffer(size); | |
chunk.copy(tmp, 0, start); | |
offset = length - start; | |
} | |
}; | |
}; | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
exports.deframe = deframe; | |
// Push-transform that splits a binary stream into lines.
// Input is binary chunks.
// Output is lines as binary (including the trailing newline). A final line
// without a newline is emitted when the input ends, followed by `emit()`
// with no argument.
function deframe(emit) {
  // Pieces of the line currently being assembled and their combined size.
  var pieces = [];
  var pendingBytes = 0;

  // Remember one more piece of the current line.
  function push(part) {
    pendingBytes += part.length;
    pieces.push(part);
  }

  // Join the collected pieces into one buffer and emit it, if there are any.
  function flush() {
    if (!pendingBytes) return;
    var line = Buffer.concat(pieces, pendingBytes);
    pendingBytes = 0;
    pieces.length = 0;
    emit(line);
  }

  return function (chunk) {
    if (chunk === undefined) {
      flush();
      return emit();
    }
    var size = chunk.length;
    var lineStart = 0;
    for (var i = 0; i < size; ++i) {
      if (chunk[i] === 0x0a) {
        // Found a newline: everything up to and including it completes the
        // current line.
        var next = i + 1;
        push(chunk.slice(lineStart, next));
        lineStart = next;
        flush();
      }
    }
    // Keep the unterminated tail until a later chunk (or the end) completes it.
    if (lineStart < size) {
      push(chunk.slice(lineStart));
    }
  };
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module.exports = function (map) { | |
return function (input) { | |
var first = true; | |
var output = { pull: pull, abort: input.abort }; | |
return output; | |
function pull() { | |
if (first) { | |
input.ondata = ondata; | |
input.onend = output.onend; | |
input.onerror = output.onend; | |
first = false; | |
} | |
return input.pull(); | |
} | |
function ondata(item) { | |
var out; | |
try { | |
out = map(item); | |
} | |
catch (err) { | |
return output.onerror(err); | |
} | |
output.ondata(out); | |
} | |
}; | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module.exports = function (transform) { | |
return function (input) { | |
var queue, done = false, waiting = false, write; | |
var output = { pull: pull, abort: abort }; | |
return output; | |
function setup() { | |
input.ondata = input.onend = onitem; | |
write = transform(push); | |
input.onerror = output.onerror; | |
queue = []; | |
} | |
function pull() { | |
if (done) return false; | |
if (!queue) setup(); | |
read(); | |
if (queue.length) return shift(); | |
waiting = true; | |
} | |
function read() { | |
while (!done && !queue.length && input.pull()); | |
} | |
function onitem(item) { | |
if (done) return; | |
write(item); | |
if (waiting) { | |
read(); | |
if (queue.length) return shift(); | |
} | |
} | |
function shift() { | |
waiting = false; | |
var next = queue.shift(); | |
if (next === undefined) { | |
done = true; | |
output.onend(); | |
return false; | |
} | |
output.ondata(next); | |
return true; | |
} | |
function push(item) { | |
if (done) return; | |
queue.push(item); | |
} | |
function abort() { | |
done = true; | |
queue = null; | |
input.abort(); | |
} | |
}; | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var fs = require('fs'); | |
module.exports = readStream; | |
// Number of bytes requested from the file per read.
var chunkSize = 40 * 1024;

/**
 * Open a file as a pull-stream of binary chunks.
 *
 * The file is opened on the first `pull()`. Each `pull()` starts one
 * asynchronous read of up to `chunkSize` bytes; the result is reported
 * through the handlers the consumer assigns on the returned object:
 * `ondata(chunk)`, `onend()` once the file is exhausted, `onerror(err)`.
 *
 * @param {string} path - file to read
 * @returns {{pull: Function, abort: Function}} the stream
 */
function readStream(path) {
  var fd, buffer, done = false;
  var output = { pull: pull, abort: abort };
  return output;

  function pull() {
    // After the end or an abort the descriptor has been closed. Never reopen
    // the file or read from the stale descriptor number, which the process
    // may already have reused for something else.
    if (done) return;
    if (!fd) return fs.open(path, "r", onopen);
    fs.read(fd, buffer, 0, chunkSize, null, onread);
  }

  function abort() {
    done = true;
    if (fd) {
      fs.close(fd);
      fd = null;
    }
  }

  function onopen(err, fileDescriptor) {
    if (done) {
      // Aborted while the open was still pending: `fd` was never stored, so
      // the descriptor that just arrived is the one that has to be closed.
      if (!err) fs.close(fileDescriptor);
      return;
    }
    if (err) return output.onerror(err);
    buffer = Buffer.allocUnsafe(chunkSize);
    fd = fileDescriptor;
    return pull();
  }

  function onread(err, bytesRead) {
    if (done) return;
    if (err) return output.onerror(err);
    if (bytesRead === 0) {
      // End of file.
      done = true;
      fs.close(fd);
      fd = null;
      output.onend();
      return;
    }
    // Hand out only the bytes that were actually read.
    var chunk = bytesRead === chunkSize ? buffer : buffer.slice(0, bytesRead);
    // The chunk now belongs to the consumer, so the next read needs a fresh
    // buffer. Uninitialised memory is fine: only read bytes are ever exposed.
    buffer = Buffer.allocUnsafe(chunkSize);
    output.ondata(chunk);
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Demo: concatenate the module sources, split the result into lines and
// log every line, then log "Done" once all files have been read.
var cat = require('./cat.js');
var consume = require('./consume.js');
var lineFrame = require('./line-frame.js');
var pushTransform = require('./push-transform.js');
var readStream = require('./read-stream.js');

// Line splitter wrapped as a pull-stream transform.
var deframe = pushTransform(lineFrame.deframe);

// One file stream per source, read back to back in this order.
var sources = [
  "cat.js",
  "read-stream.js",
  "push-transform.js",
  "line-frame.js",
  "consume.js"
].map(function (name) {
  return readStream(name);
});

var input = deframe(cat.apply(null, sources));

consume(input, console.log, function (err) {
  if (err) throw err;
  console.log("Done");
});
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Demo: read the joystick device, cut the byte stream into 8-byte records,
// decode each record with parseJoy and log the resulting event objects.
var consume = require('./consume.js');
var fixed = require('./fixed.js');
var mapTransform = require('./map-transform.js');
var pushTransform = require('./push-transform.js');
var readStream = require('./read-stream.js');

// Pipeline stages: fixed-size framing, then record decoding.
var chunk8 = pushTransform(fixed(8));
var parse = mapTransform(parseJoy);

var device = readStream("/dev/input/js0");
var input = parse(chunk8(device));

consume(input, console.log, function (err) {
  if (err) throw err;
  console.log("Done");
});
/**
 * Decode one 8-byte joystick record into an event object.
 *
 * Layout as read here: bytes 0-3 unsigned little-endian `time`, bytes 4-5
 * signed little-endian `value`, byte 6 type flags, byte 7 `number`.
 * (Presumably the Linux joystick API `js_event` record; verify against the
 * device documentation.)
 *
 * @param {Buffer} buffer - a record of at least 8 bytes
 * @returns {{time: number, value: number, number: number, init: (boolean|undefined), type: (string|undefined)}}
 *   `init` is set only when flag 0x80 is present; `type` is "button" for
 *   flag 0x01 and "axis" for flag 0x02 (axis wins if both are set).
 */
function parseJoy(buffer) {
  var event = {};
  event.time = buffer.readUInt32LE(0);
  event.value = buffer.readInt16LE(4);
  event.number = buffer[7];

  var flags = buffer[6];
  if (flags & 0x80) {
    event.init = true;
  }
  if (flags & 0x01) {
    event.type = "button";
  }
  if (flags & 0x02) {
    event.type = "axis";
  }
  return event;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Having `.setup` separate from "first pull" is cleaner. I just fear we have too much surface area. Is my fear unfounded, maybe?