Skip to content

Instantly share code, notes, and snippets.

@laverdet
Created March 29, 2011 14:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save laverdet/892430 to your computer and use it in GitHub Desktop.
Save laverdet/892430 to your computer and use it in GitHub Desktop.
Simple future library which makes use of node-fibers for blocking operations
"use strict";
require('fibers');
this.Future = Future;
this.FiberFuture = FiberFuture;
this.wait = wait;
/**
* Return a function which automatically creates a fiber for each invocation.
*/
Function.prototype.future = function() {
var fn = this;
return function() {
return new FiberFuture(fn, this, arguments);
};
}
/**
* A reference which will be resolved to a value some time in the future.
*/
function Future() {}
Future.prototype = {
get: function() {
if (!this.resolved) {
throw new Error('Future must resolve before value is ready');
} else if (this.error) {
throw this.error;
} else {
return this.value;
}
},
didResolve: function(value) {
if (this.resolved) {
throw new Error('Future resolved more than once');
}
this.value = value;
this.resolved = true;
var callbacks = this.callbacks;
if (callbacks) {
delete this.callbacks;
for (var ii = 0; ii < callbacks.length; ++ii) {
callbacks[ii](undefined, value);
}
}
},
didThrow: function(error) {
if (this.resolved) {
throw new Error('Future resolved more than once');
} else if (!error) {
throw new Error('Must throw non-empty error');
}
this.error = error;
this.resolved = true;
var callbacks = this.callbacks;
if (callbacks) {
delete this.callbacks;
for (var ii = 0; ii < callbacks.length; ++ii) {
callbacks[ii](error);
}
}
},
isResolved: function() {
return this.resolved === true;
},
resolve: function(cb) {
if (this.resolved) {
cb(this.error, this.value);
} else {
(this.callbacks = this.callbacks || []).push(cb);
}
},
/**
* Differs from its functional counterpart in that it actually resolves the future. Thus if the
* future threw, future.wait() will throw.
*/
wait: function() {
wait(this);
return this.get();
},
};
/**
* A future implemented as a fiber. Whatever the fiber returns is used as the value. If it throws,
* that is captured as the exception.
*/
function FiberFuture(fn, context, args) {
Future.call(this);
if (!fn) {
throw new Error('Requires function');
}
this.fn = fn;
this.context = context;
this.args = args;
this.started = false;
}
FiberFuture.prototype = Object.create(Future.prototype);
FiberFuture.prototype.resolve = function(cb) {
Future.prototype.resolve.call(this, cb);
if (!this.started) {
this.started = true;
Fiber(function() {
try {
this.didResolve(this.fn.apply(this.context, this.args));
} catch(e) {
this.didThrow(e);
}
}.bind(this)).run();
}
};
/**
* Wait on a number of futures to resolve. This blocks the current fiber, so only use this in the
* context of a fiber <<function(){}.future()>>
*/
function wait(/* ... */) {
var futures = [], singleFiberFuture;
// Sanity check + pull out a FiberFuture for reuse if possible
for (var ii = 0; ii < arguments.length; ++ii) {
if (!singleFiberFuture && arguments[ii] instanceof FiberFuture && !arguments[ii].started) {
// Reuse current fiber for this future
singleFiberFuture = arguments[ii];
continue;
}
if (arguments[ii] instanceof Future) {
futures.push(arguments[ii]);
} else {
throw new Error(arguments[ii] + ' is not a future');
}
}
// Resumes current fiber
var fiber = Fiber.current;
if (!fiber) {
throw new Error('Can\'t wait without a fiber');
}
function resume() {
fiber.run();
}
// Resolve all futures
var pending = arguments.length; // include singleFiberFuture in `pending` count
function cb() {
if (!--pending) {
resume();
}
}
for (var ii = 0; ii < futures.length; ++ii) {
futures[ii].resolve(cb);
}
// Reusing a fiber?
if (singleFiberFuture) {
singleFiberFuture.started = true;
try {
singleFiberFuture.didResolve(
singleFiberFuture.fn.apply(singleFiberFuture.context, singleFiberFuture.args));
} catch(e) {
singleFiberFuture.didThrow(e);
}
--pending;
}
// Yield this fiber
if (pending) {
Fiber.yield();
}
}
Tue, 29 Mar 2011 14:16:08 GMT
Tue, 29 Mar 2011 14:16:09 GMT
Tue, 29 Mar 2011 14:16:10 GMT
"use strict";
done
"use strict";
var future = require('./future'),
Future = future.Future,
wait = future.wait;
var fs = require('fs');
function sleep(ms) {
var future = new Future;
setTimeout(function() {
future.didResolve();
}, ms);
return future;
}
function readFile(file) {
var future = new Future;
fs.readFile(file, 'utf8', function(err, val) {
if (err) {
future.didThrow(err);
} else {
future.didResolve(val);
}
});
return future;
}
var task = function() {
// wait a second..
console.log(new Date);
sleep(1000).wait();
// wait a second..
console.log(new Date);
sleep(1000).wait();
// read a file
console.log(new Date);
console.log(readFile(__filename).wait().split(/\n/)[0]);
}.future();
task().resolve(function(err) {
if (err) {
console.log('there was an error! ' + err);
} else {
console.log('done');
}
});
function task2() {
var future = new Future;
// wait a second..
console.log(new Date);
sleep(1000).resolve(function(err, val) {
if (err) { future.didThrow(err); return; }
// wait a second..
console.log(new Date);
sleep(1000).resolve(function(err, val) {
if (err) { future.didThrow(err); return; }
// read a file
console.log(new Date);
readFile(__filename).resolve(function(err, val) {
if (err) { future.didThrow(err); return; }
console.log(val.split(/\n/)[0]);
future.didResolve();
});
});
});
return future;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment