Skip to content

Instantly share code, notes, and snippets.

@creationix
Last active December 15, 2015 16:49
Show Gist options
  • Save creationix/5291866 to your computer and use it in GitHub Desktop.
Save creationix/5291866 to your computer and use it in GitHub Desktop.
await wrapper using node-fibers
var Fiber = require("fibers");
module.exports = await;
function await(continuation) {
var fiber = Fiber.current;
var result;
var async;
continuation(function (err, value) {
if (async === undefined) {
async = false;
result = value;
if (err) { throw err; }
return;
}
if (err) fiber.throwInto(err);
else fiber.run(value);
});
if (async === undefined) {
async = true;
return Fiber.yield();
}
return result;
}
var Queue = require('./queue');
module.exports = Pipe;
/**
 * A buffered pipe that pairs written chunks with waiting readers and tells
 * writers when to back off.
 *
 * State:
 *   paused      - true while writers should hold off (see processReaders).
 *   processing  - re-entrancy guard for processReaders.
 *   inputQueue  - chunks written but not yet delivered to a reader.
 *   readerQueue - reader callbacks waiting for a chunk.
 *   resumeList  - write callbacks deferred until the pipe resumes.
 */
function Pipe() {
  this.paused = false;
  this.processing = false;
  this.inputQueue = new Queue;
  this.readerQueue = new Queue;
  this.resumeList = [];
}

// Default flow-control thresholds; instances may override them.
Pipe.prototype.highWaterMark = 1;
Pipe.prototype.lowWaterMark = 1;

/**
 * Deliver queued chunks to queued readers, then update the paused state.
 *
 * Not re-entrant: a call made while another call is in progress (for example
 * from inside a reader callback) returns immediately, and the outer call's
 * loop picks up whatever was queued in the meantime.
 *
 * If a reader callback throws, the exception propagates to the caller, but
 * the re-entrancy guard is still released so later reads and writes keep
 * working instead of being ignored forever.
 */
Pipe.prototype.processReaders = function () {
  // This function is not re-entrant. Keep out recursive calls with a semaphore.
  if (this.processing) { return; }
  this.processing = true;

  try {
    // Let's play matchmaker and pair data with readers.
    while (this.inputQueue.length && this.readerQueue.length) {
      var chunk = this.inputQueue.shift();
      var reader = this.readerQueue.shift();
      reader(null, chunk);
    }

    // Flow control logic for high-water/low-water and pause/resume.
    // Positive depth means buffered data; negative means waiting readers.
    var depth = this.inputQueue.length - this.readerQueue.length;
    if (!this.paused && depth >= this.highWaterMark) {
      // If there is too much data and not enough readers,
      // tell the writer to pause.
      this.paused = true;
    }
    else if (this.paused && depth <= this.lowWaterMark) {
      // If we're paused and there is room for more data,
      // tell the writer to resume
      this.paused = false;
      // and flush any pending write callbacks.
      for (var i = 0, l = this.resumeList.length; i < l; i++) {
        process.nextTick(this.resumeList[i]);
      }
      this.resumeList.length = 0;
    }
  }
  finally {
    // We're done here (or a reader threw); allow this function to be
    // called again either way.
    this.processing = false;
  }
};

/**
 * Create a continuation that requests the next chunk.
 *
 * @returns {function(function(*, *))} Takes a `callback(err, chunk)`; the
 *   callback is queued and later invoked as `callback(null, chunk)`.
 */
Pipe.prototype.read = function () {
  var self = this;
  return function (callback) {
    self.readerQueue.push(callback);
    self.processReaders();
  };
};

/**
 * Create a continuation that writes `chunk` into the pipe.
 *
 * @param {*} chunk - The value to queue for a reader.
 * @returns {function(function()=)} Takes an optional `callback`. It is called
 *   with no arguments right away when the pipe is not paused; when the pipe
 *   is paused it is held in `resumeList` and scheduled with
 *   `process.nextTick` once the pipe resumes.
 */
Pipe.prototype.write = function (chunk) {
  var self = this;
  return function (callback) {
    self.inputQueue.push(chunk);
    self.processReaders();
    if (!callback) { return; }
    if (self.paused) {
      self.resumeList.push(callback);
      return;
    }
    callback();
  };
};
module.exports = Queue;
/**
 * A FIFO queue backed by two arrays.
 *
 * `tail` only ever receives pushes. `head` is consumed by advancing `index`
 * rather than by removing elements; when `head` is used up the two arrays
 * trade places. `length` tracks the number of queued items.
 */
function Queue() {
  this.head = [];
  this.tail = [];
  this.index = 0;
  this.headLength = 0;
  this.length = 0;
}

/**
 * Remove and return the item at the front of the queue.
 *
 * @returns {*} The front item, or `undefined` when the queue is empty.
 */
Queue.prototype.shift = function () {
  var headExhausted = this.index >= this.headLength;
  if (headExhausted) {
    // Recycle the spent head as the new (empty) tail and start reading
    // from what used to be the tail.
    var spent = this.head;
    spent.length = 0;
    this.head = this.tail;
    this.tail = spent;
    this.index = 0;
    this.headLength = this.head.length;
    if (this.headLength === 0) {
      // Both arrays were empty: nothing to hand out.
      return undefined;
    }
  }

  var slot = this.index;
  var item = this.head[slot];
  this.index = slot + 1;
  if (slot < 0) {
    // Negative slots come from unshift() and are plain properties on the
    // array, so they are removed outright.
    delete this.head[slot];
  }
  else {
    // Regular slots are just cleared so the value can be released.
    this.head[slot] = undefined;
  }
  this.length -= 1;
  return item;
};

/**
 * Put an item back at the front of the queue.
 *
 * @param {*} item - The value to insert.
 * @returns {Queue} This queue, for chaining.
 */
Queue.prototype.unshift = function (item) {
  // Step the read position back one slot (possibly below zero) and store there.
  this.index -= 1;
  this.head[this.index] = item;
  this.length += 1;
  return this;
};

/**
 * Append an item to the end of the queue.
 *
 * @param {*} item - The value to append.
 * @returns {Queue} This queue, for chaining.
 */
Queue.prototype.push = function (item) {
  // New items always land in the write-only tail.
  this.tail.push(item);
  this.length += 1;
  return this;
};
/*
var q = new Queue;
q.push(1);
q.push(2);
q.push(3);
var i = 4;
while (q.length > 0) {
console.log(q.length, q.shift());
q.unshift(i++);
console.log(q.length, q.shift());
q.push(i++);
console.log(q.length, q.shift());
}
*/
var await = require('./await');
var Fiber = require('fibers');
var Pipe = require('./pipe');
/**
 * Build a continuation that fires its callback after a delay.
 *
 * @param {number} ms - Delay handed to `setTimeout`.
 * @returns {function(function())} Takes the callback to schedule; the
 *   callback is passed straight to `setTimeout`.
 */
function sleep(ms) {
  return function (done) {
    setTimeout(done, ms);
  };
}
// The pipe shared by the two fibers below: chunks written to it are handed
// to readers, with buffering and flow control in between.
var p = new Pipe();
// Raise both watermarks from their default of 1 so more chunks are buffered.
p.highWaterMark = p.lowWaterMark = 3;
// Implement a producer fiber.
Fiber(function () {
var i = 0;
while (true) {
await(sleep(Math.random() * 800));
console.log("Writing", i);
await(p.write(i++));
}
}).run();
// Implement a slightly slower consumer fiber.
// (slower to test the flow-control inside Pipe)
Fiber(function () {
do {
await(sleep(Math.random() * 1000));
var chunk = await(p.read());
console.log("Read", chunk);
} while (chunk !== undefined);
}).run();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment