Skip to content

Instantly share code, notes, and snippets.

@creationix
Last active December 23, 2015 05:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save creationix/6587060 to your computer and use it in GitHub Desktop.
Save creationix/6587060 to your computer and use it in GitHub Desktop.
// cat.js — min2 source combinator (defined below).
module.exports = cat;
// Cached Array.prototype.slice.
var slice = [].slice;
// Concatenate several min2 sources into one stream: each input is drained
// in order and its items forwarded; the combined stream ends when the last
// input ends.
function cat(/* inputs */) {
  var inputs = Array.prototype.slice.call(arguments);
  var current; // the input currently being drained
  var output = { pull: pull, abort: abort };
  return output;

  // Pull one item: lazily start the first input, then delegate.
  function pull() {
    if (!current) return shift();
    return current.pull();
  }

  // Move on to the next input, wiring its events through to `output`.
  // When no inputs remain, signal end downstream.
  function shift() {
    current = inputs.shift();
    if (!current) return output.onend();
    current.onend = shift;
    current.ondata = output.ondata;
    current.onerror = output.onerror;
    return current.pull();
  }

  // Abort the active input AND every not-yet-started input: queued sources
  // may already hold resources (open fds, buffered data) that need cleanup.
  // (The previous code only aborted the active one and dropped the rest.)
  function abort() {
    var pending = inputs;
    inputs = [];
    if (current) {
      current.abort();
      current = null;
    }
    for (var i = 0; i < pending.length; i++) {
      pending[i].abort();
    }
  }
}
module.exports = function (stream, onItem, callback) {
var sync;
stream.onend = stream.onerror = callback;
stream.ondata = function (item) {
onItem(item);
if (!sync) read();
};
return read();
function read() {
sync = true;
while (sync = stream.pull());
console.log();
}
};
module.exports = function (size) {
// Input is binary chunks.
// Output is fixed-size `size`-byte binary chunks
return function (emit) {
var tmp = null;
var offset = 0;
return function (chunk) {
if (chunk === undefined) {
if (offset) {
emit(tmp.slice(0, offset));
tmp = null;
offset = 0;
}
return emit();
}
var length = chunk.length;
var start = 0;
if (offset) {
var more = Math.min(size - offset, length);
chunk.copy(tmp, offset, 0, more);
offset += more;
if (offset < size) return;
emit(tmp);
tmp = null;
offset = 0;
start += more;
}
while (length - start >= size) {
emit(chunk.slice(start, start + size));
start += size;
}
if (start < length) {
tmp = new Buffer(size);
chunk.copy(tmp, 0, start);
offset = length - start;
}
};
};
};
exports.deframe = deframe;
// Input is binary chunks.
// Output is lines as binary (each complete line includes its trailing
// newline; a final partial line is flushed as-is at end of stream).
function deframe(emit) {
  // Accumulated partial-line pieces and their combined byte count.
  var pending = [];
  var pendingBytes = 0;

  return function (chunk) {
    // `undefined` is the end-of-stream sentinel: flush any trailing
    // partial line, then forward the sentinel downstream.
    if (chunk === undefined) {
      flush();
      return emit();
    }
    var start = 0;
    var end = chunk.length;
    for (var i = 0; i < end; ++i) {
      if (chunk[i] === 0x0a) {
        push(chunk.slice(start, i + 1));
        start = i + 1;
        flush();
      }
    }
    if (start < end) push(chunk.slice(start));
  };

  // Stash one piece of the line being assembled.
  function push(part) {
    pendingBytes += part.length;
    pending.push(part);
  }

  // Emit the assembled line (if any) and reset the accumulator.
  function flush() {
    if (!pendingBytes) return;
    var line = Buffer.concat(pending, pendingBytes);
    pendingBytes = 0;
    pending.length = 0;
    emit(line);
  }
}
module.exports = function (map) {
return function (input) {
var first = true;
var output = { pull: pull, abort: input.abort };
return output;
function pull() {
if (first) {
input.ondata = ondata;
input.onend = output.onend;
input.onerror = output.onend;
first = false;
}
return input.pull();
}
function ondata(item) {
var out;
try {
out = map(item);
}
catch (err) {
return output.onerror(err);
}
output.ondata(out);
}
};
};
// Adapt a push-style transform (like fixed/deframe: takes an emit function,
// returns a writer) into a min2 pull-style stream transform.
// Convention throughout: `undefined` is the end-of-stream sentinel, both on
// the input side (onend fires onitem with no args) and in `queue`.
module.exports = function (transform) {
return function (input) {
// `queue` doubles as the initialized flag: null until the first pull.
// `waiting` is true when a pull went async and ondata should deliver.
var queue, done = false, waiting = false, write;
var output = { pull: pull, abort: abort };
return output;
// Wire up lazily on first pull — by then the consumer has attached its
// listeners to `output` (min2 contract).
function setup() {
input.ondata = input.onend = onitem;
write = transform(push);
input.onerror = output.onerror;
queue = [];
}
// Returns truthy when an item was delivered synchronously; otherwise
// marks `waiting` and returns undefined (delivery will be async).
function pull() {
if (done) return false;
if (!queue) setup();
read();
if (queue.length) return shift();
waiting = true;
}
// Pump the input synchronously until the transform produces output,
// the input goes async (pull() falsy), or we are done.
function read() {
while (!done && !queue.length && input.pull());
}
// Receives both data items and the end sentinel (undefined) from input;
// feeds them through the transform, which pushes 0+ outputs onto queue.
function onitem(item) {
if (done) return;
write(item);
if (waiting) {
read();
if (queue.length) return shift();
}
}
// Deliver one queued item downstream; the undefined sentinel ends the
// stream.  Return value doubles as the sync-delivery flag for pull().
function shift() {
waiting = false;
var next = queue.shift();
if (next === undefined) {
done = true;
output.onend();
return false;
}
output.ondata(next);
return true;
}
// emit callback handed to the transform: buffer its outputs (including
// the undefined end sentinel) until shift() drains them.
function push(item) {
if (done) return;
queue.push(item);
}
function abort() {
done = true;
queue = null;
input.abort();
}
};
};
var fs = require('fs');
module.exports = readStream;
// Read files in 40KiB chunks.
var chunkSize = 40 * 1024;
// min2-style file source: returns { pull, abort }; the consumer attaches
// ondata/onend/onerror to the returned object before the first pull.
function readStream(path) {
  var fd, buffer, done = false;
  var output = { pull: pull, abort: abort };
  return output;

  // Lazily open the file on the first pull; afterwards each pull issues
  // one fs.read of up to chunkSize bytes.
  function pull() {
    if (!fd) return fs.open(path, "r", onopen);
    fs.read(fd, buffer, 0, chunkSize, null, onread);
  }

  function abort() {
    done = true;
    if (fd) {
      // NOTE(review): fs.close without a callback was fine on the Node of
      // this era; modern Node may require one — confirm target version.
      fs.close(fd);
      fd = null;
    }
  }

  function onopen(err, fileDescriptor) {
    if (done) {
      // Aborted while the open was in flight: release the freshly opened
      // descriptor.  BUG FIX: the old code checked `fd`, which is still
      // unset at this point, and leaked the descriptor.
      if (fileDescriptor) fs.close(fileDescriptor);
      return;
    }
    if (err) return output.onerror(err);
    // Buffer.alloc replaces deprecated `new Buffer` (uninitialized memory).
    buffer = Buffer.alloc(chunkSize);
    fd = fileDescriptor;
    return pull();
  }

  function onread(err, bytesRead) {
    if (done) return;
    if (err) return output.onerror(err);
    if (bytesRead === 0) {
      // EOF: close the file and signal end downstream.
      done = true;
      fs.close(fd);
      output.onend();
      return;
    }
    // Emit exactly the bytes read; a full buffer is handed off whole.
    var chunk;
    if (bytesRead !== chunkSize) {
      chunk = buffer.slice(0, bytesRead);
    }
    else {
      chunk = buffer;
    }
    // Allocate a fresh buffer so the emitted chunk is never overwritten
    // by the next read.
    buffer = Buffer.alloc(chunkSize);
    output.ondata(chunk);
  }
}
var readStream = require('./read-stream.js');
var pushTransform = require('./push-transform.js');
var cat = require('./cat.js');
var deframe = pushTransform(require('./line-frame.js').deframe);
var consume = require('./consume.js');

// Concatenate all five source files, in order, into one line-delimited
// stream and print each line.
var files = [
  "cat.js",
  "read-stream.js",
  "push-transform.js",
  "line-frame.js",
  "consume.js"
];
var input = deframe(cat.apply(null, files.map(function (name) {
  return readStream(name);
})));

consume(input, console.log, function (err) {
  if (err) throw err;
  console.log("Done");
});
var readStream = require('./read-stream.js');
var fixed = require('./fixed.js');
var pushTransform = require('./push-transform.js');
var mapTransform = require('./map-transform.js');
var consume = require('./consume.js');

// Joystick events arrive from the device as fixed 8-byte records: chunk the
// raw stream into records, then decode each record into an event object.
var chunk8 = pushTransform(fixed(8));
var parse = mapTransform(parseJoy);
var input = parse(chunk8(readStream("/dev/input/js0")));

consume(input, console.log, function (err) {
  if (err) throw err;
  console.log("Done");
});
// Decode one 8-byte Linux joystick event record into a plain object.
// Record layout (little-endian): u32 timestamp, s16 value, u8 type bitmask,
// u8 axis/button number.
function parseJoy(buffer) {
  var bits = buffer[6];
  var event = {};
  event.time = buffer.readUInt32LE(0);
  event.value = buffer.readInt16LE(4);
  event.number = buffer[7];
  if (bits & 0x80) event.init = true;
  // The axis bit takes precedence when both are set, since it is checked last.
  if (bits & 0x01) event.type = "button";
  if (bits & 0x02) event.type = "axis";
  return event;
}
@dominictarr
Copy link

hmm, I think I am warming to this idea.

cat needs to abort all inputs. the other inputs may be waiting or already buffered or who knows what.

This pattern is more complicated for map-transform than pull/min.

Hmm, how does onend work? you call stream.pull() and then it eventually calls ondata or onend, correct?

I think this pattern may have an advantage when you have to handle save a cb and then do something else,
and then cb. cases like this,
https://github.com/dominictarr/pull-stream/blob/master/throughs.js#L32-L77

And also
https://github.com/dominictarr/pull-stream/blob/master/throughs.js#L258-L289

Which go to some effort to marshall the correct callback to be called next, which is eliminated in
this model (I'm gonna call this min2 streams for now). hmm, but I fear that complexity may just be pushed into the consumer.

Also, I feel a little bit nervous about how min2 streams have transforms, etc,
that assign on to the previous stream, with the setup function, or check for 'first'.
I don't know, this is just a feeling though.

@creationix
Copy link
Author

cat needs to abort all inputs. the other inputs may be waiting or already buffered or who knows what.

My thinking was that I would require in the spec that sources not need any abort notification till after the first .pull call. But now after thinking about it, I don't see any harm in assuming people might have set up state, and so making cat call abort on all the pending streams.

Hmm, how does onend work? you call stream.pull() and then it eventually calls ondata or onend, correct?

Yes.

I think this pattern may have an advantage when you have to handle save a cb and then do something else...

Yes a million times. This had been a huge pain working with simple-streams in js-git. There are so many cases where I need to store the callback in some global (within the closure) and then retrieve it later when needed. With this new model, the 3 named callbacks are always available. Also I can report errors right away without having to queue them

...but I fear that complexity may just be pushed into the consumer.

So far I don't think so. The only complexity I've pushed onto the consumer is given them a more powerful way to consume the sync or async streams without blowing the stack. It's a fair amount of burden on the middle layers to make sure .pull returns the right value too.

Also, I feel a little bit nervous about how min2 streams have transforms, etc, that assign on to the previous stream, with the setup function, or check for 'first'.

That's no different than node streams attaching event listeners to the stream they are consuming. At least this way there is less danger of leaking since there can only be one active listener at a time. If you want to tee, for example, you'll need a special explicit connector for that.

As you've noticed, I never touch the input stream until the first time .pull is called. Also I don't assume my output object has its listeners attached till that point as well. This will be part of the contract in the spec. All consumers must attach all listeners before calling .pull. And all producers must not do anything with it till the first time pull is called. I guess if we wanted to get really explicit, we could have a .setup function as well as the .pull, but that's probably overkill.

@dominictarr
Copy link

hmm. okay, I have been very reluctant to replace pull/min-streams, but this is looking promising.
hmm, I guess in the case where you need a particular cb to get the ondata, you can reassign it!

So, what about a min2 -> min adapter?

It's definitely important to call abort on all the cat inputs. if those streams had already been passed to an async
transform, or if they were also cat streams, then they need to know if they will never be read from.

function toMin(min2) {
  var _cb
  min2.onData = function (data) { _cb(null, data) }
  min2.onEnd = function () { _cb() } //or _cb(true) for pull-streams
  min2.onError = function (err) { _cb(err) }

  return function (abort, cb) {
    if(abort) min2.abort()
    else {
      _cb = cb
     min2.pull()
   }
  }
}

Hmm, pretty simple!

I assume that a writer is still implemented as a reader(readable)

@dominictarr
Copy link

hmm, what about doing "setup" like this:

min2.pipe = function (dest) {
  dest.setup(min2)
}

@creationix
Copy link
Author

Having .setup separate from "first pull" is cleaner. I just fear we have too much surface area. Is my fear unfounded maybe?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment