Created
October 30, 2015 17:35
-
-
Save AlicanC/9058f9ccc9dd739db539 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
Queue | |
- Push task runners (async functions) to its "list" | |
- Run it | |
- Subscribe to its observable and wait for results | |
Runs "threadCount" amount of tasks in the "list" | |
When any of those tasks are completed, runs another task | |
If task #2 finishes before #1, it waits for #1 to finish and published before it publishes #2 so results are delivered in order | |
Use case: | |
You have 450 images in order which form a 15 second animation at 30 FPS when merged. | |
You want to manipulate them with a tool and then pipe results into another tool to merge them into an animation. | |
You can't run 450 processes at the same time because it's crazy. | |
You can't deliver processed images in random order because that would result in a broken animation. | |
*/ | |
import Observable from 'zen-observable'; | |
import os from 'os'; | |
class Queue { | |
constructor(threadCount = 1) { | |
this.threadCount = threadCount; | |
this.list = []; | |
} | |
push(...taskRunners) { | |
this.list.push(...taskRunners); | |
} | |
run() { | |
const queue = new Set(); | |
const promises = []; | |
return new Observable((observer) => { | |
const work = (async () => { | |
for (let i = 0; i < this.list.length; i++) { | |
// Check if the queue is full | |
if (queue.size >= this.threadCount) { | |
// Wait until there is space in the queue | |
await Promise.race(queue); | |
} | |
// Run the task and get its promise | |
const promise = this.list[i](); | |
// Add the promise to the queue | |
queue.add(promise); | |
// Wait for promise to finish, add promise of that to the list | |
promises[i] = promise.then(async (value) => { | |
// Remove promise from queue | |
queue.delete(promise); | |
// Publish value when previous promise resolves | |
await Promise.resolve(promises[i - 1]); | |
observer.next(value); | |
// Complete if this was the last task | |
if (!this.list[i + 1]) { | |
observer.complete(); | |
} | |
}); | |
} | |
}()); | |
}); | |
} | |
} | |
async function sleep(milliseconds = 0) { | |
await new Promise((resolve) => { | |
setTimeout(resolve, milliseconds); | |
}); | |
} | |
async function run() { | |
const queue = new Queue(os.cpus().length); | |
for (let i = 0; i < 20; i++) { | |
queue.push(async () => { | |
console.log(`Running task #${i}...`); | |
await sleep(3000 * Math.random()); | |
console.log(`Finished task #${i}.`); | |
return i; | |
}) | |
} | |
const observable = queue.run(); | |
observable.subscribe({ | |
next(value) { | |
console.log('Got value:', value); | |
}, | |
complete() { | |
console.log('All done!'); | |
} | |
}); | |
} | |
run(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Sample output with comments: