Skip to content

Instantly share code, notes, and snippets.

@kevincennis
Last active October 25, 2023 11:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kevincennis/dbce327379cc24a0ef77238b35b4e00c to your computer and use it in GitHub Desktop.
Save kevincennis/dbce327379cc24a0ef77238b35b4e00c to your computer and use it in GitHub Desktop.
Async Concurrent Queue
( async() => {
const items = 'abcdefghijklmnopqrstuvwxyz'.split('');
const concurrency = 10;
async function process() {
return new Promise( r => setTimeout( r, Math.random() * 1000 ) );
}
async function report({ item, processed, total, done }){
if ( done ) {
return console.log( 'Processing complete (%d items)', total );
}
console.log( 'Processed %s (%d of %d)', item, processed, total );
}
await queue({ items, concurrency, process, report });
console.log('Done');
})();
async function queue({ items, concurrency = 5, process, report = () => {} }) {
const pending = items.slice();
const total = pending.length;
const state = {
total: pending.length,
inflight: 0,
processed: 0,
queued: pending.length,
done: false
};
return new Promise( ( resolve, reject ) => {
async function dequeue() {
if ( pending.length === 0 && state.inflight === 0 ) {
state.done = true;
report({ item: null, ...state });
return setImmediate( resolve );
}
if ( state.inflight >= concurrency || pending.length === 0 ) {
return;
}
const item = pending.shift();
state.inflight++;
state.queued--;
process( item ).then( () => {
state.inflight--;
state.processed++;
report({ item, ...state });
dequeue();
}, reject );
dequeue();
}
dequeue();
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment