Skip to content

Instantly share code, notes, and snippets.

@shuhei
Last active Jul 3, 2019
Embed
What would you like to do?
jobqueue in JavaScript

jobqueue

This is a somewhat evil restaurant. When it has available seats, customers can have seats immediately and stay as long as they want. When the restaurant is full, customers need to wait on a queue. The strange thing is that new customers are led to the head of the queue. If you are the first customer who starts a queue, be prepared to wait for a long time because new customers join in front of you. And the waiter will tell you to leave the queue because you waited too long (timeout) or a new customer is joining the queue but the queue is too long (stack full)!

Implementations

class Jobqueue {
constructor({ maxRunningJobs = 1, maxWaitingJobs = 1, waitingTimeout }) {
this.maxRunningJobs = maxRunningJobs;
this.maxWaitingJobs = maxWaitingJobs;
this.waitingTimeout = waitingTimeout;
this.waitingList = [];
this.runningJobs = 0;
this.onRunningJobDone = this.onRunningJobDone.bind(this);
}
// public
// - resolve with a callback function that must be called when a job is done
// - reject with a timeout error
// - reject with a full stack error
wait() {
if (this.isFull()) {
this.rejectOldestJob();
}
if (this.maxRunningJobs > this.runningJobs) {
this.runningJobs++;
return Promise.resolve(this.onRunningJobDone);
}
return new Promise((resolve, reject) => {
let timer;
// https://electronics.howstuffworks.com/everyday-tech/restaurant-pager.htm
const pager = {
resolve: () => {
clearTimeout(timer);
this.runningJobs++;
resolve(this.onRunningJobDone);
},
reject: reason => {
clearTimeout(timer);
reject(reason);
}
};
this.waitingList.push(pager);
if (Number.isFinite(this.waitingTimeout)) {
timer = setTimeout(() => {
// `index` should be `0` or `-1` as long as the timer works in order.
const index = this.waitingList.indexOf(pager);
if (index >= 0) {
this.waitingList.splice(index, 1);
pager.reject(new Error("timeout in stack"));
}
}, this.waitingTimeout);
timer.unref();
}
});
}
// public
async do(job) {
const done = await this.wait();
return job().finally(done);
}
onRunningJobDone() {
this.runningJobs--;
this.runNewestJob();
}
runNewestJob() {
if (this.waitingList.length === 0) {
return;
}
const newest = this.waitingList.pop();
newest.resolve();
}
rejectOldestJob() {
if (this.waitingList.length === 0) {
return;
}
const oldest = this.waitingList.shift();
oldest.reject(new Error("full stack"));
}
isFull() {
return this.waitingList.length >= this.maxWaitingJobs;
}
}
module.exports = Jobqueue;
const Jobqueue = require(".");
const jobqueue = new Jobqueue({
maxRunningJobs: 10,
maxWaitingJobs: 100,
waitingTimeout: 1000
});
function timeout(ms) {
return new Promise(resolve => {
setTimeout(resolve, ms);
});
}
const start = Date.now();
for (let i = 0; i < 100; i++) {
const delay = 100 + Math.random() * 100;
jobqueue
.do(() => timeout(delay))
.then(
() => {
const time = Date.now() - start;
console.log(time, i, delay, {
runningJobs: jobqueue.runningJobs,
waitingJobs: jobqueue.waitingList.length
});
},
err => {
const time = Date.now() - start;
console.log(time, i, delay, err.message, {
runningJobs: jobqueue.runningJobs,
waitingJobs: jobqueue.waitingList.length
});
}
);
}
@shuhei

This comment has been minimized.

Copy link
Owner Author

@shuhei shuhei commented Jun 26, 2019

It's important to pick a big maxRunningJobs to make sure that a queue is normally not formed at all. A queue should be reserved for an emergency. In other words, jobqueue is a nice version of limiting concurrent operations.

@shuhei

This comment has been minimized.

Copy link
Owner Author

@shuhei shuhei commented Jun 26, 2019

Note that timeout is applied only for the time when a job is waiting to be executed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment