Skip to content

Instantly share code, notes, and snippets.

@boxofrox
Last active May 30, 2016 16:02
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save boxofrox/41d66d841e37d3827796 to your computer and use it in GitHub Desktop.
Save boxofrox/41d66d841e37d3827796 to your computer and use it in GitHub Desktop.
const R = require('ramda');
const Future = require('ramda-fantasy').Future;
/**
 * Runs a list of Futures with at most `N` of them in flight at any time and
 * gathers their results in the same order as the input list.
 *
 * :: Int -> [Future a b] -> Future a [b]
 *
 * @param {number} N - number of workers, i.e. the maximum number of tasks
 *   forked concurrently.
 * @param {Array} as - tasks to do; each must expose `fork(onError, onSuccess)`.
 * @returns {Future} A Future that resolves with the array of task results
 *   (index `i` holds the result of `as[i]`), or rejects with the error of the
 *   first task that fails. Resolves with `[]` when `as` is empty or `N < 1`.
 */
const batchParallel = R.curry((N, as) => {
  if (0 === as.length || N < 1) {
    return Future.of([]);
  }

  const len = as.length;

  return new Future((rej, ok) => {
    // All bookkeeping lives inside the fork callback so that every fork of the
    // returned Future starts from a clean slate.
    const results = new Array(len);
    let idx = 0; // index of the next task to start.
    let completed = 0; // number of tasks that have settled.
    let n = 0; // number of active tasks.
    let failed = false; // explicit flag, so falsy error values still count as failures.
    let errorResult = null; // error of the first failed task.
    let resolved = false; // so we ignore active tasks that settle after we are done.

    const schedule = () => {
      if (resolved) {
        return;
      }

      if (failed) {
        resolved = true;
        return rej(errorResult); // we're done here
      }

      // Finished only when every task (not merely N of them) has completed.
      if (completed === len) {
        resolved = true;
        return ok(results);
      }

      // Fill the free worker slots. `resolved` is re-checked because a task
      // that settles synchronously re-enters schedule() from inside run().
      while (!resolved && n < N && idx < len) {
        // Claim the index before forking: a synchronously settling task
        // re-enters schedule() and must not see this task as still pending.
        const i = idx;
        idx += 1;
        run(as[i], i);
      }
    };

    const run = (task, i) => {
      n += 1; // occupy a worker slot.
      task.fork(
        error => {
          if (!failed) {
            failed = true;
            errorResult = error;
          }
          n -= 1; // free the slot.
          completed += 1;
          schedule();
        },
        result => {
          results[i] = result;
          n -= 1; // free the slot.
          completed += 1;
          schedule();
        }
      );
    };

    schedule();
  });
});
// Expose the curried batchParallel function as this module's export (CommonJS).
module.exports = batchParallel;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment