In memory async queue in typescript
type Callback<T> = () => Promise<T>

export type AsyncQueue<T = void> = {
  push: (task: Callback<T>) => Promise<T>
  flush: () => Promise<void>
  size: number
}
/**
 * Ensures that each callback pushed onto the queue is executed in series.
 * Such a quetie 😻
 * @param opts.dedupeConcurrent If `true`, tasks pushed while another task is
 * active replace any tasks still waiting in the queue, so only the most
 * recently pushed one runs once the active task has completed. Every push
 * made during that window returns the same promise, which settles with the
 * result of the task that actually ran.
 * e.g. in the example below, only tasks 0 and 3 are executed.
 * ```
 * const queue = createAsyncQueue({ dedupeConcurrent: true })
 * queue.push(async () => console.log(0)) // runs immediately, logs 0
 * queue.push(async () => console.log(1)) // never runs; settles with task 3
 * queue.push(async () => console.log(2)) // never runs; settles with task 3
 * queue.push(async () => console.log(3)) // runs after task 0, logs 3
 * ```
 */
export function createAsyncQueue<T = void>(opts = { dedupeConcurrent: false }): AsyncQueue<T> {
  const { dedupeConcurrent } = opts
  let queue: Callback<T>[] = []
  // Promise for the drain loop while it is active; undefined when idle.
  let running: Promise<void> | undefined
  // In dedupe mode, the promise shared by every push waiting for the next run.
  let nextPromise = new DeferredPromise<T>()

  const push = (task: Callback<T>) => {
    let taskPromise = new DeferredPromise<T>()
    if (dedupeConcurrent) {
      // Drop tasks that have not started yet; only the latest push survives.
      queue = []
      if (nextPromise.started) nextPromise = new DeferredPromise<T>()
      taskPromise = nextPromise
    }
    queue.push(() => {
      taskPromise.started = true
      try {
        task().then(taskPromise.resolve).catch(taskPromise.reject)
      } catch (err) {
        // A task that throws synchronously must not stall the drain loop.
        taskPromise.reject(err)
      }
      return taskPromise.promise
    })
    if (!running) running = start()
    return taskPromise.promise
  }

  const start = async () => {
    while (queue.length) {
      const task = queue.shift()!
      // Errors reach the caller through the promise returned by push.
      await task().catch(() => {})
    }
    running = undefined
  }

  return {
    push,
    flush: () => running || Promise.resolve(),
    get size() {
      return queue.length
    },
  }
}
export const createAsyncQueues = <T = void>(opts = { dedupeConcurrent: false }) => {
  // One independent queue per id, created lazily on first use.
  const queues: { [queueId: string]: AsyncQueue<T> } = {}
  const push = (queueId: string, task: Callback<T>) => {
    if (!queues[queueId]) queues[queueId] = createAsyncQueue<T>(opts)
    return queues[queueId].push(task)
  }
  const flush = (queueId: string) => {
    if (!queues[queueId]) queues[queueId] = createAsyncQueue<T>(opts)
    return queues[queueId].flush()
  }
  return { push, flush }
}
class DeferredPromise<T = void, E = any> {
  started = false
  resolve: (x: T | PromiseLike<T>) => void = () => {}
  reject: (x: E) => void = () => {}
  promise: Promise<T>
  constructor() {
    this.promise = new Promise<T>((res, rej) => {
      this.resolve = res
      this.reject = rej
    })
  }
}
// Usage example. fetchItem stands in for any async operation.
const queue = createAsyncQueue()
const task1 = async () => {
  await fetchItem()
}
queue.push(task1)
const task2 = async () => {
  await fetchItem()
}
queue.push(task2)
// task1 is guaranteed to finish before task2 starts
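
The dedupeConcurrent behaviour described in the doc comment can be checked with a small stand-alone sketch. To keep it runnable on its own, the block below uses a condensed restatement of the queue that always dedupes; the names `Deferred`, `createDedupingQueue`, `makeTask` and `demo` are hypothetical and exist only for this illustration, and the single `await Promise.resolve()` is an assumed stand-in for real async work.

```typescript
type Task<T> = () => Promise<T>

// Hypothetical condensed version of the gist's DeferredPromise.
class Deferred<T> {
  started = false
  resolve: (value: T | PromiseLike<T>) => void = () => {}
  reject: (reason?: unknown) => void = () => {}
  promise: Promise<T>
  constructor() {
    this.promise = new Promise<T>((res, rej) => {
      this.resolve = res
      this.reject = rej
    })
  }
}

// Hypothetical queue that always behaves like dedupeConcurrent: true.
function createDedupingQueue<T>() {
  let pending: Array<() => Promise<unknown>> = []
  let running: Promise<void> | undefined
  let next = new Deferred<T>()

  // Runs waiting jobs one at a time until none are left.
  const drain = async () => {
    while (pending.length) {
      const job = pending.shift()!
      await job().catch(() => {})
    }
    running = undefined
  }

  const push = (task: Task<T>): Promise<T> => {
    pending = [] // drop whatever has not started yet
    if (next.started) next = new Deferred<T>()
    const shared = next // every waiting push shares this promise
    pending.push(() => {
      shared.started = true
      task().then(shared.resolve, shared.reject)
      return shared.promise
    })
    if (!running) running = drain()
    return shared.promise
  }

  return { push }
}

// Pushes four tasks back to back and records which ones actually ran.
async function demo(): Promise<{ ran: number[]; results: number[] }> {
  const queue = createDedupingQueue<number>()
  const ran: number[] = []
  const makeTask = (id: number): Task<number> => async () => {
    ran.push(id)
    await Promise.resolve() // yield once to simulate async work
    return id
  }
  const results = await Promise.all([0, 1, 2, 3].map((id) => queue.push(makeTask(id))))
  return { ran, results }
}
```

Awaiting `demo()` should give `ran` equal to `[0, 3]` and `results` equal to `[0, 3, 3, 3]`: task 0 starts immediately, tasks 1 and 2 are discarded, and the three later pushes all settle with the result of task 3.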

pbredenberg commented Feb 26, 2022

Thanks so much for this! It works great.

mfbx9da4 (Author) commented Feb 27, 2022

My pleasure! Yeah I use this primitive in so many places, may well put it on npm when I get a chance. What do you think of the dedupeConcurrent option? Is it clear what it does? Could the naming be improved? Do you use it? What is your use case?


pbredenberg commented Feb 27, 2022

I think having a package I could pull in when needed would be great!

It's an infrequent use case for me these days, but I find myself with this quandary whenever I need to do complex scripting: either cobble together a massive library of bash stuff that's been rotting in various projects, or try to do it in Node so I can leverage parts of a related project and write it in TypeScript.

So - this weekend I needed to put together an automated process that would airdrop whitelist tokens on the Solana blockchain for an NFT project I've been engaged to help with. I wrote a Discord bot to collect wallet addresses with sqlite as the data persistence layer. I wanted to use that data to drive the airdrop and notify each user that we sent them their token.

The airdrop process (transferring tokens from one wallet to another) needs to be idempotent and long-running, and the script needs access to the sqlite database. So I wrote it in TypeScript so I could re-use my TypeORM services to retrieve the data. To handle the large number of async processes I would need a queue, because Solana will rate-limit you rather aggressively. Rather than writing my own queue I found yours rather quickly and it worked perfectly!

As to the dedupe option, all the airdrops needed to go out rather than only the latest, so I didn't need it this time. Does that answer your question? The name dedupe at first made me think it was to prevent duplicate tasks from being executed, but reading your documentation made it clear what the intention was.

As an aside, thank you for including the option as an object rather than an ever-growing list of arguments! Also, thank you for the pun! 😆

mfbx9da4 (Author) commented Feb 27, 2022

Makes sense! You’re welcome!


L2Eme commented May 20, 2022

Thanks, this module is brief and beautiful.
