Last active
February 5, 2024 15:58
-
-
Save shimondoodkin/285e3a55a1181ee59c583b780b8d2296 to your computer and use it in GitHub Desktop.
This queue runs unrelated tasks in parallel; tasks that share a blocking value are kept from running at the same time.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * A zero-argument callback that either completes synchronously or returns a
 * promise that resolves with no value.
 *
 * NOTE(review): nothing in the code visible here references this alias —
 * confirm whether it is still needed.
 */
type Task = () => Promise<void> | void;
/**
 * In-memory registry of keys with an expiry time.
 *
 * Only the expiry timestamp is stored per key; there is no associated value.
 * Expired entries stay in the map until `cleanup()` is called.
 */
export class TTLCache {
  /** Key -> expiry time, in milliseconds since the Unix epoch. */
  cache: Map<string, number> = new Map();

  /**
   * Registers `key`, overwriting any existing entry.
   *
   * @param key Key to register.
   * @param ttl Time to live in seconds, counted from now.
   */
  set(key: string, ttl: number): void {
    const expiresAt = Date.now() + ttl * 1000; // seconds -> milliseconds
    this.cache.set(key, expiresAt);
  }

  /**
   * Tells whether `key` is absent or past its expiry time.
   *
   * @param key Key to look up.
   * @returns `true` when the key was never set, was cleaned up, or its expiry
   *   time is strictly in the past; `false` otherwise.
   */
  isExpired(key: string): boolean {
    // Single lookup: `undefined` doubles as the "not present" signal, which
    // also narrows the type for the comparison below.
    const expiresAt = this.cache.get(key);
    if (expiresAt === undefined) return true;
    return Date.now() > expiresAt;
  }

  /** Removes every entry whose expiry time is strictly in the past. */
  cleanup(): void {
    const now = Date.now();
    // Deleting the current entry while iterating a Map is well-defined.
    for (const [key, expiresAt] of this.cache) {
      if (now > expiresAt) {
        this.cache.delete(key);
      }
    }
  }
}
/**
 * Service that runs queued async tasks, starting together all tasks whose
 * "blocking values" are currently free.
 *
 * Each task is queued with a list of blocking values (arbitrary strings).
 * When a task starts, its blocking values are recorded in `queueTtlCache` with
 * a fixed TTL; any other task that lists one of those values is held back
 * until the entry expires. Entries are not released when the task finishes —
 * only TTL expiry frees a value.
 */
export class SomeService {
  /** Blocking values currently held, each with its own expiry time. */
  queueTtlCache: TTLCache;

  /** Seconds a blocking value stays held once a task using it has started. */
  private readonly blockingTtlSeconds = 60;

  /** Milliseconds to wait before re-checking tasks that are still blocked. */
  private readonly retryDelayMs = 10000;

  /** Tasks that are waiting or currently running; an item is removed when its task settles. */
  private queueTasks: Array<{ task: () => Promise<void>; blockingValues: string[] }> = [];

  /** True while a `processQueue` loop is active. */
  private queueIsRunning: boolean = false;

  constructor() {
    this.queueTtlCache = new TTLCache();
  }

  /**
   * Queues a task and starts the processing loop if it is not already running.
   *
   * If the loop is idle and none of the blocking values are held, the task is
   * invoked synchronously, before this method returns.
   *
   * @param task Async work to run. Rejections are caught and logged, never rethrown.
   * @param blockingValues Values that must all be free for the task to start;
   *   an empty list means the task is never held back.
   */
  addTaskToQueue(task: () => Promise<void>, blockingValues: string[]): void {
    this.queueTasks.push({ task, blockingValues });
    console.info('zoho service: added to queue, ítems in queue ' + this.queueTasks.length + ', items in cache ' + this.queueTtlCache.cache.size)

    if (!this.queueIsRunning) {
      // Fire-and-forget: task errors are handled inside the loop, so this
      // catch only reports unexpected failures of the loop itself.
      void this.processQueue().catch((e: unknown) => {
        console.error(e instanceof Error && e.stack ? e.stack : e);
      });
    }
  }

  /**
   * Drains the queue in passes until it is empty.
   *
   * Each pass starts every queued task whose blocking values are all free and
   * waits for those tasks to settle. If items were queued while the pass was
   * in flight, the next pass starts immediately; otherwise everything left is
   * blocked and the loop sleeps for `retryDelayMs` before checking again.
   */
  private async processQueue(): Promise<void> {
    this.queueIsRunning = true;
    try {
      while (this.queueTasks.length > 0) {
        // Drop expired entries so their blocking values become free again.
        this.queueTtlCache.cleanup();

        // Work on a copy: finished tasks remove themselves from queueTasks
        // while this pass is still in progress.
        const snapshot = [...this.queueTasks];
        const started: Array<Promise<void>> = [];
        for (const queueItem of snapshot) {
          const isFree = queueItem.blockingValues.every(value => this.queueTtlCache.isExpired(value));
          if (isFree) {
            // runQueueItem claims the blocking values synchronously, so a later
            // item in this same pass that shares a value sees it as held.
            started.push(this.runQueueItem(queueItem));
          }
        }

        await Promise.all(started);
        if (started.length > 0)
          console.info('zoho service: processed queue items, ítems in queue ' + this.queueTasks.length + ', items in cache ' + this.queueTtlCache.cache.size)

        if (this.queueTasks.length === 0) break;

        // Items queued during the pass have not been evaluated yet; re-scan
        // right away instead of making them wait out the retry delay.
        const hasNewArrivals = this.queueTasks.some(item => !snapshot.includes(item));
        if (!hasNewArrivals) {
          await new Promise<void>(resolve => setTimeout(resolve, this.retryDelayMs));
        }
      }
    } finally {
      // Always clear the flag so a later addTaskToQueue can restart the loop.
      this.queueIsRunning = false;
    }
  }

  /**
   * Claims the item's blocking values, runs its task, and removes the item
   * from the queue once the task settles (fulfilled or rejected).
   *
   * Everything up to and including the call to `task()` runs synchronously
   * when this method is invoked.
   */
  private async runQueueItem(queueItem: { task: () => Promise<void>; blockingValues: string[] }): Promise<void> {
    for (const value of queueItem.blockingValues) {
      // Do not extend the TTL of a value that is already held.
      if (this.queueTtlCache.isExpired(value)) {
        this.queueTtlCache.set(value, this.blockingTtlSeconds);
      }
    }

    try {
      await queueItem.task();
    } catch (e: unknown) {
      // A failing task must not stop the queue; log it and move on.
      console.log(e instanceof Error && e.stack ? e.stack : e);
    } finally {
      const index = this.queueTasks.indexOf(queueItem);
      // Guard against -1: splice(-1, 1) would remove an unrelated last item.
      if (index !== -1) {
        this.queueTasks.splice(index, 1);
      }
    }
  }

  /**
   * Queues the work for one order. The task is blocked by the order's
   * `Order_ID` and `Email` (whichever are truthy), so two orders sharing
   * either value do not start at the same time.
   *
   * @param addOrderDto Order payload; only `Order_ID` and `Email` are read here.
   */
  addOrder(
    addOrderDto: AddOrderDto
  ): void {
    const blockingValues: string[] = [];
    if (addOrderDto.Order_ID) blockingValues.push('Order_ID:' + addOrderDto.Order_ID);
    if (addOrderDto.Email) blockingValues.push('Email:' + addOrderDto.Email);

    this.addTaskToQueue(async () => {
      try {
        // do something with addOrderDto
      } catch (error: unknown) {
        console.error(error instanceof Error && error.stack ? error.stack : error);
      }
    }, blockingValues);
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment