Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Bluebird asParallel
Promise.longStackTraces();
//end debug
/**
Hijack Promise.map to accept parallelism limit argument
**/
function queueConcurrent(work, degreeOfParallelism) {
// work is array of functions returning promises
"use strict";
return new Promise(function (ful, rej) {
var results = new Array(work.length);
degreeOfParallelism = Math.min(work.length, degreeOfParallelism);
var last = degreeOfParallelism;
for (var i = 0; i < last; i++) {
var p = work[i]().bind(i).then(queueNext, rej);
}
function queueNext(result) {
results[this] = result;
if (last === work.length + degreeOfParallelism - 1) return ful(results);
if (last >= work.length) return last++;
return work[last++]().bind(last).then(queueNext, rej);
}
});
}
var oldMap = Promise.prototype.map;
Promise.prototype.map = function mapLimit(fn,n) {
if(arguments.length === 1){
return oldMap.apply(this,arguments);
}
return this.then(function (work) {
return queueConcurrent(work.map(function(el){
return function(){
return Promise.resolve(fn(el));
};
}), n);
});
};
/**
ParallelJS changes
*/
var isNode = typeof global !== "undefined" && {}.toString.call(global) == '[object global]';
var maxWorkers = isNode ? require('os').cpus().length : 4;
function Parallel(){}
Promise.prototype.asParallel = function(opts){
var p = this.then(function(v){ return v;});
p.requiredScripts = [];
p.requiredFunctions = [];
p.getWorkerSource = Parallel.prototype.getWorkerSource;
p._spawnWorker = Parallel.prototype._spawnWorker;
p.spawn = Parallel.prototype.spawn;
p.options = {evalPath:null};
p.map = Parallel.prototype.map;
return p;
};
Parallel.prototype.getWorkerSource = function (cb) {
var preStr = '';
var i = 0;
if (!isNode && this.requiredScripts.length !== 0) {
preStr += 'importScripts("' + this.requiredScripts.join('","') + '");\r\n';
}
for (i = 0; i < this.requiredFunctions.length; ++i) {
if (this.requiredFunctions[i].name) {
preStr += 'var ' + this.requiredFunctions[i].name + ' = ' + this.requiredFunctions[i].fn.toString() + ';';
} else {
preStr += this.requiredFunctions[i].fn.toString();
}
}
if (isNode) {
return preStr + 'process.on("message", function(e) {process.send(JSON.stringify((' + cb.toString() + ')(JSON.parse(e).data)))})';
} else {
return preStr + 'self.onmessage = function(e) {self.postMessage((' + cb.toString() + ')(e.data))}';
}
};
Parallel.prototype._spawnWorker = function (cb) {
var wrk;
var src = this.getWorkerSource(cb);
if (isNode) {
wrk = new Worker(this.options.evalPath);
wrk.postMessage(src);
} else {
if (Worker === undefined) {
return undefined;
}
try {
if (!URL) {
throw new Error('Can\'t create a blob URL in this browser!');
} else {
//console.log(src);
var blob = new Blob([src], {
type: 'text/javascript'
});
var url = URL.createObjectURL(blob);
wrk = new Worker(url);
}
} catch (e) {
if (this.options.evalPath !== null) { // blob/url unsupported, cross-origin error
wrk = new Worker(this.options.evalPath);
wrk.postMessage(src);
} else {
throw e;
}
}
}
return wrk;
};
Parallel.prototype.spawn = function (cb,data) {
var that = this;
return this.then(function (d) {
var wrk = that._spawnWorker(cb);
if (wrk !== undefined) {
var p = pMessage(wrk).then(function (msg) {
return msg.data;
});
wrk.postMessage(data);
return p;
}
return cb(data); // sync fallback
});
function pMessage(wrk){
return new Promise(function(resolve,reject){
wrk.onmessage = resolve;
wrk.onerror = function(ev){
reject(ev.message);
};
}).finally(function(){
wrk.terminate();
});
}
};
Parallel.prototype.map = function map(cb,deg){
var that = this;
return Promise.prototype.map.call(this,function(el){
return that.spawn(cb,el);
}, deg || maxWorkers);
};
var arr = []; for(var i=0;i<8;i++) arr.push(40);
Promise.resolve(arr).tap(function(){
console.time("time");
}).asParallel().map(function(el){
function fib(n){
return n < 2? 1 : (fib(n-1)+fib(n-2));
}
return fib(el);
},8).then(function(res){
console.log(res);
}).tap(function(el){
console.timeEnd("time");
});
@benjamingr

This comment has been minimized.

Copy link
Owner Author

benjamingr commented Apr 12, 2014

Bits of source code ripped from ParallelJS, so props for them for _spawnWorker and etc.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.