Skip to content

Instantly share code, notes, and snippets.

@dimkir
Created December 12, 2019 17:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dimkir/9ea2ebab28ed89a42ebdde1b408b3a61 to your computer and use it in GitHub Desktop.
Save dimkir/9ea2ebab28ed89a42ebdde1b408b3a61 to your computer and use it in GitHub Desktop.
Simple Queue Implementation in JS
// --------
// USAGE
// --------
// Demo: three jobs are queued up front; the queue runs them one at a time,
// in submission order, and reports every outcome through the two callbacks.
const reportSuccess = ({ alias, result }) => {
  console.log(`Completed job ${alias} with result: `, result);
};

const reportFailure = ({ alias, error }) => {
  console.error(`Job ${alias} had error: `, error);
};

const q = queue({ onJobComplete: reportSuccess, onJobFail: reportFailure });

// submitJob returns the queue API itself, so the submissions can be chained.
q.submitJob('job1', ({ alias }) => {
  console.log(`We start job ${alias} ...`);
  const pause = _delay(3000);
  return pause.then(() => {
    console.log(`Finished job 1 after 3 seconds...`);
    return `solution 1`;
  });
})
  // This job rejects after one second to exercise the failure callback.
  .submitJob('job1failure', () => {
    const pause = _delay(1000);
    return pause.then(() => {
      throw new Error(`This is simulated error`);
    });
  })
  .submitJob('job2', ({ alias }) => {
    console.log(`We start job ${alias} ...`);
    const pause = _delay(2000);
    return pause.then(() => {
      console.log(`Finished job 2 after 2 seconds...`);
      return `solution2`;
    });
  });
// --------
// IMPLEMENTATION
// --------
/**
 * Creates a queue that runs submitted jobs strictly one at a time, in
 * submission order. A timer polls the queue every `pollInterval` ms and
 * starts the next job once the previous one has settled.
 *
 * @param {object} [options]
 * @param {function({alias, result}): void} [options.onJobComplete]
 *   Called once for every job whose function returned or resolved.
 * @param {function({alias, error}): void} [options.onJobFail]
 *   Called once for every job whose function threw or rejected.
 * @param {boolean} [options.quitOnDrain=true]
 *   When true, the polling timer is stopped after the queue has stayed empty
 *   for longer than `drainTimeout`. When false the timer keeps running.
 * @param {number} [options.drainTimeout=1000]
 *   How long (ms) the queue must stay empty before the timer is stopped.
 * @param {number} [options.pollInterval=100]
 *   Polling period (ms) of the internal timer.
 * @returns {{submitJob: function(string, function): object}}
 *   The queue API; `submitJob` returns the same API object for chaining.
 */
function queue({
  onJobComplete,
  onJobFail,
  quitOnDrain = true,
  drainTimeout = 1000,
  pollInterval = 100
} = {}) {
  const noop = () => {};
  const completeCallback = typeof onJobComplete === 'function' ? onJobComplete : noop;
  const failCallback = typeof onJobFail === 'function' ? onJobFail : noop;

  const queueTickets = [];
  let jobInProgress = null;
  let lastDrainTs = null; // epoch ms when the queue was first seen empty
  let timer = null; // null while the polling timer is stopped

  // Invokes a user callback; an exception thrown by the callback is logged
  // instead of being mistaken for a failure of the job itself.
  function notify(callback, receipt) {
    try {
      callback(receipt);
    } catch (callbackError) {
      console.error(`Callback for job ${receipt.alias} threw: `, callbackError);
    }
  }

  // Stops the polling timer once the queue has been empty for longer than
  // drainTimeout. The first empty poll only records the timestamp.
  function checkStop() {
    const now = Date.now();
    if (lastDrainTs === null) {
      lastDrainTs = now;
      return;
    }
    if (now - lastDrainTs > drainTimeout) {
      clearInterval(timer);
      timer = null;
      lastDrainTs = null;
    }
  }

  // One polling step: start the next job if nothing is currently running.
  function tick() {
    if (jobInProgress) {
      return;
    }
    if (queueTickets.length < 1) {
      if (quitOnDrain) {
        checkStop();
      }
      return;
    }
    lastDrainTs = null;

    const ticket = queueTickets.shift();
    const { alias, jobFn } = ticket;
    jobInProgress = ticket;

    // Calling jobFn inside the Promise executor turns a synchronous throw
    // into a rejection, so the queue is never left stuck on a broken job.
    new Promise((resolve) => {
      resolve(jobFn(ticket));
    }).then(
      (result) => {
        jobInProgress = null;
        notify(completeCallback, { alias, result });
      },
      (error) => {
        jobInProgress = null;
        notify(failCallback, { alias, error });
      }
    );
  }

  // (Re)starts the polling timer if it is not currently running.
  function ensureRunning() {
    if (timer === null) {
      timer = setInterval(tick, pollInterval);
    }
  }

  const api = {
    /**
     * Appends a job to the queue.
     * @param {string} alias - label reported back in the job's receipt.
     * @param {function({alias, jobFn}): *} jobFn - the work to run; may
     *   return a plain value or a promise.
     * @returns {object} the queue API, for chaining.
     */
    submitJob(alias, jobFn) {
      queueTickets.push({ alias, jobFn });
      console.log(`Submitted job ${alias}. Total jobs: ${queueTickets.length}`);
      // The timer may have been stopped by an earlier drain; wake it up so
      // this job is not stranded in the queue.
      ensureRunning();
      return api;
    }
  };

  ensureRunning();
  return api;
}
// --------
// UTILS
// --------
/**
 * Returns a promise that is fulfilled (with no value) once `millis`
 * milliseconds have elapsed on a timer.
 *
 * @param {number} millis - delay passed straight to setTimeout.
 * @returns {Promise<undefined>}
 */
function _delay(millis) {
  return new Promise(function (wake) {
    setTimeout(wake, millis);
  });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment