Skip to content

Instantly share code, notes, and snippets.

@shuhei

shuhei/README.md

Last active Jul 3, 2019
Embed
What would you like to do?
jobqueue in JavaScript

jobqueue

This is a somewhat evil restaurant. When it has available seats, customers can have seats immediately and stay as long as they want. When the restaurant is full, customers need to wait on a queue. The strange thing is that new customers are led to the head of the queue. If you are the first customer who starts a queue, be prepared to wait for a long time because new customers join in front of you. And the waiter will tell you to leave the queue because you waited too long (timeout) or a new customer is joining the queue but the queue is too long (stack full)!

Implementations

class Jobqueue {
constructor({ maxRunningJobs = 1, maxWaitingJobs = 1, waitingTimeout }) {
this.maxRunningJobs = maxRunningJobs;
this.maxWaitingJobs = maxWaitingJobs;
this.waitingTimeout = waitingTimeout;
this.waitingList = [];
this.runningJobs = 0;
this.onRunningJobDone = this.onRunningJobDone.bind(this);
}
// public
// - resolve with a callback function that must be called when a job is done
// - reject with a timeout error
// - reject with a full stack error
wait() {
if (this.isFull()) {
this.rejectOldestJob();
}
if (this.maxRunningJobs > this.runningJobs) {
this.runningJobs++;
return Promise.resolve(this.onRunningJobDone);
}
return new Promise((resolve, reject) => {
let timer;
// https://electronics.howstuffworks.com/everyday-tech/restaurant-pager.htm
const pager = {
resolve: () => {
clearTimeout(timer);
this.runningJobs++;
resolve(this.onRunningJobDone);
},
reject: reason => {
clearTimeout(timer);
reject(reason);
}
};
this.waitingList.push(pager);
if (Number.isFinite(this.waitingTimeout)) {
timer = setTimeout(() => {
// `index` should be `0` or `-1` as long as the timer works in order.
const index = this.waitingList.indexOf(pager);
if (index >= 0) {
this.waitingList.splice(index, 1);
pager.reject(new Error("timeout in stack"));
}
}, this.waitingTimeout);
timer.unref();
}
});
}
// public
async do(job) {
const done = await this.wait();
return job().finally(done);
}
onRunningJobDone() {
this.runningJobs--;
this.runNewestJob();
}
runNewestJob() {
if (this.waitingList.length === 0) {
return;
}
const newest = this.waitingList.pop();
newest.resolve();
}
rejectOldestJob() {
if (this.waitingList.length === 0) {
return;
}
const oldest = this.waitingList.shift();
oldest.reject(new Error("full stack"));
}
isFull() {
return this.waitingList.length >= this.maxWaitingJobs;
}
}
module.exports = Jobqueue;
const Jobqueue = require(".");
const jobqueue = new Jobqueue({
maxRunningJobs: 10,
maxWaitingJobs: 100,
waitingTimeout: 1000
});
function timeout(ms) {
return new Promise(resolve => {
setTimeout(resolve, ms);
});
}
const start = Date.now();
for (let i = 0; i < 100; i++) {
const delay = 100 + Math.random() * 100;
jobqueue
.do(() => timeout(delay))
.then(
() => {
const time = Date.now() - start;
console.log(time, i, delay, {
runningJobs: jobqueue.runningJobs,
waitingJobs: jobqueue.waitingList.length
});
},
err => {
const time = Date.now() - start;
console.log(time, i, delay, err.message, {
runningJobs: jobqueue.runningJobs,
waitingJobs: jobqueue.waitingList.length
});
}
);
}
@shuhei

This comment has been minimized.

Copy link
Owner Author

@shuhei shuhei commented Jun 26, 2019

It's important to pick a big maxRunningJobs to make sure that a queue is normally not formed at all. A queue should be reserved for an emergency. In other words, jobqueue is a nice version of limiting concurrent operations.

@shuhei

This comment has been minimized.

Copy link
Owner Author

@shuhei shuhei commented Jun 26, 2019

Note that timeout is applied only for the time when a job is waiting to be executed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.