Skip to content

Instantly share code, notes, and snippets.

@tomersh
Last active April 20, 2018 17:10
Show Gist options
  • Save tomersh/3a8c9c35c5e53de92cf2fea0c4a11ad5 to your computer and use it in GitHub Desktop.
Save tomersh/3a8c9c35c5e53de92cf2fea0c4a11ad5 to your computer and use it in GitHub Desktop.
A simple pooled promise implementation
/**
 * Runs the tasks produced by `worker` with at most `poolSize` of them in
 * flight at any time, and resolves once every task has settled.
 *
 * @param {function(number): (Promise|Function|null|undefined)} worker
 *   Called with consecutive indices starting at 0. Returns a promise (or any
 *   thenable) representing the task, a plain function to be invoked
 *   synchronously as the task, or `null`/`undefined` when no tasks remain.
 *   After the first `null`/`undefined`, `worker` is not called again.
 * @param {number} poolSize
 *   Maximum number of tasks running concurrently. Missing, non-numeric, or
 *   values below 1 are treated as 1 so that the tasks still run.
 * @param {function(number, string, *): void} [onTaskFinished]
 *   Optional callback invoked as `(index, status, resultOrError)` each time a
 *   task settles; `status` is `"success"` or `"failure"`.
 * @returns {Promise<Array<{index: number, status: string, result?: *, error?: *}>>}
 *   Resolves with one entry per task, sorted by `index`. Successful entries
 *   carry `result`; failed entries carry `error`. The promise rejects only if
 *   `worker` or `onTaskFinished` throws while a lane is continuing.
 */
Promise.joinPooledTasks = (worker, poolSize, onTaskFinished) => {
  // Clamp so a missing, zero, negative or NaN pool size still makes progress.
  const laneCount = Math.max(1, Math.floor(poolSize) || 1);
  const results = [];
  let nextTaskIndex = 0;
  let didEnqueAllTasks = false;

  // Notifies the caller and stores the outcome of one settled task.
  const recordOutcome = (index, status, value) => {
    if (onTaskFinished) {
      onTaskFinished(index, status, value);
    }
    const outcome = { index, status };
    if (status === "success") {
      outcome.result = value;
    } else {
      outcome.error = value;
    }
    results.push(outcome);
  };

  // One lane: pull the next task, wait for it to settle, then pull again.
  // Returns null when there is nothing left to start.
  const runLane = () => {
    if (didEnqueAllTasks) {
      return null;
    }
    const index = nextTaskIndex;
    const task = worker(index);
    if (task == null) {
      didEnqueAllTasks = true;
      return null;
    }
    nextTaskIndex += 1;
    // Non-thenables are invoked as functions inside the executor, so a
    // synchronous throw (including "not a function") becomes a failure entry.
    const settled = typeof task.then === "function"
      ? Promise.resolve(task)
      : new Promise((resolve) => resolve(task()));
    return settled
      .then(
        (value) => recordOutcome(index, "success", value),
        (err) => recordOutcome(index, "failure", err)
      )
      .then(runLane);
  };

  // Start exactly `laneCount` lanes (fewer if the worker runs dry first).
  const lanes = [];
  while (lanes.length < laneCount) {
    const lane = runLane();
    if (lane === null) {
      break;
    }
    lanes.push(lane);
  }

  return Promise.all(lanes).then(() => results.sort((x, y) => x.index - y.index));
};
//Testing code
/**
 * Picks a pseudo-random integer in the half-open range [ceil(min), floor(max)).
 *
 * @param {number} min - Lower bound; rounded up to an integer.
 * @param {number} max - Upper bound (exclusive); rounded down to an integer.
 * @returns {number} An integer drawn using Math.random().
 */
function getRandomInt(min, max) {
  const lowerBound = Math.ceil(min);
  const upperBound = Math.floor(max);
  const span = upperBound - lowerBound;
  const offset = Math.floor(Math.random() * span);
  return offset + lowerBound;
}
// Demo task factory: yields a task for each index up to 20, then signals the
// end of the work by returning null. Each task settles after a random delay
// and is fulfilled or rejected with its own index.
var worker = index => {
  if (index > 20) {
    return null;
  }
  const settleAfterDelay = (resolve, reject) => {
    const delayMs = getRandomInt(1000, 2250);
    setTimeout(() => {
      const settle = Math.random() > 0.5 ? resolve : reject;
      settle(index);
    }, delayMs);
  };
  return new Promise(settleAfterDelay);
};
// Demo run: execute the sample worker with a pool size of 3, log each task as
// it settles, then print a per-task summary once everything has finished.
const logTaskFinished = (index, status, result) => {
  console.log(`task #${index}: ${status}: ${result}`)
};

const printSummary = results => {
  console.log(`${results.length} promises ended`);
  results.forEach((result, i) => {
    const line = result.status === "success"
      ? `${i} >> success: ${result.result}`
      : `${i} >> failure: ${result.error}`;
    console.log(line);
  });
};

// The rejection handler is passed as the second argument to then() so that it
// only observes a rejection of the pooled promise itself.
Promise.joinPooledTasks(worker, 3, logTaskFinished).then(printSummary, err => {
  console.log(err);
});
@tomersh
Copy link
Author

tomersh commented Nov 22, 2017

`worker` should return a Promise that represents the task, or `null` if there are no more tasks.

https://repl.it/repls/FantasticGleefulReference

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment