Skip to content

Instantly share code, notes, and snippets.

@idx3d
Created June 4, 2013 09:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save idx3d/5704685 to your computer and use it in GitHub Desktop.
Save idx3d/5704685 to your computer and use it in GitHub Desktop.
"use strict";
// Generator functions have no global constructor binding, so the constructor
// is recovered from a throwaway generator function literal.
const GeneratorFunction = function*(){}.constructor;
// The object stored on GeneratorFunction.prototype (used by isGenerator).
const GeneratorFunctionPrototype = GeneratorFunction.prototype;
// Cached Array.prototype methods, applied to `arguments` objects further down.
const slice = Array.prototype.slice;
const concat = Array.prototype.concat;
/**
 * Documentation-only stub: the shape of the user callback that is invoked
 * when the result of an async function is available.
 *
 * @param Any error
 *   If truthy, indicates an error result. It will be thrown into the
 *   generator.
 * @param Any result
 *   If the callback is successful, the value handed back to the generator as
 *   the result of the pending `yield`.
 */
function Callback(error, result){}
/**
 * Documentation-only stub: the type of function that must be yielded from a
 * generator being run.
 *
 * @param Callback callback
 *   The function which will handle the error or result of the
 *   continuation.
 */
function Continuation(callback){}
/**
 * Documentation-only stub: a GeneratorFunction where every value yielded must
 * either itself be a Generator or a Continuation.
 *
 * NOTE(review): `pump` throws a TypeError for any yielded non-function, so a
 * Generator apparently has to be delegated to with `yield*` rather than
 * yielded directly -- confirm the intended contract.
 *
 * @return Generator
 */
function Continuable(/*...args*/){}
/**
 * Placeholder function: ignores its arguments and yields `undefined`.
 */
const noop = function(){
  return undefined;
};
/**
 * Hands back exactly the value it was given.
 *
 * @param Any value
 *   The value to pass through.
 * @return Any
 *   The same value, untouched.
 */
const identity = (value) => value;
/**
 * Build a wrapper that forwards to `fn` on its first invocation only.
 * Later invocations do nothing and return `undefined`.
 *
 * @param Function fn
 *   The function to guard.
 * @return Function
 *   The guarding wrapper. It forwards its own `this` and arguments.
 */
const once = function(fn){
  let pending = fn;
  return function(...args){
    if (!pending) {
      return undefined;
    }
    // Drop the reference before calling so a re-entrant call is a no-op.
    const target = pending;
    pending = null;
    return target.apply(this, args);
  };
};
/**
 * Decide whether `value` is a GeneratorFunction, either by prototype chain or,
 * failing that, by the name of its constructor.
 *
 * @param Any value
 *   The value to inspect.
 * @return Boolean
 *   Truthy for generator functions, falsy otherwise.
 */
const isGeneratorFunction = function(value){
  if (value instanceof GeneratorFunction) {
    return true;
  }
  if (typeof value !== 'function') {
    return false;
  }
  // Name-based fallback for generator functions that fail the instanceof test.
  const ctor = value.constructor;
  return ctor && ctor.name === 'GeneratorFunction';
};
/**
 * Check whether a given value is a Generator (the object produced by calling
 * a generator function), or at least behaves like one.
 *
 * A value qualifies when it inherits from the shared generator prototype
 * (`GeneratorFunction.prototype.prototype`) or, as a duck-typed fallback,
 * exposes both `next` and `throw` methods. The previous check used
 * `instanceof` against a non-callable object, which is never true, and
 * required the non-standard `send` method, so native generators were
 * rejected.
 *
 * @param Any value
 *   The value to check.
 * @return Boolean
 */
const isGenerator = function(value){
  if (value == null || typeof value !== 'object') {
    return false;
  }
  // GeneratorFunctionPrototype is GeneratorFunction.prototype; its own
  // `prototype` property is the object all native generators inherit from.
  const generatorPrototype = GeneratorFunctionPrototype.prototype;
  if (generatorPrototype != null
      && Object.prototype.isPrototypeOf.call(generatorPrototype, value)) {
    return true;
  }
  return typeof value.next === 'function'
    && typeof value.throw === 'function';
};
/**
 * Uses CPS (continuation passing style) to iterate through each yield in a
 * Generator.
 *
 * The continuation is invoked with a callback. When that callback fires, the
 * generator is resumed with the result (or has the error thrown into it), and
 * whatever it yields next is pumped in turn until the generator is done.
 *
 * @param Generator generator
 *   The generator object to pump.
 * @param Continuation continuation
 *   The continuation which will be passed the callback that dispatches to
 *   the generator.
 * @throws TypeError
 *   If the continuation is not a function.
 */
const pump = function(generator, continuation){
  if (typeof continuation !== 'function') {
    throw new TypeError("Yielded a non-function");
  }
  continuation(function(err, result){
    // Standard generators resume through `next(value)`; there is no `send`.
    // The step returned by `throw` matters too: a generator that catches the
    // error may go on to yield further continuations.
    const step = err ? generator.throw(err) : generator.next(result);
    // Rely on `done` rather than truthiness so a return value is never
    // mistaken for another continuation.
    if (!step.done) {
      pump(generator, step.value);
    }
  });
};
/**
 * Handles either a Continuation function or a Generator. A function is simply
 * invoked with the callback. A Generator is driven to completion through a
 * nested `run`, and its final value (or thrown error) is handed to the
 * callback.
 *
 * @param Continuation|Generator value
 *   The thing to resolve.
 * @param Callback cb
 *   The Callback that receives the outcome.
 * @throws TypeError
 *   If `value` is neither a function nor a Generator.
 */
const resolve = function(value, cb){
  if (typeof value === 'function') {
    value(cb);
  } else if (isGenerator(value)) {
    run(function*(){
      let result;
      try {
        result = yield* value;
      } catch (e) {
        cb(e);
        return;
      }
      // Report success outside the try block: an exception thrown by `cb`
      // itself must propagate instead of triggering a second `cb(e)` call.
      cb(null, result);
    });
  } else {
    throw new TypeError('Must return a Continuation or Continuable');
  }
};
/**
 * Adapt a callback-style async function into a GeneratorFunction so that it
 * can be consumed with delegating yield (`yield*`).
 *
 * @param Function fn
 *   The function to adapt. It must take a Callback as its final parameter.
 * @return Continuable
 *   A generator function. Each call yields one continuation that invokes `fn`
 *   with the original `this`, the original arguments and the callback, and
 *   then returns whatever value the generator is resumed with.
 */
const wrap = exports.wrap = function wrap(fn){
  return function*(...args){
    // The arrow function keeps the generator call's `this` as the receiver.
    const continuation = (cb) => {
      fn.apply(this, args.concat(cb));
    };
    return yield continuation;
  };
};
/**
 * Creates a generator from a generator function and begins executing it.
 *
 * The generator is advanced to its first `yield`, and the yielded continuation
 * is handed to `pump`, which drives the remaining steps. A generator that
 * completes without yielding anything is simply finished; nothing is pumped.
 *
 * @param GeneratorFunction generatorFn
 *   The generator function to execute (called with no arguments). Every
 *   yielded value should be a Continuation.
 */
const run = exports.run = function run(generatorFn){
  const generator = generatorFn();
  const first = generator.next();
  // Without this check a generator that returns immediately would make pump
  // throw "Yielded a non-function" for its (undefined) completion value.
  if (!first.done) {
    pump(generator, first.value);
  }
};
// Wrapped timer: completes after `ms` milliseconds with the measured elapsed
// time (in milliseconds) as its result.
const _sleep = wrap(function(ms, cb){
  const startedAt = Date.now();
  setTimeout(() => {
    cb(null, Date.now() - startedAt);
  }, ms);
});
/**
 * Helper that can be used to pause execution.
 *
 * @param Number ms
 *   Time to pause, in milliseconds (forwarded to `setTimeout` by `_sleep`).
 * @return Generator
 *   The generator produced by the wrapped `_sleep`. Delegate to it with
 *   `yield*` inside a generator executed using `run`; its result is the
 *   measured elapsed time in milliseconds.
 */
exports.sleep = function sleep(ms){
return _sleep(ms);
};
// Wrapped "defer" primitive: `process.nextTick` where a `process` global
// exists, otherwise a zero-delay `setTimeout`.
const _tick = wrap(
  typeof process === 'undefined'
    ? function(cb){
        setTimeout(cb, 0);
      }
    : process.nextTick
);
/**
 * Helper that defers until `process.nextTick` fires (or, where no `process`
 * global exists, until a zero-delay `setTimeout` fires).
 *
 * @return Generator
 *   The generator produced by the wrapped `_tick`. Delegate to it with
 *   `yield*` inside a generator executed using `run`.
 */
exports.tick = function tick(){
return _tick();
};
/**
* Uses a Continuable function to sequentially map an array of values.
*
* @param Array array
* The set of values to map over.
* @param Continuable cb
* The function to be applied to each value.
* @param Any [receiver]
* The |this| value in the callback.
* @return Array
* The mapped array.
*/
exports.forEach = function* forEach(array, itemcb, receiver){
for (let i = 0; i < array.length; i++) {
if (i in array) {
yield* itemcb.call(receiver, array[i], i, array);
}
}
};
/**
* Uses a Continuable function to sequentially map an array of values.
*
* @param Array array
* The set of values to map over.
* @param Continuable itemcb
* The function to be applied to each value.
* @param Any [receiver]
* The |this| value in the callback.
* @return Array
* The mapped array.
*/
exports.map = function* map(array, itemcb, receiver){
const result = [];
for (let i = 0; i < array.length; i++) {
if (i in array) {
result[i] = yield* itemcb.call(receiver, array[i], i, array);
}
}
return result;
};
/**
* Uses a Continuable function to sequentially filter a set of values.
*
* @param Array array
* The set of values to map over.
* @param Continuable itemcb
* The function to be applied to each value.
* @param Any [receiver]
* The |this| value in the callback.
* @return Array
* The filtered array.
*/
exports.filter = function* filter(array, itemcb, receiver){
const result = [];
for (let i = 0; i < array.length; i++) {
if (i in array && (yield* itemcb.call(receiver, array[i], i, array))) {
result.push(array[i]);
}
}
return result;
};
/**
* Uses a Continuable function to reduce a set of values.
*
* @param Array array
* The set of values to map over.
* @param Continuable itemcb
* The function to be applied to each value.
* @param Any [receiver]
* The |this| value in the callback.
* @return Any
* The final accumulated value.
*/
exports.reduce = function* reduce(array, itemcb, initial){
let index, accum;
if (arguments.length < 3) {
index = 1;
accum = array[0];
} else {
index = 0;
accum = initial;
}
for (let i = index; i < array.length; i++) {
if (i in array) {
accum = yield* itemcb(accum, array[i], array);
}
}
return accum;
};
/**
* Returns true the first time a Continuable function returns a truthy value
* against a set of values, otherwise returns false.
*
* @param Array array
* The set of values to map over.
* @param Continuable itemcb
* The function to be applied to each value.
* @param Any [receiver]
* The |this| value in the callback.
* @return Boolean
*/
exports.some = function* some(array, itemcb, receiver){
for (let i = 0; i < array.length; i++) {
if (i in array && (yield* itemcb.call(receiver, array[i], i, array))) {
return true;
}
}
return false;
};
/**
* Returns false the first time a Continuable function returns a falsey value
* against a set of values, otherwise returns true.
*
* @param Array array
* The set of values to map over.
* @param Continuable itemcb
* The function to be applied to each value.
* @param Any [receiver]
* The |this| value in the callback.
* @return Boolean
*/
exports.every = function* every(array, itemcb, receiver){
for (let i = 0; i < array.length; i++) {
if (i in array && !(yield* itemcb.call(receiver, array[i], i, array))) {
return false;
}
}
return true;
};
/**
 * Create a parallelized function that works over an array of values.
 *
 * The produced function starts `itemcb` for every present element before
 * waiting on any of them, stores each transformed result at the element's
 * index, and reports once all of them have completed. The first error is
 * reported immediately; later outcomes can no longer reach the callback.
 * Missing indices (holes) are ignored up front, so arrays that are empty or
 * consist only of holes complete straight away instead of never calling back.
 *
 * @param Function transform
 *   Called as `transform(input, value)` for each completed element; its return
 *   value is stored in the result array.
 * @param Function finalize
 *   Called with the completed result array; its return value is what the
 *   callback receives.
 * @return Continuable
 *   The wrapped function, taking `(array, itemcb, receiver)`.
 */
const parallelFunction = function(transform, finalize){
  return wrap(function(array, itemcb, receiver, cb){
    const result = [];
    const done = once(cb);
    // Count only indices that exist: a hole never produces a completion, so
    // it must not take part in the "everything finished" bookkeeping.
    const present = [];
    for (let i = 0; i < array.length; i++) {
      if (i in array) {
        present.push(i);
      }
    }
    let remaining = present.length;
    if (remaining === 0) {
      return void done(null, finalize(result));
    }
    const handle = function(input, index){
      return function(err, value){
        if (err) {
          done(err);
        } else if (!(index in result)) {
          // The `in` guard ignores a duplicate completion for the same index.
          result[index] = transform(input, value);
          remaining -= 1;
          if (remaining === 0) {
            done(null, finalize(result));
          }
        }
      };
    };
    present.forEach(function(index){
      resolve(itemcb.call(receiver, array[index], index, array), handle(array[index], index));
    });
  });
};
// Parallel runner that discards every per-item result: both the transform
// and the finalize step are `noop`.
const _parallelForEach = parallelFunction(noop, noop);
/**
 * Uses a Continuable function to visit every value of an array in parallel.
 *
 * @param Array array
 *   The set of values to visit.
 * @param Continuable itemcb
 *   The function to be applied to each value.
 * @param Any [receiver]
 *   The |this| value in the callback.
 * @return Generator
 *   The generator produced by `_parallelForEach`; delegate to it with
 *   `yield*`. It completes with `undefined` (the `noop` finalize result).
 */
exports.parallelForEach = function parallelForEach(array, itemcb, receiver){
return _parallelForEach(array, itemcb, receiver);
};
// Parallel runner that keeps each item's result as-is and hands back the
// collected array unchanged.
const _parallelMap = parallelFunction(
  (input, result) => result,
  (array) => array
);
/**
 * Uses a Continuable function to map an array of values in parallel.
 *
 * @param Array array
 *   The set of values to map over.
 * @param Continuable itemcb
 *   The function to be applied to each value.
 * @param Any [receiver]
 *   The |this| value in the callback.
 * @return Generator
 *   The generator produced by `_parallelMap`; delegate to it with `yield*`.
 *   It completes with the mapped array.
 */
exports.parallelMap = function parallelMap(array, itemcb, receiver){
return _parallelMap(array, itemcb, receiver);
};
// Unique sentinel marking an element the predicate rejected.
const EMPTY = {};
// Parallel runner that keeps the input when the predicate result is truthy,
// marks it with EMPTY otherwise, and strips the markers at the end.
const _parallelFilter = parallelFunction(
  (input, result) => (result ? input : EMPTY),
  (array) => array.filter((item) => item !== EMPTY)
);
/**
 * Uses a Continuable function to filter a set of values in parallel.
 *
 * @param Array array
 *   The set of values to filter.
 * @param Continuable itemcb
 *   The predicate to be applied to each value; a truthy result keeps it.
 * @param Any [receiver]
 *   The |this| value in the callback.
 * @return Generator
 *   The generator produced by `_parallelFilter`; delegate to it with
 *   `yield*`. It completes with the filtered array.
 */
exports.parallelFilter = function parallelFilter(array, itemcb, receiver){
return _parallelFilter(array, itemcb, receiver);
};
/**
 * Wrapped worker behind `join`: resolves every item and collects the results
 * by position.
 *
 * The callback receives the result array once every item has completed, or
 * the first error as soon as one occurs. An empty item list completes
 * immediately with an empty array. Every index is resolved, so an invalid
 * entry is rejected by `resolve` rather than silently ending the loop and
 * leaving the callback uncalled.
 *
 * @param Array items
 *   Continuations and/or Generators to resolve.
 * @param Callback cb
 *   Receives `(error)` or `(null, results)`; supplied by `wrap`.
 */
const _join = wrap(function(items, cb){
  const result = [];
  let remaining = items.length;
  if (remaining === 0) {
    // Nothing to wait for: report straight away instead of never calling back.
    return void cb(null, result);
  }
  const done = once(cb);
  const handle = function(index){
    return function(err, value){
      if (err) {
        done(err);
      } else if (!(index in result)) {
        // The `in` guard ignores a duplicate completion for the same index.
        result[index] = value;
        remaining -= 1;
        if (remaining === 0) {
          done(null, result);
        }
      }
    };
  };
  for (let i = 0; i < items.length; i++) {
    resolve(items[i], handle(i));
  }
});
/**
* Take multiple Continuables and combine them into a single Continuable
* that returns an array of the completed values, or errors if any error.
*
* @param ...Continuable args
* Any amount of Continuables or arrays of Continuables to join.
* @return Continuable
* The combined Continuable which will yield the results as an array.
*/
exports.join = function join(/* ...args */){
return _join(concat.apply([], arguments));
};
"use strict";
const path = require('path');
const gen = require('../');
/**
 * Build a new object holding `gen.wrap`-ed versions of selected members of
 * `obj`.
 *
 * @param Object obj
 *   The object whose members are wrapped.
 * @param Array names
 *   The property names to wrap.
 * @return Object
 *   A fresh object mapping each name to the wrapped member.
 */
function wrapAll(obj, names){
  return names.reduce(function(wrapped, name){
    wrapped[name] = gen.wrap(obj[name]);
    return wrapped;
  }, {});
}
// Generator-based versions of the listed callback-style `fs` functions,
// produced through `gen.wrap` so they can be delegated to with `yield*`.
const fs = wrapAll(require('fs'), [
'rename', 'ftruncate', 'truncate', 'chown', 'fchown', 'lchown', 'chmod','fchmod',
'lchmod', 'stat', 'lstat', 'fstat', 'link', 'symlink', 'readlink', 'realpath',
'unlink', 'rmdir', 'mkdir', 'readdir', 'close', 'open', 'utimes', 'futimes',
'fsync', 'write', 'read', 'readFile', 'writeFile', 'appendFile'
]);
/**
 * List a directory and resolve every entry against that directory.
 *
 * @param String dir
 *   The directory to read.
 * @return Generator
 *   Completes with an array of resolved paths, one per directory entry.
 */
function* fulldir(dir){
  const names = yield* fs.readdir(dir);
  return names.map((child) => path.resolve(dir, child));
}
/**
 * Stat every entry of a directory, one after another.
 *
 * @param String dir
 *   The directory whose entries are inspected.
 * @return Generator
 *   Completes with an array holding the `fs.stat` result of each entry.
 */
function* statdir(dir){
  const children = yield* fulldir(dir);
  const statEach = (filepath) => fs.stat(filepath);
  return yield* gen.map(children, statEach);
}
/**
 * Add up the sizes of the regular files directly inside a directory.
 *
 * @param String dir
 *   The directory to measure.
 * @return Generator
 *   Completes with the sum of `size` over all entries whose `isFile()` is
 *   truthy.
 */
function* sizedir(dir){
  const stats = yield* statdir(dir);
  let total = 0;
  stats.forEach(function(child){
    if (child.isFile()) {
      total = child.size + total;
    }
  });
  return total;
}
/**
 * Demo task: pause for 20 ms via `gen.sleep`, then multiply by ten.
 *
 * @param Number value
 *   The number to scale.
 * @return Generator
 *   Completes with `value * 10`.
 */
function* x10(value){
  const factor = 10;
  yield* gen.sleep(20);
  return value * factor;
}
// Demo driver: runs the examples below inside a single generator.
gen.run(function*(){
// Sequential map: each x10 call (20 ms sleep) finishes before the next starts.
console.time('sequential');
console.log(yield* gen.map([1, 2, 3, 4, 5], x10));
console.timeEnd('sequential');
// Parallel map: all x10 calls are started before any of them completes.
console.time('parallel');
console.log(yield* gen.parallelMap([1, 2, 3, 4, 5], x10));
console.timeEnd('parallel');
// Join two independent stat calls into one array of results.
console.log(yield* gen.join(fs.stat('.'), fs.stat('..')));
// Total size of the regular files directly inside the parent directory.
console.log(yield* sizedir('..'));
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment