Skip to content

Instantly share code, notes, and snippets.

@vbfox
Created June 4, 2020 14:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vbfox/32cc61b2d0f354e582d579e9336e83ce to your computer and use it in GitHub Desktop.
Save vbfox/32cc61b2d0f354e582d579e9336e83ce to your computer and use it in GitHub Desktop.
A queue of promises keyed by a value
import { Queue } from 'src/utils/queue';
import { assertDefined } from 'src/utils/asserts';
/**
 * One unit of work handled by the keyed queue, bundled with the callbacks used to settle the promise that was
 * handed back to the caller who enqueued it.
 */
interface QueueItem {
/** Factory invoked to start the work; the queue chains `.then` on the value it returns. */
readonly generator: () => PromiseLike<any>;
/** Invoked with the fulfilment value once the promise returned by `generator` fulfils. */
readonly resolve: (value: any) => void;
/** Invoked with the rejection reason once the promise returned by `generator` rejects. */
readonly reject: (error: any) => void;
}
/** Bookkeeping for a single key: the item currently running and the items waiting behind it. */
interface PerKeyQueue<TKey> {
/** The key this state belongs to. */
readonly key: TKey;
/** Item whose generator has been started and has not settled yet; `undefined` when nothing is running. */
current: QueueItem | undefined;
/** Items waiting their turn; one is dequeued and started each time `current` settles. */
readonly remaining: Queue<QueueItem>;
}
/**
 * Runs promise-returning tasks one at a time per key.
 *
 * A queue of tasks is maintained for each key: when the running task settles (fulfils or rejects), the next one
 * starts, until no task remains for that key.
 *
 * Tasks for different keys don't wait on each other and run concurrently.
 */
export class KeyedPromiseQueue<TKey> {
  /** Per-key state. An entry exists only while its key has a task running; it is dropped once the key is idle. */
  private queues = new Map<TKey, PerKeyQueue<TKey>>();

  /** Returns the state for `key`, creating and registering an idle one when the key is not known yet. */
  private getOrCreateForKey(key: TKey): PerKeyQueue<TKey> {
    const existing = this.queues.get(key);
    if (existing) {
      return existing;
    }
    const newValue: PerKeyQueue<TKey> = { key, current: undefined, remaining: new Queue<QueueItem>() };
    this.queues.set(key, newValue);
    return newValue;
  }

  /**
   * Makes `queueItem` the current task of `queue` and starts it.
   *
   * A generator that throws synchronously is treated like a rejected task: its promise is rejected and the next
   * waiting item is tried, so a single bad generator cannot leave the key stuck with a `current` that never
   * settles. When nothing is left to run, the key's entry is removed from the map so it does not grow forever.
   */
  private runAndSetCurrent(key: TKey, queue: PerKeyQueue<TKey>, queueItem: QueueItem | undefined): void {
    // A loop (rather than recursion) keeps the stack flat when many queued generators fail synchronously in a row.
    let candidate = queueItem;
    while (candidate !== undefined) {
      // `current` is set before the generator runs so that a re-entrant `enqueue` for the same key made from
      // inside the generator is queued instead of being started immediately.
      // eslint-disable-next-line no-param-reassign
      queue.current = candidate;
      let running: PromiseLike<unknown>;
      try {
        running = candidate.generator();
      } catch (error) {
        candidate.reject(error);
        candidate = queue.remaining.dequeue();
        continue;
      }
      running.then(
        v => this.onPromiseResolved(key, v),
        e => this.onPromiseRejected(key, e));
      return;
    }
    // Nothing is running or waiting for this key any more.
    // eslint-disable-next-line no-param-reassign
    queue.current = undefined;
    this.queues.delete(key);
  }

  /** Fulfils the caller's promise for the task that just completed on `key`, then starts the next waiting task. */
  private onPromiseResolved(key: TKey, value: unknown): void {
    const currentQueue = this.queues.get(key);
    assertDefined(currentQueue, 'Queue should exist');
    assertDefined(currentQueue.current, 'Queue should have a current promise');
    const { resolve } = currentQueue.current;
    // Settle first: advancing may synchronously reject later items, and callers should observe settlements in
    // the order the tasks were enqueued.
    resolve(value);
    this.runAndSetCurrent(key, currentQueue, currentQueue.remaining.dequeue());
  }

  /** Rejects the caller's promise for the task that just failed on `key`, then starts the next waiting task. */
  private onPromiseRejected(key: TKey, error: unknown): void {
    const currentQueue = this.queues.get(key);
    assertDefined(currentQueue, 'Queue should exist');
    assertDefined(currentQueue.current, 'Queue should have a current promise');
    const { reject } = currentQueue.current;
    // Same ordering rationale as in onPromiseResolved: settle this task before touching the next one.
    reject(error);
    this.runAndSetCurrent(key, currentQueue, currentQueue.remaining.dequeue());
  }

  /**
   * Schedules a task behind every task previously enqueued for the same key.
   *
   * When the key is idle, `promiseGenerator` is invoked synchronously during this call; otherwise it is invoked
   * once all earlier tasks for the key have settled. A failing task does not prevent later ones from running.
   *
   * @param key Tasks sharing this key run strictly one after another.
   * @param promiseGenerator Starts the work and returns its promise. It is called at most once.
   * @returns A promise that settles like the one returned by `promiseGenerator`, or rejects with the thrown
   * error when `promiseGenerator` throws synchronously.
   */
  enqueue<T>(key: TKey, promiseGenerator: () => PromiseLike<T>): PromiseLike<T> {
    return new Promise<T>((resolve, reject) => {
      const currentQueue = this.getOrCreateForKey(key);
      const queueItem: QueueItem = { generator: promiseGenerator, resolve, reject };
      // No current ? we're now the current.
      if (currentQueue.current === undefined) {
        this.runAndSetCurrent(key, currentQueue, queueItem);
      } else {
        // Otherwise queue for later
        currentQueue.remaining.enqueue(queueItem);
      }
    });
  }
}
/**
 * Minimal mutable first-in, first-out queue.
 *
 * JavaScript arrays can already be used as queues, but `push`/`shift` read less clearly than `enqueue`/`dequeue`.
 */
export class Queue<T> {
  private items: T[];

  /**
   * @param items Optional initial content, oldest element first. The array is copied, so later queue operations
   * never modify the caller's array.
   */
  constructor(items?: T[]) {
    if (items) {
      this.items = Array.from(items);
    } else {
      this.items = [];
    }
  }

  /** Appends `item` at the back of the queue. */
  enqueue(item: T): void {
    this.items.push(item);
  }

  /** Removes and returns the item at the front of the queue, or `undefined` when the queue is empty. */
  dequeue(): T | undefined {
    if (this.items.length === 0) {
      return undefined;
    }
    return this.items.shift();
  }

  /** Number of items currently waiting in the queue. */
  get length(): number {
    const { length } = this.items;
    return length;
  }

  /** `true` when the queue holds no item. */
  get isEmpty(): boolean {
    return this.items.length < 1;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment