Skip to content

Instantly share code, notes, and snippets.

@airburst
Created June 13, 2018 11:11
Show Gist options
  • Save airburst/f0465e9aa48df87b0096cb03d6ecd614 to your computer and use it in GitHub Desktop.
Save airburst/f0465e9aa48df87b0096cb03d6ecd614 to your computer and use it in GitHub Desktop.
Run a queue of records through an async action, one at a time, storing each result and passing it to the next call
/**
 * Runs tasks with a cap on how many are in flight at the same time.
 *
 * Tasks are started in the order they were pushed. A slot is released when a
 * task settles, whether it fulfilled, rejected, or threw synchronously, so a
 * failing task cannot stall the tasks queued behind it.
 */
class TaskQueue {
  /**
   * @param {number} [concurrency=1] Maximum number of tasks running at once.
   */
  constructor(concurrency = 1) {
    this.concurrency = concurrency;
    this.running = 0;
    this.queue = [];
  }

  /**
   * Enqueues a task and starts it right away if a slot is free.
   *
   * @param {function(): *} task Zero-argument function that starts the work.
   *   It may return a promise (the slot is held until it settles) or a plain
   *   value (the slot is released on the next microtask).
   */
  pushTask(task) {
    this.queue.push(task);
    this.next();
  }

  /**
   * Starts queued tasks until the concurrency limit is reached or the queue
   * is drained. Called automatically after every push and every completion.
   */
  next() {
    while (this.running < this.concurrency && this.queue.length > 0) {
      const task = this.queue.shift();

      // Claim the slot before invoking the task, so a task that pushes more
      // work synchronously cannot push the queue past its concurrency limit.
      this.running += 1;

      let pending;
      try {
        // Promise.resolve() lets tasks return plain values as well as promises.
        pending = Promise.resolve(task());
      } catch (err) {
        // Route a synchronous throw through the same path as a rejection.
        pending = Promise.reject(err);
      }

      const release = () => {
        this.running -= 1;
        this.next();
      };

      pending.then(release, (err) => {
        // Report the failure instead of swallowing it, then free the slot so
        // the remaining tasks still get to run.
        console.error('TaskQueue: task failed', err);
        release();
      });
    }
  }

  /**
   * Discards every task that has not started yet. Tasks that are already
   * running are not affected.
   */
  empty() {
    this.queue = [];
  }
}
// export default TaskQueue;
/**
 * Runs `action` over `records` strictly one at a time, passing each call the
 * most recent successful result.
 *
 * A failing record is logged and skipped: the stored result is left untouched
 * and processing continues with the next record.
 *
 * @param {Array<*>} records Items to process, in order.
 * @param {function(*, *): *} action Called as `action(record, lastResult)`;
 *   may return a promise or a plain value. `lastResult` is 0 until an action
 *   has succeeded.
 * @returns {Promise<*>} Resolves with the last successful result, or 0 when
 *   `records` is empty or no action succeeded.
 */
const sequenceWithResultStore = async (records, action) => {
  let lastResult = 0;

  // A plain sequential loop gives the same one-at-a-time ordering as a
  // concurrency-1 queue, and it also settles when `records` is empty.
  for (const record of records) {
    try {
      lastResult = await action(record, lastResult);
      console.log(lastResult);
    } catch (err) {
      // Rejections are not guaranteed to be Error objects (they may even be
      // null or undefined), so read `.message` defensively.
      const message = err != null && err.message !== undefined ? err.message : err;
      console.log('BATCH ERROR', message, record);
    }
  }

  return lastResult;
};
/**
 * Returns a promise that fulfils (with no value) after `ms` milliseconds.
 *
 * @param {number} ms Time to wait, in milliseconds.
 * @returns {Promise<undefined>}
 */
const delay = (ms) =>
  new Promise((wake) => {
    setTimeout(wake, ms);
  });
/**
 * Demo action: waits 500 ms, then returns `current + lastResult`.
 *
 * Written as an async function instead of passing an async executor to
 * `new Promise`, where any error thrown inside the executor would be lost and
 * the returned promise would never settle.
 *
 * @param {*} current Value for this step.
 * @param {*} [lastResult=0] Result carried over from the previous step.
 * @returns {Promise<*>} Resolves with `current + lastResult`.
 */
const testAction = async (current, lastResult = 0) => {
  await delay(500);
  return current + lastResult;
};
/**
 * Demo driver: runs the numbers 1 through 10 through `testAction` in
 * sequence and prints the final stored result.
 *
 * @returns {Promise<void>}
 */
const testBatch = async () => {
  const result = await sequenceWithResultStore([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], testAction);
  console.log(`Result: ${result}`);
};

// Attach a rejection handler so a failure is reported rather than left as a
// floating, unhandled promise.
testBatch().catch((err) => {
  console.error('testBatch failed', err);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment