Skip to content

Instantly share code, notes, and snippets.

@creationix
Last active December 23, 2015 05:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save creationix/6587060 to your computer and use it in GitHub Desktop.
Save creationix/6587060 to your computer and use it in GitHub Desktop.
module.exports = cat;
var slice = [].slice;

// cat(stream, stream, ...) -> stream
// Joins any number of pull-streams end to end. Items are taken from the first
// input until it signals its end, then from the second, and so on; the
// combined stream's `onend` fires once no inputs are left.
// The consumer's `ondata` / `onerror` handlers are copied from the returned
// stream onto each input at the moment that input becomes the active one.
function cat(/* inputs */) {
  var pending = slice.call(arguments);
  var active;
  var output = { pull: pull, abort: abort };
  return output;

  // Ask the active input for data, activating the next input if none is active.
  function pull() {
    return active ? active.pull() : advance();
  }

  // Activate the next queued input and pull from it. Also installed as every
  // input's `onend`, so an input that ends hands over to its successor.
  function advance() {
    active = pending.shift();
    if (active) {
      active.onend = advance;
      active.ondata = output.ondata;
      active.onerror = output.onerror;
      return active.pull();
    }
    return output.onend();
  }

  // Discard the inputs that have not started and abort the active one, if any.
  function abort() {
    pending.length = 0;
    if (active) {
      active.abort();
      active = null;
    }
  }
}
module.exports = function (stream, onItem, callback) {
var sync;
stream.onend = stream.onerror = callback;
stream.ondata = function (item) {
onItem(item);
if (!sync) read();
};
return read();
function read() {
sync = true;
while (sync = stream.pull());
console.log();
}
};
module.exports = function (size) {
// Input is binary chunks.
// Output is fixed-size `size`-byte binary chunks
return function (emit) {
var tmp = null;
var offset = 0;
return function (chunk) {
if (chunk === undefined) {
if (offset) {
emit(tmp.slice(0, offset));
tmp = null;
offset = 0;
}
return emit();
}
var length = chunk.length;
var start = 0;
if (offset) {
var more = Math.min(size - offset, length);
chunk.copy(tmp, offset, 0, more);
offset += more;
if (offset < size) return;
emit(tmp);
tmp = null;
offset = 0;
start += more;
}
while (length - start >= size) {
emit(chunk.slice(start, start + size));
start += size;
}
if (start < length) {
tmp = new Buffer(size);
chunk.copy(tmp, 0, start);
offset = length - start;
}
};
};
};
exports.deframe = deframe;
// Line deframer, written as a push transform: deframe(emit) -> write(chunk).
// Input is binary chunks cut at arbitrary positions.
// Output is one binary chunk per line, including the trailing newline.
// write(undefined) marks the end of input: any unterminated tail is emitted
// first, then emit() is called with no argument.
function deframe(emit) {
  var NEWLINE = 0x0a;
  var pieces = [];      // slices that make up the line currently being built
  var pieceBytes = 0;   // combined byte length of `pieces`

  // Add a slice to the line being assembled.
  function stash(piece) {
    pieces.push(piece);
    pieceBytes += piece.length;
  }

  // Emit the assembled line, if there is one, and start a new one.
  function release() {
    if (pieceBytes === 0) return;
    var line = Buffer.concat(pieces, pieceBytes);
    pieces.length = 0;
    pieceBytes = 0;
    emit(line);
  }

  return function (chunk) {
    if (chunk === undefined) {
      release();
      return emit();
    }
    var end = chunk.length;
    var lineStart = 0;
    for (var pos = 0; pos < end; pos += 1) {
      if (chunk[pos] === NEWLINE) {
        stash(chunk.slice(lineStart, pos + 1));
        lineStart = pos + 1;
        release();
      }
    }
    // Bytes after the last newline wait for the next chunk.
    if (lineStart < end) {
      stash(chunk.slice(lineStart));
    }
  };
}
module.exports = function (map) {
return function (input) {
var first = true;
var output = { pull: pull, abort: input.abort };
return output;
function pull() {
if (first) {
input.ondata = ondata;
input.onend = output.onend;
input.onerror = output.onend;
first = false;
}
return input.pull();
}
function ondata(item) {
var out;
try {
out = map(item);
}
catch (err) {
return output.onerror(err);
}
output.ondata(out);
}
};
};
// pushTransform(transform) -> function (input) -> stream
// Adapts a push-style transform to the pull-stream interface.
// `transform(push)` is called once, on the first pull, and must return a
// `write(item)` function. `write` receives every item the input emits; the
// input's end is routed to the same handler, so `write` sees `undefined`
// when `input.onend` is invoked without an argument.
// Everything `write` passes to `push` is queued and handed to
// `output.ondata` one item at a time; a queued `undefined` marks the end of
// the output and triggers `output.onend`.
module.exports = function (transform) {
return function (input) {
// queue:   items pushed by the transform and not yet delivered
//          (undefined until setup() runs, null after abort())
// done:    set once the end marker was delivered or abort() was called
// waiting: a pull() returned without delivering anything and is still owed an item
// write:   the function returned by transform(push)
var queue, done = false, waiting = false, write;
var output = { pull: pull, abort: abort };
return output;
// Lazy initialisation, run by the first pull().
function setup() {
// Data and end share one handler.
input.ondata = input.onend = onitem;
write = transform(push);
input.onerror = output.onerror;
queue = [];
}
// Returns true when an item was delivered during the call, false when the
// stream is (or just became) finished, and undefined when nothing was
// available yet; in that last case the item is delivered later from onitem().
function pull() {
if (done) return false;
if (!queue) setup();
read();
if (queue.length) return shift();
waiting = true;
}
// Pull from the input until something is queued, the stream is done, or the
// input returns a falsy value from pull().
function read() {
while (!done && !queue.length && input.pull());
}
// Handler for both input data and input end.
function onitem(item) {
if (done) return;
write(item);
// Only deliver from here when a pull() is still waiting for its item;
// otherwise the queued output stays put until the next pull().
if (waiting) {
// read() is a no-op unless write() left the queue empty.
read();
if (queue.length) return shift();
}
}
// Deliver the oldest queued entry; `undefined` is the end marker.
function shift() {
waiting = false;
var next = queue.shift();
if (next === undefined) {
done = true;
output.onend();
return false;
}
output.ondata(next);
return true;
}
// The `push` callback handed to the transform; ignored once done.
function push(item) {
if (done) return;
queue.push(item);
}
// Stop delivering, drop the queue, and abort the input.
function abort() {
done = true;
queue = null;
input.abort();
}
};
};
var fs = require('fs');
module.exports = readStream;
// Number of bytes requested from the file per pull.
var chunkSize = 40 * 1024;

// readStream(path) -> stream
// Pull-stream over the contents of the file at `path`.
//   pull()  - opens the file on first use, then requests one chunk of up to
//             `chunkSize` bytes. Always returns undefined; results arrive
//             later through the handlers set on the returned object:
//               ondata(chunk) - a Buffer holding the bytes that were read
//               onend()       - end of file reached (descriptor closed)
//               onerror(err)  - open or read failed
//   abort() - stops the stream and closes the descriptor; no handler is
//             called afterwards.
// A read error is terminal: the descriptor is closed before onerror is
// called. A failed open leaves nothing to close, and a later pull() retries.
function readStream(path) {
  var fd = null;      // open descriptor, or null when there is none to close
  var buffer;         // destination of the next read
  var done = false;   // set on end of file, read error, or abort
  var output = { pull: pull, abort: abort };
  return output;

  function pull() {
    // Once finished, never touch the file again (this used to re-open it).
    if (done) return;
    if (fd === null) return fs.open(path, "r", onopen);
    fs.read(fd, buffer, 0, chunkSize, null, onread);
  }

  function abort() {
    done = true;
    closeFd();
  }

  // Close the descriptor at most once. fs.close requires a callback on
  // current Node versions; a failure to close is ignored, as before.
  function closeFd() {
    if (fd === null) return;
    var closing = fd;
    fd = null;
    fs.close(closing, ignore);
  }

  function ignore() {}

  function onopen(err, fileDescriptor) {
    if (done) {
      // Aborted while the open was in flight: `fd` was never assigned, so
      // the descriptor that has just been opened is the one to release.
      if (!err) fs.close(fileDescriptor, ignore);
      return;
    }
    if (err) return output.onerror(err);
    buffer = Buffer.allocUnsafe(chunkSize);
    fd = fileDescriptor;
    return pull();
  }

  function onread(err, bytesRead) {
    if (done) return;
    if (err) {
      done = true;
      closeFd();
      return output.onerror(err);
    }
    if (bytesRead === 0) {
      done = true;
      closeFd();
      output.onend();
      return;
    }
    // Hand out only the bytes that were actually read.
    var chunk = bytesRead === chunkSize ? buffer : buffer.slice(0, bytesRead);
    // The consumer now owns `chunk`; read the next one into fresh memory.
    buffer = Buffer.allocUnsafe(chunkSize);
    output.ondata(chunk);
  }
}
// Demo: read the five listed files one after another, split the combined
// bytes into lines, and pass every line to console.log.
var readStream = require('./read-stream.js');
var pushTransform = require('./push-transform.js');
var cat = require('./cat.js');
var deframe = pushTransform(require('./line-frame.js').deframe);
var consume = require('./consume.js');

// One file stream per name, in the order the files are to be read.
var sources = [
  "cat.js",
  "read-stream.js",
  "push-transform.js",
  "line-frame.js",
  "consume.js"
].map(function (name) {
  return readStream(name);
});

// cat joins the file streams end to end; deframe re-chunks the bytes by line.
var input = deframe(cat.apply(null, sources));

consume(input, console.log, function (err) {
  if (err) {
    throw err;
  }
  console.log("Done");
});
// Demo: read the joystick device in 8-byte records, decode each record with
// parseJoy, and pass every decoded event to console.log.
var readStream = require('./read-stream.js');
var fixed = require('./fixed.js');
var pushTransform = require('./push-transform.js');
var mapTransform = require('./map-transform.js');
var chunk8 = pushTransform(fixed(8));
var parse = mapTransform(parseJoy);
var consume = require('./consume.js');

// Pipeline: raw device bytes -> 8-byte chunks -> parsed event objects.
var rawBytes = readStream("/dev/input/js0");
var records = chunk8(rawBytes);
var input = parse(records);

consume(input, console.log, function (err) {
  if (err) {
    throw err;
  }
  console.log("Done");
});
// Decodes one 8-byte event record into a plain object.
// Layout read from `buffer`:
//   bytes 0-3  unsigned 32-bit little-endian -> event.time
//   bytes 4-5  signed 16-bit little-endian   -> event.value
//   byte  6    flag bits (0x80 init, 0x01 button, 0x02 axis)
//   byte  7                                  -> event.number
// `init` is only present when the 0x80 bit is set; `type` is only present
// when a button or axis bit is set, and axis wins if both are.
// NOTE(review): this looks like the Linux joystick API's `struct js_event`
// (the demo reads /dev/input/js0) - confirm against the kernel documentation.
function parseJoy(buffer) {
  var INIT_BIT = 0x80;
  var BUTTON_BIT = 0x01;
  var AXIS_BIT = 0x02;

  var event = {
    time: buffer.readUInt32LE(0),
    value: buffer.readInt16LE(4),
    number: buffer[7],
  };
  var flags = buffer[6];

  if (flags & INIT_BIT) {
    event.init = true;
  }
  if (flags & AXIS_BIT) {
    event.type = "axis";
  } else if (flags & BUTTON_BIT) {
    event.type = "button";
  }
  return event;
}
@creationix
Copy link
Author

Having .setup separate from "first pull" is cleaner. I just fear we have too much surface area. Is my fear unfounded maybe?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment