Created
June 4, 2013 09:09
-
-
Save idx3d/5704685 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
// The constructor shared by generator functions. It is not a global, so it is
// read off a throwaway generator function.
const GeneratorFunction = function*(){}.constructor;
// The `prototype` object of that constructor.
const GeneratorFunctionPrototype = GeneratorFunction.prototype;
// Cached array methods so they can be applied to array-likes such as
// `arguments`.
const slice = Array.prototype.slice;
const concat = Array.prototype.concat;
/**
 * A user callback function that is raised when the result of an async function
 * is available. This declaration only documents the expected shape; its body
 * is intentionally empty.
 *
 * @param Any error
 *        If truthy indicates an error result. Will be thrown from the
 *        generator.
 * @param Any result
 *        If the callback is successful, will be the yielded value.
 */
function Callback(error, result){}
/**
 * The type of function that must be yielded from a generator being run. This
 * declaration only documents the expected shape; its body is intentionally
 * empty.
 *
 * @param Callback callback
 *        The function which will handle the error or result of the
 *        continuation.
 */
function Continuation(callback){}
/**
 * A GeneratorFunction where every value yielded must either itself be a
 * Generator or a Continuation. This declaration only documents the expected
 * shape; its body is intentionally empty.
 *
 * @return Generator
 */
function Continuable(/*...args*/){}
/**
 * A no-operation function: ignores its arguments and returns undefined.
 */
const noop = function(){};
/**
 * A function that returns the value given to it, unchanged.
 *
 * @param Any value
 *        The value to return.
 * @return Any
 *         The same value that was passed in.
 */
const identity = function(value){
  return value;
};
/**
 * Wrap a function so it is only called the first time the wrapper is called.
 * The first call forwards `this` and all arguments and returns the wrapped
 * function's result; every later call does nothing and returns undefined.
 *
 * @param Function fn
 *        The function to wrap.
 * @return Function
 *         The wrapper.
 */
const once = function(fn){
  return function(){
    if (fn) {
      const f = fn;
      // Clear the reference before calling so a re-entrant call made by `f`
      // itself is already a no-op.
      fn = null;
      return f.apply(this, arguments);
    }
  };
};
/**
 * Check whether a given value is a GeneratorFunction.
 *
 * Only functions qualify. A function passes either by being an instance of
 * this file's GeneratorFunction constructor, or by having a constructor whose
 * name is 'GeneratorFunction' (a name-based fallback for functions that are
 * not instances of this particular constructor).
 *
 * @param Any value
 *        The value to check.
 * @return Boolean
 *         Always a real boolean, never a truthy/falsy stand-in.
 */
const isGeneratorFunction = function(value){
  if (typeof value !== 'function') {
    return false;
  }
  if (value instanceof GeneratorFunction) {
    return true;
  }
  const ctor = value.constructor;
  return Boolean(ctor) && ctor.name === 'GeneratorFunction';
};
/**
 * Check whether a given value is a Generator.
 *
 * The check is structural: the value must be a non-null object that exposes
 * `next` and `throw` methods and is iterable, which is what delegating to it
 * with `yield*` relies on. It deliberately does not require the legacy `send`
 * method, which standard generator objects do not have.
 *
 * @param Any value
 *        The value to check.
 * @return Boolean
 */
const isGenerator = function(value){
  return value != null
      && typeof value === 'object'
      && typeof value.next === 'function'
      && typeof value.throw === 'function'
      && typeof value[Symbol.iterator] === 'function';
};
/**
 * Uses CPS (continuation passing style) to iterate through each yield in a
 * Generator.
 *
 * The continuation is handed a callback. When that callback fires, its result
 * is fed back into the generator with `next(result)`, or its error is raised
 * inside the generator with `throw(err)`. Whatever the generator yields next
 * is pumped in turn, until the generator reports that it is done. The
 * generator's final return value is never treated as a continuation.
 *
 * @param Generator generator
 *        The generator object to pump.
 * @param Continuation continuation
 *        The continuation which will be passed the callback that dispatches to
 *        the generator.
 * @throws TypeError
 *         If `continuation` is not a function. This also applies to every
 *         later value the generator yields.
 */
const pump = function(generator, continuation){
  if (typeof continuation !== 'function') {
    throw new TypeError("Yielded a non-function");
  }
  continuation(function(err, result){
    // `throw` returns a step just like `next` does: if the generator catches
    // the error and yields again, that new continuation must be pumped too.
    const step = err ? generator.throw(err) : generator.next(result);
    if (!step.done) {
      pump(generator, step.value);
    }
  });
};
/**
 * Handles either a Continuation function or a Generator. A function is simply
 * called with the callback. A Generator is delegated to (`yield*`) from a
 * fresh generator started with `run`, and its final value or thrown error is
 * reported through the callback.
 *
 * @param Continuation|Generator value
 *        The thing to pump a single turn.
 * @param Callback cb
 *        The Callback that receives the error or the result.
 * @throws TypeError
 *         If `value` is neither a function nor a Generator.
 */
const resolve = function(value, cb){
  if (typeof value === 'function') {
    value(cb);
  } else if (isGenerator(value)) {
    run(function*(){
      let result;
      try {
        result = yield* value;
      } catch (e) {
        cb(e);
        return;
      }
      // Report success outside the try block: an exception thrown by `cb`
      // itself must not be fed back into `cb` as if the generator had failed.
      cb(null, result);
    });
  } else {
    throw new TypeError('Must return a Continuation or Continuable');
  }
};
/**
 * Wrap an async function as a GeneratorFunction so it can be used with
 * delegating yield (yield*).
 *
 * Calling the wrapper captures its arguments and `this`. The resulting
 * generator yields a single Continuation which, when given a callback, calls
 * `fn` with the captured receiver and arguments plus that callback appended.
 * The generator then returns whatever value it is resumed with.
 *
 * @param Function fn
 *        Function to wrap. This function must accept a Callback as its last
 *        parameter.
 * @return Continuable
 *         Wrapped version of the function that can be yielded to in a
 *         generator executed using `run`.
 */
const wrap = exports.wrap = function wrap(fn){
  return function*(...args){
    const receiver = this;
    return yield function(cb){
      fn.apply(receiver, [...args, cb]);
    };
  };
};
/**
 * Creates a generator from a generator function and begins executing it.
 *
 * The generator is advanced to its first yield and the yielded value is handed
 * to `pump`. A generator that finishes without yielding anything is simply
 * complete; nothing is pumped.
 *
 * @param GeneratorFunction generatorFn
 *        The generator function to execute. Every yielded value should be a
 *        Continuation.
 */
const run = exports.run = function run(generatorFn){
  const generator = generatorFn();
  const first = generator.next();
  if (!first.done) {
    pump(generator, first.value);
  }
};
const _sleep = wrap(function(ms, cb){ | |
const start = Date.now(); | |
setTimeout(function(){ | |
cb(null, Date.now() - start); | |
}, ms); | |
}); | |
/** | |
* Helper that can be used to pause execution. | |
* | |
* @param Number ms | |
* Time to pause. | |
* @return Continuation | |
* Function that can be be yielded to in a generator executed using | |
* `run`. | |
*/ | |
exports.sleep = function sleep(ms){ | |
return _sleep(ms); | |
}; | |
const _tick = wrap(function(){ | |
if (typeof process === 'undefined') { | |
return function(cb){ | |
setTimeout(cb, 0); | |
}; | |
} | |
return process.nextTick; | |
}()); | |
/** | |
* Helper that waits until the next event loop tick. | |
* | |
* @return Continuation | |
* Function that can be be yielded to in a generator executed using | |
* `run`. | |
*/ | |
exports.tick = function tick(){ | |
return _tick(); | |
}; | |
/** | |
* Uses a Continuable function to sequentially map an array of values. | |
* | |
* @param Array array | |
* The set of values to map over. | |
* @param Continuable cb | |
* The function to be applied to each value. | |
* @param Any [receiver] | |
* The |this| value in the callback. | |
* @return Array | |
* The mapped array. | |
*/ | |
exports.forEach = function* forEach(array, itemcb, receiver){ | |
for (let i = 0; i < array.length; i++) { | |
if (i in array) { | |
yield* itemcb.call(receiver, array[i], i, array); | |
} | |
} | |
}; | |
/** | |
* Uses a Continuable function to sequentially map an array of values. | |
* | |
* @param Array array | |
* The set of values to map over. | |
* @param Continuable itemcb | |
* The function to be applied to each value. | |
* @param Any [receiver] | |
* The |this| value in the callback. | |
* @return Array | |
* The mapped array. | |
*/ | |
exports.map = function* map(array, itemcb, receiver){ | |
const result = []; | |
for (let i = 0; i < array.length; i++) { | |
if (i in array) { | |
result[i] = yield* itemcb.call(receiver, array[i], i, array); | |
} | |
} | |
return result; | |
}; | |
/** | |
* Uses a Continuable function to sequentially filter a set of values. | |
* | |
* @param Array array | |
* The set of values to map over. | |
* @param Continuable itemcb | |
* The function to be applied to each value. | |
* @param Any [receiver] | |
* The |this| value in the callback. | |
* @return Array | |
* The filtered array. | |
*/ | |
exports.filter = function* filter(array, itemcb, receiver){ | |
const result = []; | |
for (let i = 0; i < array.length; i++) { | |
if (i in array && (yield* itemcb.call(receiver, array[i], i, array))) { | |
result.push(array[i]); | |
} | |
} | |
return result; | |
}; | |
/** | |
* Uses a Continuable function to reduce a set of values. | |
* | |
* @param Array array | |
* The set of values to map over. | |
* @param Continuable itemcb | |
* The function to be applied to each value. | |
* @param Any [receiver] | |
* The |this| value in the callback. | |
* @return Any | |
* The final accumulated value. | |
*/ | |
exports.reduce = function* reduce(array, itemcb, initial){ | |
let index, accum; | |
if (arguments.length < 3) { | |
index = 1; | |
accum = array[0]; | |
} else { | |
index = 0; | |
accum = initial; | |
} | |
for (let i = index; i < array.length; i++) { | |
if (i in array) { | |
accum = yield* itemcb(accum, array[i], array); | |
} | |
} | |
return accum; | |
}; | |
/** | |
* Returns true the first time a Continuable function returns a truthy value | |
* against a set of values, otherwise returns false. | |
* | |
* @param Array array | |
* The set of values to map over. | |
* @param Continuable itemcb | |
* The function to be applied to each value. | |
* @param Any [receiver] | |
* The |this| value in the callback. | |
* @return Boolean | |
*/ | |
exports.some = function* some(array, itemcb, receiver){ | |
for (let i = 0; i < array.length; i++) { | |
if (i in array && (yield* itemcb.call(receiver, array[i], i, array))) { | |
return true; | |
} | |
} | |
return false; | |
}; | |
/** | |
* Returns false the first time a Continuable function returns a falsey value | |
* against a set of values, otherwise returns true. | |
* | |
* @param Array array | |
* The set of values to map over. | |
* @param Continuable itemcb | |
* The function to be applied to each value. | |
* @param Any [receiver] | |
* The |this| value in the callback. | |
* @return Boolean | |
*/ | |
exports.every = function* every(array, itemcb, receiver){ | |
for (let i = 0; i < array.length; i++) { | |
if (i in array && !(yield* itemcb.call(receiver, array[i], i, array))) { | |
return false; | |
} | |
} | |
return true; | |
}; | |
/**
 * Create a parallelized function that works over an array of values.
 *
 * Every present element is started before any result is awaited. The callback
 * fires once: with the first error reported, or with `finalize(result)` after
 * every started item has reported a value. Holes in sparse arrays are never
 * started and never counted, so they cannot keep the operation from finishing.
 *
 * @param Function transform
 *        The callback that transforms each value set in the returned array.
 *        Called with (input, value), where `value` is what the item callback
 *        produced.
 * @param Function finalize
 *        The callback that transforms the completed array as a whole.
 * @return Continuable
 *         Takes (array, itemcb, receiver).
 */
const parallelFunction = function(transform, finalize){
  return wrap(function(array, itemcb, receiver, cb){
    const result = [];
    // Snapshot the indices that exist up front so the number of results to
    // wait for always matches the number of items actually started.
    const indices = [];
    for (let i = 0; i < array.length; i++) {
      if (i in array) {
        indices.push(i);
      }
    }
    let remaining = indices.length;
    if (!remaining) {
      return void cb(null, finalize(result));
    }
    cb = once(cb);
    const handle = function(input, index){
      return function(err, value){
        if (err) {
          cb(err);
        } else if (!(index in result)) {
          // The `in` guard ignores a second report for the same item.
          result[index] = transform(input, value);
          if (!--remaining) {
            cb(null, finalize(result));
          }
        }
      };
    };
    indices.forEach(function(i){
      resolve(itemcb.call(receiver, array[i], i, array), handle(array[i], i));
    });
  });
};
const _parallelForEach = parallelFunction(noop, noop); | |
/** | |
* Uses a Continuable function to map an array of values in parallel. | |
* | |
* @param Array array | |
* The set of values to map over. | |
* @param Continuable itemcb | |
* The function to be applied to each value. | |
* @param Any [receiver] | |
* The |this| value in the callback. | |
* @return Array | |
* The filtered array. | |
*/ | |
exports.parallelForEach = function parallelForEach(array, itemcb, receiver){ | |
return _parallelForEach(array, itemcb, receiver); | |
}; | |
const _parallelMap = parallelFunction(function(input, result){ | |
return result; | |
}, function(array){ | |
return array; | |
}); | |
/** | |
* Uses a Continuable function to map an array of values in parallel. | |
* | |
* @param Array array | |
* The set of values to map over. | |
* @param Continuable itemcb | |
* The function to be applied to each value. | |
* @param Any [receiver] | |
* The |this| value in the callback. | |
* @return Array | |
* The filtered array. | |
*/ | |
exports.parallelMap = function parallelMap(array, itemcb, receiver){ | |
return _parallelMap(array, itemcb, receiver); | |
}; | |
const EMPTY = {}; | |
const _parallelFilter = parallelFunction(function(input, result){ | |
return result ? input : EMPTY; | |
}, function(array){ | |
return array.filter(function(item){ | |
return item !== EMPTY; | |
}); | |
}); | |
/** | |
* Uses a Continuable function to filter a set of values in parallel. | |
* | |
* @param Array array | |
* The set of values to map over. | |
* @param Continuable itemcb | |
* The function to be applied to each value. | |
* @param Any [receiver] | |
* The |this| value in the callback. | |
* @return Array | |
* The filtered array. | |
*/ | |
exports.parallelFilter = function parallelFilter(array, itemcb, receiver){ | |
return _parallelFilter(array, itemcb, receiver); | |
}; | |
const _join = wrap(function(items, cb){ | |
const result = []; | |
let remaining = items.length; | |
cb = once(cb); | |
const handle = function(index){ | |
return function(err, value){ | |
if (err) { | |
cb(err); | |
} else if (!(index in result)) { | |
result[index] = value; | |
if (!--remaining) { | |
cb(null, result); | |
} | |
} | |
}; | |
}; | |
for (let i=0, item; item = items[i]; i++) { | |
resolve(item, handle(i)); | |
} | |
}); | |
/** | |
* Take multiple Continuables and combine them into a single Continuable | |
* that returns an array of the completed values, or errors if any error. | |
* | |
* @param ...Continuable args | |
* Any amount of Continuables or arrays of Continuables to join. | |
* @return Continuable | |
* The combined Continuable which will yield the results as an array. | |
*/ | |
exports.join = function join(/* ...args */){ | |
return _join(concat.apply([], arguments)); | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
const path = require('path'); | |
const gen = require('../'); | |
/**
 * Build an object of generator-friendly wrappers for selected methods.
 *
 * @param Object obj
 *        The object whose callback-style methods should be wrapped.
 * @param Array names
 *        Property names to wrap. Names that are not functions on `obj` (for
 *        example an API missing on the current platform) are skipped, so the
 *        output only contains callable wrappers.
 * @return Object
 *         Maps each wrapped name to `gen.wrap` of the method, bound to `obj`
 *         so the method still sees its original |this|.
 */
function wrapAll(obj, names){
  const out = {};
  names.forEach(function(name){
    const method = obj[name];
    if (typeof method === 'function') {
      out[name] = gen.wrap(method.bind(obj));
    }
  });
  return out;
}
const fs = wrapAll(require('fs'), [ | |
'rename', 'ftruncate', 'truncate', 'chown', 'fchown', 'lchown', 'chmod','fchmod', | |
'lchmod', 'stat', 'lstat', 'fstat', 'link', 'symlink', 'readlink', 'realpath', | |
'unlink', 'rmdir', 'mkdir', 'readdir', 'close', 'open', 'utimes', 'futimes', | |
'fsync', 'write', 'read', 'readFile', 'writeFile', 'appendFile' | |
]); | |
/**
 * List a directory, returning each entry as a path resolved against it.
 *
 * @param String dir
 *        The directory to read.
 * @return Array
 *         One resolved path per directory entry, in the order `fs.readdir`
 *         reported them.
 */
function* fulldir(dir){
  const children = yield* fs.readdir(dir);
  return children.map(function(child){
    return path.resolve(dir, child);
  });
}
/**
 * Stat every entry of a directory, one after another.
 *
 * @param String dir
 *        The directory to read.
 * @return Array
 *         The `fs.stat` result for each entry, in listing order.
 */
function* statdir(dir){
  const children = yield* fulldir(dir);
  // Wrap fs.stat so only the path is forwarded; gen.map also passes the index
  // and the array, which must not reach fs.stat as extra arguments.
  return yield* gen.map(children, function(filepath){
    return fs.stat(filepath);
  });
}
/**
 * Total the sizes of the regular files directly inside a directory. Entries
 * whose stat reports they are not files contribute nothing.
 *
 * @param String dir
 *        The directory to measure.
 * @return Number
 *         Sum of `size` over the entries for which `isFile()` is true.
 */
function* sizedir(dir){
  const stats = yield* statdir(dir);
  let total = 0;
  for (const child of stats) {
    if (child.isFile()) {
      total = child.size + total;
    }
  }
  return total;
}
/**
 * Demo worker: pauses via `gen.sleep(20)`, then multiplies its input by ten.
 * The pause makes the difference between sequential and parallel mapping
 * visible in the timings printed below.
 *
 * @param Number value
 *        The number to scale.
 * @return Number
 *         `value * 10`.
 */
function* x10(value){
  yield* gen.sleep(20);
  return value * 10;
}
// Demo driver: times the same mapping done sequentially and in parallel, then
// joins two stats and prints the total file size of the parent directory.
gen.run(function*(){
  console.time('sequential');
  console.log(yield* gen.map([1, 2, 3, 4, 5], x10));
  console.timeEnd('sequential');

  console.time('parallel');
  console.log(yield* gen.parallelMap([1, 2, 3, 4, 5], x10));
  console.timeEnd('parallel');

  console.log(yield* gen.join(fs.stat('.'), fs.stat('..')));
  console.log(yield* sizedir('..'));
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment