@AlicanC
Created October 30, 2015 17:35
/*
Queue
- Push task runners (async functions) to its "list"
- Run it
- Subscribe to the returned observable and wait for results

Runs up to "threadCount" tasks from the "list" at the same time.
Whenever one of those tasks completes, the next task is started.
If task #2 finishes before task #1, its result is held until #1 has finished and been published, so results are always delivered in order.

Use case:
You have 450 ordered images which form a 15-second animation at 30 FPS when merged.
You want to manipulate each image with a tool and then pipe the results into another tool that merges them into an animation.
You can't run 450 processes at the same time because that would exhaust the machine's resources.
You can't deliver processed images in random order because that would result in a broken animation.
*/
import Observable from 'zen-observable';
import os from 'os';

class Queue {
  constructor(threadCount = 1) {
    this.threadCount = threadCount;
    this.list = [];
  }

  push(...taskRunners) {
    this.list.push(...taskRunners);
  }

  run() {
    return new Observable((observer) => {
      // Keep bookkeeping per subscription so separate subscribers don't share state
      const queue = new Set();
      const promises = [];
      // Report only the first failure to the subscriber
      let failed = false;
      const fail = (error) => {
        if (failed) return;
        failed = true;
        observer.error(error);
      };
      (async () => {
        // Nothing to run, so complete right away
        if (this.list.length === 0) {
          observer.complete();
          return;
        }
        for (let i = 0; i < this.list.length; i++) {
          // Check if the queue is full
          if (queue.size >= this.threadCount) {
            // Wait until there is space in the queue
            await Promise.race(queue);
          }
          // Run the task and get its promise
          const promise = this.list[i]();
          // Add the promise to the queue
          queue.add(promise);
          // Wait for promise to finish, add promise of that to the list
          promises[i] = promise.then(async (value) => {
            // Remove promise from queue
            queue.delete(promise);
            // Publish value once the previous promise has resolved
            await promises[i - 1];
            observer.next(value);
            // Complete if this was the last task
            if (i === this.list.length - 1) {
              observer.complete();
            }
          });
          // A rejected task (or a rejected predecessor) is reported as an error
          promises[i].catch(fail);
        }
      })().catch(fail);
    });
  }
}

async function sleep(milliseconds = 0) {
  await new Promise((resolve) => {
    setTimeout(resolve, milliseconds);
  });
}

async function run() {
  const queue = new Queue(os.cpus().length);
  for (let i = 0; i < 20; i++) {
    queue.push(async () => {
      console.log(`Running task #${i}...`);
      await sleep(3000 * Math.random());
      console.log(`Finished task #${i}.`);
      return i;
    });
  }
  const observable = queue.run();
  observable.subscribe({
    next(value) {
      console.log('Got value:', value);
    },
    complete() {
      console.log('All done!');
    }
  });
}

run();
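
For comparison, the same idea (bounded concurrency with in-order delivery) can probably be sketched without an observable library by using an async generator. This is only an illustration under assumptions: `runOrdered` and its parameters are hypothetical names that do not appear in the gist, `limit` is assumed to be at least 1, and the caller is assumed to consume results with `for await`.

```javascript
// Hypothetical helper (not part of the gist): runs at most `limit` task
// runners at once and yields their results in the order they were given.
async function* runOrdered(taskRunners, limit = 1) {
  const results = [];   // results[i] is the promise for task i once started
  let nextToStart = 0;  // index of the next task that has not been started
  let active = 0;       // number of tasks currently running

  // Start tasks until `limit` are running or no tasks are left.
  const startNext = () => {
    while (active < limit && nextToStart < taskRunners.length) {
      const index = nextToStart++;
      active++;
      results[index] = taskRunners[index]().finally(() => {
        // A slot is free again, whether the task succeeded or failed
        active--;
        startNext();
      });
      // Mark rejections as handled; the error still surfaces when the
      // consumer reaches this index and awaits the promise below.
      results[index].catch(() => {});
    }
  };

  startNext();
  for (let i = 0; i < taskRunners.length; i++) {
    // Waits for task i even if later tasks have already finished
    yield await results[i];
  }
}
```

A caller would consume it with something like `for await (const value of runOrdered(tasks, 4)) { ... }`. Unlike the observable version, a failed task surfaces as an exception thrown from that loop once the consumer reaches its position.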

AlicanC commented Oct 30, 2015

Sample output with comments:

Running task #0... <-- Running 8 tasks because `threadCount` was 8.
Running task #1...
Running task #2...
Running task #3...
Running task #4...
Running task #5...
Running task #6...
Running task #7...
Finished task #7. <-- 7 finished, but its value isn't sent to subscribers yet. Values 0 through 6 must be published first.
Running task #8... <-- A new task is started anyway.
Finished task #0.
Got value: 0 <-- 0 finished. It has nothing to wait for, so its value is published.
Running task #9...
Finished task #9.
Running task #10...
Finished task #10.
Running task #11...
Finished task #5.
Running task #12...
Finished task #8.
Running task #13...
Finished task #1.
Got value: 1 <-- 1 finished and published immediately, because 0 was already published.
Running task #14...
Finished task #3.
Running task #15...
Finished task #6.
Running task #16...
Finished task #4.
Running task #17...
Finished task #2.
Got value: 2
Running task #18...
Got value: 3 <-- 3 through 10 had already finished and were only waiting for 2. Publishing 2 releases all of them.
Got value: 4
Got value: 5
Got value: 6
Got value: 7
Got value: 8
Got value: 9
Got value: 10
Finished task #15.
Running task #19...
Finished task #11.
Got value: 11
Finished task #16.
Finished task #18.
Finished task #17.
Finished task #12.
Got value: 12
Finished task #14.
Finished task #19.
Finished task #13.
Got value: 13
Got value: 14
Got value: 15
Got value: 16
Got value: 17
Got value: 18
Got value: 19
All done! <-- Observable completed. All tasks ran and their results were delivered in order.
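
The ordering behaviour annotated above can be reproduced in isolation. The sketch below is a simplified model, not the gist's implementation: `createOrderedBuffer` is a hypothetical name, and it uses an explicit buffer instead of chained promises. It is fed the order in which tasks 0 to 10 finished in the sample output.

```javascript
// Hypothetical helper (not part of the gist): holds results that arrive
// out of order and releases them strictly by index.
function createOrderedBuffer() {
  const pending = new Map(); // index -> value, finished but not yet published
  let nextIndex = 0;         // index of the next value allowed out

  // Record a finished task and return every value that is now unblocked.
  return function finish(index, value) {
    pending.set(index, value);
    const released = [];
    while (pending.has(nextIndex)) {
      released.push(pending.get(nextIndex));
      pending.delete(nextIndex);
      nextIndex++;
    }
    return released;
  };
}

// Completion order of tasks 0 to 10, read off the sample output
const finish = createOrderedBuffer();
for (const index of [7, 0, 9, 10, 5, 8, 1, 3, 6, 4, 2]) {
  const released = finish(index, index);
  console.log(`Finished #${index} -> published: [${released.join(', ')}]`);
}
```

In this model nothing is released when 7 finishes, 0 and 1 are released as soon as they finish, and finishing 2 releases 2 through 10 in one go, which matches the burst of "Got value" lines in the log.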
