Skip to content

Instantly share code, notes, and snippets.

@mhrstmnn
Created July 19, 2023 01:51
Show Gist options
  • Save mhrstmnn/e2519f68e42697fe0454313b58f54f0e to your computer and use it in GitHub Desktop.
Save mhrstmnn/e2519f68e42697fe0454313b58f54f0e to your computer and use it in GitHub Desktop.
Task queue developed to run with Deno
import { TaskQueue } from './task_queue.ts'
// Shared queue, throttled to at most two tasks per second.
const queue = new TaskQueue({
  rateLimiter: { interval: 'second', tasksPerInterval: 2 },
})

// Issues the sample search request for the given search number.
const search = (n: number) =>
  fetch(`https://www.google.com/search?q=Search+${n}`)

// The search numbers 1..10 used by both examples below.
const searchNumbers = Array.from({ length: 10 }, (_, i) => i + 1)

// Example with await: each request is queued, awaited and reported before
// the next one is queued.
for (const n of searchNumbers) {
  const { url, status } = await queue.enqueueAsync(() => search(n))
  console.log(url, status)
}

// Example without await: all requests are queued up front and the queue
// works through them in the background.
for (const n of searchNumbers) {
  queue.enqueue(() => search(n))
}
/**
 * Minimal contract for a first-in, first-out container.
 */
interface QueueInterface<Type> {
  /** Appends `item` to the back of the queue. */
  enqueue(item: Type): void
  /** Removes and returns the front item, or `undefined` when the queue is empty. */
  dequeue(): Type | undefined
  /** Returns the number of items currently stored. */
  size(): number
}

/**
 * Array-backed FIFO queue with an optional maximum capacity.
 */
export class Queue<Type> implements QueueInterface<Type> {
  private readonly capacity: number
  private readonly queue: Type[]

  /**
   * @param capacity Maximum number of items the queue may hold. When omitted
   * the queue is unbounded. An explicit `0` is honoured and yields a queue
   * that rejects every item.
   */
  constructor(capacity?: number) {
    // `??` instead of a truthiness test: only a missing capacity means
    // "unbounded"; a caller-supplied 0 must not be widened to Infinity.
    this.capacity = capacity ?? Infinity
    this.queue = []
  }

  /**
   * Appends `item` to the back of the queue.
   *
   * @throws {Error} When the queue already holds `capacity` items.
   */
  enqueue(item: Type): void {
    if (this.size() >= this.capacity) {
      throw new Error(
        'The queue has reached its maximum capacity, you cannot add more items'
      )
    }
    this.queue.push(item)
  }

  /**
   * Removes and returns the item at the front of the queue.
   *
   * @returns The oldest item, or `undefined` when the queue is empty.
   */
  dequeue(): Type | undefined {
    return this.queue.shift()
  }

  /** Returns the number of items currently stored. */
  size(): number {
    return this.queue.length
  }
}
import { Queue } from './queue.ts'
/** A unit of work; it may finish synchronously or return a promise the queue awaits. */
type TaskFunction = () => unknown | Promise<unknown>

/** Length in milliseconds of each named rate-limit interval. */
const INTERVAL_MILLISECONDS = {
  second: 1000,
  minute: 1000 * 60,
  hour: 1000 * 60 * 60,
  day: 1000 * 60 * 60 * 24,
} as const

/**
 * FIFO task runner that executes one task at a time and can optionally pause
 * between tasks to stay under a rate limit.
 */
export class TaskQueue extends Queue<TaskFunction> {
  /** Pause in milliseconds inserted after every task; 0 disables the pause. */
  private interval: number
  /** True while the processing loop is running. */
  private isExecuting: boolean

  /**
   * @param options.capacity Passed to the underlying queue as its capacity.
   * @param options.rateLimiter.interval Length of the rate-limit window,
   * either a named unit or a number of milliseconds.
   * @param options.rateLimiter.tasksPerInterval Number of tasks allowed per
   * window; the window is divided evenly between them. Ignored unless it is
   * a positive number.
   */
  constructor(options?: {
    capacity?: number
    rateLimiter?: {
      interval: 'second' | 'minute' | 'hour' | 'day' | number
      tasksPerInterval?: number
    }
  }) {
    super(options?.capacity)

    const limiter = options?.rateLimiter
    let interval = 0
    if (limiter) {
      interval =
        typeof limiter.interval === 'number'
          ? limiter.interval
          : INTERVAL_MILLISECONDS[limiter.interval]
      if (limiter.tasksPerInterval && limiter.tasksPerInterval > 0) {
        interval /= limiter.tasksPerInterval
      }
    }
    this.interval = interval
    this.isExecuting = false
  }

  /**
   * Queues `task` and returns a promise that settles with its outcome.
   *
   * The promise resolves with the task's result, and rejects when the task
   * throws, when the promise it returns rejects, or when the task cannot be
   * queued because the queue is full.
   *
   * @param task Work to run once it reaches the front of the queue.
   */
  enqueueAsync<Type>(task: () => Type): Promise<Type> {
    return new Promise<Type>((resolve, reject) => {
      this.enqueue(async () => {
        try {
          const result = task()
          // Hand the result over right away; if it is a promise, the returned
          // promise adopts its eventual state.
          resolve(result)
          // Wait for completion so the queue does not start the next task
          // while this one is still running.
          await result
        } catch (error) {
          // Covers a synchronous throw. After `resolve(result)` this call is
          // a no-op, because the rejection has already been forwarded.
          reject(error)
        }
      })
    })
  }

  /**
   * Queues `task` and starts processing if the queue is currently idle.
   *
   * @throws {Error} When the underlying queue is at capacity.
   */
  enqueue(task: TaskFunction): void {
    super.enqueue(task)
    if (!this.isExecuting) {
      this.isExecuting = true
      // Deliberately not awaited: processing continues in the background.
      void this.executeNext()
    }
  }

  /**
   * Runs queued tasks one after another until the queue is empty, pausing
   * for `interval` milliseconds after each task when rate limiting is on.
   */
  private async executeNext(): Promise<void> {
    try {
      while (this.size() > 0) {
        const task = this.dequeue()
        if (!task) {
          continue
        }
        try {
          await task()
        } catch (error) {
          // A failing task must not stall the queue or surface as an
          // unhandled rejection; report it and carry on with the next one.
          console.error('TaskQueue: a queued task failed', error)
        }
        if (this.interval > 0) {
          await this.delay(this.interval)
        }
      }
    } finally {
      // Always release the flag so a later enqueue() can restart processing.
      this.isExecuting = false
    }
  }

  /** Resolves after `milliseconds` have elapsed. */
  private delay(milliseconds: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, milliseconds))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment