@creationix
Last active December 23, 2015 05:29
// cat.js
module.exports = cat;
var slice = [].slice;
function cat(/* inputs */) {
  var inputs = slice.call(arguments);
  var current;
  var output = { pull: pull, abort: abort };
  return output;
  function pull() {
    if (!current) return shift();
    return current.pull();
  }
  function shift() {
    // Move on to the next input, chaining its events to our output.
    current = inputs.shift();
    if (!current) return output.onend();
    current.onend = shift;
    current.ondata = output.ondata;
    current.onerror = output.onerror;
    return current.pull();
  }
  function abort() {
    // Abort the pending inputs too, so upstream transforms know
    // they will never be read from.
    inputs.forEach(function (input) { input.abort(); });
    inputs.length = 0;
    if (!current) return;
    current.abort();
    current = null;
  }
}
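To make the handshake concrete: each min-stream here exposes pull and abort, and the consumer attaches ondata, onend, and onerror. Below is a minimal in-memory source written against that interface. It is not part of the gist; fromArray is a hypothetical helper, and it delivers asynchronously (via setImmediate) to keep the pull/return-value handshake simple.

// fromArray: hypothetical helper, not part of the gist.
function fromArray(items) {
  var i = 0, done = false;
  var output = {
    pull: function () {
      // Deliver asynchronously; returning undefined tells the
      // consumer to wait for ondata/onend.
      setImmediate(function () {
        if (done) return;
        if (i < items.length) return output.ondata(items[i++]);
        done = true;
        output.onend();
      });
    },
    abort: function () { done = true; }
  };
  return output;
}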
// consume.js
module.exports = function (stream, onItem, callback) {
  var sync;
  stream.onend = stream.onerror = callback;
  stream.ondata = function (item) {
    onItem(item);
    // Data arrived asynchronously: restart the pull loop.
    if (!sync) read();
  };
  return read();
  function read() {
    // Keep pulling while data is being delivered synchronously;
    // pull() returning falsy means wait for the next ondata.
    sync = true;
    while (sync = stream.pull());
  }
};
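Assuming the fromArray sketch above, source, cat, and consume then compose like this (illustrative, not from the gist):

var stream = cat(fromArray([1, 2]), fromArray([3]));
consume(stream, function (item) {
  console.log("item:", item); // item: 1, item: 2, item: 3
}, function (err) {
  if (err) throw err;
  console.log("end");
});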
// fixed.js
module.exports = function (size) {
  // Input is binary chunks.
  // Output is fixed-size `size`-byte binary chunks.
  return function (emit) {
    var tmp = null;
    var offset = 0;
    return function (chunk) {
      if (chunk === undefined) {
        // End of input: flush any partial chunk, then signal end.
        if (offset) {
          emit(tmp.slice(0, offset));
          tmp = null;
          offset = 0;
        }
        return emit();
      }
      var length = chunk.length;
      var start = 0;
      if (offset) {
        // Finish filling the partial buffer left from the last chunk.
        var more = Math.min(size - offset, length);
        chunk.copy(tmp, offset, 0, more);
        offset += more;
        if (offset < size) return;
        emit(tmp);
        tmp = null;
        offset = 0;
        start += more;
      }
      // Emit as many full-size windows as the chunk contains.
      while (length - start >= size) {
        emit(chunk.slice(start, start + size));
        start += size;
      }
      // Stash the remainder until more data arrives.
      if (start < length) {
        tmp = new Buffer(size);
        chunk.copy(tmp, 0, start);
        offset = length - start;
      }
    };
  };
};
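Because fixed returns a push-style transform, it can be exercised directly outside a stream. A short illustrative run (the chunks and size are arbitrary):

var fixed = require('./fixed.js');
var write = fixed(4)(function (chunk) {
  console.log(chunk === undefined ? "end" : chunk);
});
write(new Buffer("abcdef")); // emits "abcd", stashes "ef"
write(new Buffer("gh"));     // completes the window: emits "efgh"
write(undefined);            // nothing buffered, so just "end"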
// line-frame.js
exports.deframe = deframe;
// Input is binary chunks.
// Output is lines as binary (including the trailing newline).
function deframe(emit) {
  var parts = [], length = 0;
  return function (chunk) {
    if (chunk === undefined) {
      // End of input: flush any unterminated final line, then signal end.
      flush();
      return emit();
    }
    var start = 0;
    var end = chunk.length;
    for (var i = 0; i < end; ++i) {
      if (chunk[i] !== 0x0a) continue;
      // Found a newline: emit everything buffered up to and including it.
      push(chunk.slice(start, i + 1));
      start = i + 1;
      flush();
    }
    // Buffer any partial line at the end of the chunk.
    if (start < end) {
      push(chunk.slice(start));
    }
  };
  function push(part) {
    length += part.length;
    parts.push(part);
  }
  function flush() {
    if (!length) return;
    var output = Buffer.concat(parts, length);
    length = 0;
    parts.length = 0;
    emit(output);
  }
}
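deframe has the same push-style shape, so a line split across chunks can be watched getting reassembled (illustrative, not from the gist):

var deframe = require('./line-frame.js').deframe;
var write = deframe(function (line) {
  console.log(line === undefined ? "end" : JSON.stringify(line.toString()));
});
write(new Buffer("one\ntw")); // emits "one\n", buffers "tw"
write(new Buffer("o\n"));     // emits "two\n"
write(undefined);             // nothing buffered, so just "end"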
// map-transform.js
module.exports = function (map) {
  return function (input) {
    var first = true;
    var output = { pull: pull, abort: input.abort };
    return output;
    function pull() {
      // Wire the handlers lazily on the first pull, once the consumer
      // has attached its own ondata/onend/onerror.
      if (first) {
        input.ondata = ondata;
        input.onend = output.onend;
        input.onerror = output.onerror;
        first = false;
      }
      return input.pull();
    }
    function ondata(item) {
      var out;
      try {
        out = map(item);
      }
      catch (err) {
        return output.onerror(err);
      }
      output.ondata(out);
    }
  };
};
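Since map may throw and the exception is routed to onerror, mapTransform suits per-item parsing. A hedged sketch, where lines stands for any min-stream of line buffers (for instance the deframe output above):

var mapTransform = require('./map-transform.js');
// Parse each newline-framed line as JSON; a malformed line
// surfaces through output.onerror rather than crashing the loop.
var parseLines = mapTransform(function (line) {
  return JSON.parse(line.toString());
});
var objects = parseLines(lines);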
// push-transform.js
module.exports = function (transform) {
  return function (input) {
    var queue, done = false, waiting = false, write;
    var output = { pull: pull, abort: abort };
    return output;
    function setup() {
      // `undefined` items mark end-of-stream, so ondata and onend
      // can share one handler.
      input.ondata = input.onend = onitem;
      write = transform(push);
      input.onerror = output.onerror;
      queue = [];
    }
    function pull() {
      if (done) return false;
      if (!queue) setup();
      read();
      if (queue.length) return shift();
      // Nothing buffered yet: wait for an async ondata.
      waiting = true;
    }
    function read() {
      // Pull from the input until the transform produces output
      // or the input goes async.
      while (!done && !queue.length && input.pull());
    }
    function onitem(item) {
      if (done) return;
      write(item);
      if (waiting) {
        read();
        if (queue.length) return shift();
      }
    }
    function shift() {
      waiting = false;
      var next = queue.shift();
      if (next === undefined) {
        done = true;
        output.onend();
        return false;
      }
      output.ondata(next);
      return true;
    }
    function push(item) {
      if (done) return;
      queue.push(item);
    }
    function abort() {
      done = true;
      queue = null;
      input.abort();
    }
  };
};
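pushTransform is what lets the push-style helpers above sit in a pull pipeline; composition is then plain function application. A hedged sketch (the filename is arbitrary):

var pushTransform = require('./push-transform.js');
var deframe = require('./line-frame.js').deframe;
var readStream = require('./read-stream.js');
// Wrap the push-style deframe as a pull-style transform.
var lines = pushTransform(deframe)(readStream("some-file.txt"));
// `lines` is itself a min-stream: { pull, abort } plus the callbacks.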
// read-stream.js
var fs = require('fs');
module.exports = readStream;
var chunkSize = 40 * 1024;
function readStream(path) {
  var fd, buffer, done = false;
  var output = { pull: pull, abort: abort };
  return output;
  function pull() {
    // The first pull opens the file; later pulls read a chunk.
    if (!fd) return fs.open(path, "r", onopen);
    fs.read(fd, buffer, 0, chunkSize, null, onread);
  }
  function abort() {
    done = true;
    if (fd) {
      fs.close(fd, noop);
      fd = null;
    }
  }
  function onopen(err, fileDescriptor) {
    if (done) {
      // Aborted while opening: just close the new descriptor.
      if (fileDescriptor) fs.close(fileDescriptor, noop);
      return;
    }
    if (err) return output.onerror(err);
    buffer = new Buffer(chunkSize);
    fd = fileDescriptor;
    return pull();
  }
  function onread(err, bytesRead) {
    if (done) return;
    if (err) return output.onerror(err);
    if (bytesRead === 0) {
      done = true;
      fs.close(fd, noop);
      output.onend();
      return;
    }
    var chunk;
    if (bytesRead !== chunkSize) {
      chunk = buffer.slice(0, bytesRead);
    }
    else {
      chunk = buffer;
    }
    // Allocate a fresh buffer so the emitted chunk is never overwritten.
    buffer = new Buffer(chunkSize);
    output.ondata(chunk);
  }
}
function noop() {}
// test: concatenate the source files and print them line by line
var readStream = require('./read-stream.js');
var pushTransform = require('./push-transform.js');
var cat = require('./cat.js');
var deframe = pushTransform(require('./line-frame.js').deframe);
var consume = require('./consume.js');
var input = deframe(cat(
  readStream("cat.js"),
  readStream("read-stream.js"),
  readStream("push-transform.js"),
  readStream("line-frame.js"),
  readStream("consume.js")
));
consume(input, console.log, function (err) {
  if (err) throw err;
  console.log("Done");
});
// test: parse Linux joystick events from /dev/input/js0
var readStream = require('./read-stream.js');
var fixed = require('./fixed.js');
var pushTransform = require('./push-transform.js');
var mapTransform = require('./map-transform.js');
var chunk8 = pushTransform(fixed(8));
var parse = mapTransform(parseJoy);
var consume = require('./consume.js');
var input = parse(chunk8(readStream("/dev/input/js0")));
consume(input, console.log, function (err) {
  if (err) throw err;
  console.log("Done");
});
// Each event is an 8-byte Linux js_event struct:
// u32 time (ms), s16 value, u8 type, u8 number.
function parseJoy(buffer) {
  var event = {
    time: buffer.readUInt32LE(0),
    value: buffer.readInt16LE(4),
    number: buffer[7]
  };
  var type = buffer[6];
  if (type & 0x80) event.init = true; // JS_EVENT_INIT: synthetic initial state
  if (type & 0x01) event.type = "button";
  if (type & 0x02) event.type = "axis";
  return event;
}
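For a concrete decode, here is what parseJoy returns for one hand-built event (illustrative bytes, not captured from a device):

// time = 10000 ms, value = 1, type = 0x01 (button), number = 2
var buf = new Buffer([0x10, 0x27, 0x00, 0x00, 0x01, 0x00, 0x01, 0x02]);
console.log(parseJoy(buf));
// { time: 10000, value: 1, number: 2, type: 'button' }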
@dominictarr

Hmm, okay. I have been very reluctant to replace pull/min-streams, but this is looking promising.
Hmm, I guess in the case where you need a particular cb to get the ondata, you can reassign it!

So, what about a min2 -> min adapter?

It's definitely important to call abort on all the cat inputs. If those streams had already been passed to an async
transform, or if they were also cat streams, then they need to know that they will never be read from.

function toMin(min2) {
  var _cb
  min2.ondata = function (data) { _cb(null, data) }
  min2.onend = function () { _cb() } // or _cb(true) for pull-streams
  min2.onerror = function (err) { _cb(err) }

  return function (abort, cb) {
    if (abort) min2.abort()
    else {
      _cb = cb
      min2.pull()
    }
  }
}
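The adapted function then reads like a normal min/pull-stream. A hedged usage sketch (source is any min2 stream; the end check assumes the _cb() convention above):

var read = toMin(source)
read(null, function onItem(err, data) {
  if (err) return console.error(err)
  if (data === undefined) return console.log("end") // _cb() with no args
  console.log(data)
  read(null, onItem) // request the next item
})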

Hmm, pretty simple!

I assume that a writer is still implemented as a reader(readable).

@dominictarr

Hmm, what about doing "setup" like this:

min2.pipe = function (dest) {
  dest.setup(min2)
}

@creationix (Author)

Having .setup separate from "first pull" is cleaner. I just fear we have too much surface area. Or maybe my fear is unfounded?
