Skip to content

Instantly share code, notes, and snippets.

@alex-taxiera
Last active April 11, 2021 04:52
Show Gist options
  • Save alex-taxiera/a0494590c73bd5e980fc797f6755bd2f to your computer and use it in GitHub Desktop.
Save alex-taxiera/a0494590c73bd5e980fc797f6755bd2f to your computer and use it in GitHub Desktop.
Promise Queue classes for async jobs
export type JobFunction<T> = (...args: Array<any>) => Promise<T> // eslint-disable-line @typescript-eslint/no-explicit-any
export type JobResult<T> = {
id: number
priority?: number
data: T
}
export type Job<T> = {
id: number
priority?: number
handlePromise: {
resolve: (value: JobResult<T> | PromiseLike<JobResult<T>>) => void
reject: (reason?: any) => void // eslint-disable-line @typescript-eslint/no-explicit-any
}
job: JobFunction<T>
}
export type JobList<T> = Array<Job<T>>
export type PriorityJobList<T> = Array<JobList<T>>
abstract class AbstractJobQueue<T> {
protected totalJobs: number = 0
protected currentJob?: Job<T>
protected abstract queue: JobList<T> | PriorityJobList<T>
protected abstract getNextJob (): Job<T> | undefined
public abstract get length (): number
public abstract push (job: JobFunction<T>): Promise<JobResult<T>>
protected run (): void {
setTimeout(() => {
if (this.currentJob || this.length <= 0) {
return
}
this.currentJob = this.getNextJob()
if (!this.currentJob) {
return
}
const {
id, job, priority, handlePromise,
} = this.currentJob
job()
.then((data) => handlePromise.resolve({
id, priority, data,
}))
.catch((error: Error) => handlePromise.reject({
id, priority, error,
}))
.finally(() => {
this.currentJob = undefined
this.run()
})
}, 100)
}
}
export class JobQueue<T> extends AbstractJobQueue<T> {
protected queue: JobList<T> = []
public get length (): number {
return this.queue.length
}
public push (job: JobFunction<T>): Promise<JobResult<T>> {
return new Promise<JobResult<T>>((resolve, reject) => {
const id = ++this.totalJobs
this.queue.push({
id, job, handlePromise: { resolve, reject },
})
this.run()
})
}
protected getNextJob (): Job<T> | undefined {
return this.queue.shift()
}
}
export class PriorityJobQueue<T> extends AbstractJobQueue<T> {
protected queue: PriorityJobList<T>
constructor (levels: number) {
super()
this.queue = Array.from({ length: levels }, () => [])
}
public get length (): number {
return this.queue.reduce((ax, dx) => ax + dx.length, 0)
}
protected getNextJob (): Job<T> | undefined {
for (let i = this.queue.length; i > 0;) {
const queue = this.queue[--i]
if (queue.length > 0) {
return queue.shift()
}
}
}
public push (
job: JobFunction<T>,
priority: number = 1,
): Promise<JobResult<T>> {
return new Promise<JobResult<T>>((resolve, reject) => {
if (priority > this.queue.length || priority < 1) {
return reject(RangeError(
`PRIORITY OUT OF RANGE - EXPECTED VALUE FROM 1 TO ${
this.queue.length.toString()
}`,
))
}
const id = ++this.totalJobs
this.queue[priority - 1]
.push({
id, job, priority, handlePromise: { resolve, reject },
})
this.run()
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment