@danneu
Created March 24, 2023 21:04
A queue implementation where `queue.take()` returns a promise that resolves to a lock once a lock is available.
// USAGE
const queue = new Queue(50)
const middleware = () => async (ctx, next: () => Promise<void>) => {
  const lock = await queue.take()
  await next().finally(() => {
    queue.release(lock)
  })
}
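// IMPL
// Opaque token representing one in-flight slot
class MyLock {}
// A waiting caller; resolve() hands it a lock
class Ticket {
  resolve: (_: MyLock) => void
  constructor(resolve: (_: MyLock) => void) {
    this.resolve = resolve
  }
}
export default class Queue {
  inflightCapacity: number // max locks held at once
  locks: Set<MyLock> // locks currently held
  pending: Array<Ticket> // FIFO list of callers waiting for a lock
  pendingCapacity: number // max waiting callers before take() rejects
  constructor(inflightCapacity: number) {
    this.inflightCapacity = inflightCapacity
    this.locks = new Set()
    this.pending = []
    this.pendingCapacity = 1000
  }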
  // When you take(), you get a promise that resolves to a lock once a lock
  // is available. It rejects with OVER_CAPACITY if too many callers are waiting.
  async take(): Promise<MyLock> {
    if (this.locks.size < this.inflightCapacity) {
      // There are locks available
      const lock = new MyLock()
      this.locks.add(lock)
      return lock
    } else if (this.pending.length >= this.pendingCapacity) {
      // Drop request
      throw new Error('OVER_CAPACITY')
    } else {
      // Pend until release() hands this ticket a lock
      return new Promise<MyLock>((resolve) => {
        this.pending.push(new Ticket(resolve))
      })
    }
  }
  release(lock: MyLock) {
    this.locks.delete(lock)
    // Hand each free slot to the longest-waiting ticket.
    // shift() keeps the order FIFO; pop() would serve the newest waiter first.
    while (this.locks.size < this.inflightCapacity) {
      const ticket = this.pending.shift()
      if (!ticket) {
        return
      }
      const nextLock = new MyLock()
      this.locks.add(nextLock)
      ticket.resolve(nextLock)
    }
  }
}
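
The take/release pairing in the usage snippet can be wrapped in a small helper so callers cannot forget to release. The following is a minimal sketch under stated assumptions, not part of the original gist: `SlotQueue` is a condensed stand-in for the `Queue` class above (with `pendingCapacity` passed in rather than fixed at 1000), and `Slot`, `withLock`, `demoQueue`, `outcomes`, and `demoRuns` are hypothetical names introduced only for illustration.

```typescript
// Hypothetical sketch: SlotQueue condenses the Queue class above so that this
// block is self-contained. None of these names come from the original gist.
class Slot {}

class SlotQueue {
  inflightCapacity: number
  pendingCapacity: number
  held: Set<Slot> = new Set()
  waiting: Array<(slot: Slot) => void> = []

  constructor(inflightCapacity: number, pendingCapacity: number) {
    this.inflightCapacity = inflightCapacity
    this.pendingCapacity = pendingCapacity
  }

  take(): Promise<Slot> {
    if (this.held.size < this.inflightCapacity) {
      // A slot is free: hand it out immediately
      const slot = new Slot()
      this.held.add(slot)
      return Promise.resolve(slot)
    }
    if (this.waiting.length >= this.pendingCapacity) {
      // Wait list is full: drop the request
      return Promise.reject(new Error('OVER_CAPACITY'))
    }
    // Wait until release() passes a slot to this caller
    return new Promise<Slot>((resolve) => {
      this.waiting.push(resolve)
    })
  }

  release(slot: Slot): void {
    this.held.delete(slot)
    // Give each free slot to the longest-waiting caller (FIFO)
    while (this.held.size < this.inflightCapacity) {
      const resolve = this.waiting.shift()
      if (!resolve) return
      const next = new Slot()
      this.held.add(next)
      resolve(next)
    }
  }
}

// Runs `task` while holding a slot and releases the slot whether the task
// resolves or throws.
async function withLock<T>(queue: SlotQueue, task: () => Promise<T>): Promise<T> {
  const slot = await queue.take()
  try {
    return await task()
  } finally {
    queue.release(slot)
  }
}

// Usage: up to 2 tasks hold slots, up to 1 waits, anything beyond is rejected.
const demoQueue = new SlotQueue(2, 1)
const outcomes: string[] = []

const demoRuns = ['a', 'b', 'c', 'd'].map((name) =>
  withLock(demoQueue, async () => {
    outcomes.push(`ran ${name}`)
  }).catch((err: Error) => {
    outcomes.push(`rejected ${name}: ${err.message}`)
  }),
)
```

With these capacities, the first two calls take slots immediately, the third waits for a release, and the fourth is rejected with `OVER_CAPACITY`. Awaiting `Promise.all(demoRuns)` lets the waiting task run once a slot frees up.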