Created
August 18, 2017 04:48
-
-
Save StephenCleary/ba50b2da419c03b9cba1d20cb4654d5e to your computer and use it in GitHub Desktop.
AsyncEx... for TypeScript
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * A promise bundled with the functions that settle it, so that code outside
 * the promise executor can complete it later.
 *
 * Only the first call to `resolve` or `reject` has any effect; every later
 * call is silently ignored.
 */
class Future<T> {
  /** The promise settled by `resolve` or `reject`. */
  public readonly promise: Promise<T>;
  private readonly resolver: (value: T) => void;
  private readonly rejecter: (error: Error) => void;
  private _isCompleted = false;

  constructor() {
    // The Promise executor runs synchronously inside `new Promise(...)`, so
    // both locals are assigned before they are copied into the fields below.
    let resolver!: (value: T) => void;
    let rejecter!: (error: Error) => void;
    this.promise = new Promise<T>((resolve, reject) => {
      resolver = resolve;
      rejecter = reject;
    });
    this.resolver = resolver;
    this.rejecter = rejecter;
  }

  /** `true` once `resolve` or `reject` has been called. */
  public get isCompleted(): boolean {
    return this._isCompleted;
  }

  /**
   * Fulfills `promise` with `value` unless the future is already completed.
   * @param value Value the promise is fulfilled with.
   */
  public resolve(value: T): void {
    if (this._isCompleted) {
      return;
    }
    this._isCompleted = true;
    this.resolver(value);
  }

  /**
   * Rejects `promise` with `error` unless the future is already completed.
   * @param error Reason the promise is rejected with.
   */
  public reject(error: Error): void {
    if (this._isCompleted) {
      return;
    }
    this._isCompleted = true;
    this.rejecter(error);
  }
}
/**
 * FIFO queue of pending waiters. Each waiter is a promise handed out by
 * `enqueue` and fulfilled later by `dequeue` or `dequeueAll`.
 */
class AsyncWaitQueue<T> {
  private queue: Future<T>[] = [];

  /** Number of waiters that have not been released yet. */
  public get length(): number {
    return this.queue.length;
  }

  /**
   * Adds a new waiter to the back of the queue.
   * @returns A promise that is fulfilled when this waiter is dequeued.
   */
  public enqueue(): Promise<T> {
    const waiter = new Future<T>();
    this.queue.push(waiter);
    return waiter.promise;
  }

  /**
   * Releases the oldest waiter, fulfilling its promise with `value`.
   * Does nothing when the queue is empty.
   * @param value Value delivered to the released waiter.
   */
  public dequeue(value: T): void {
    const waiter = this.queue.shift();
    // `shift` yields undefined on an empty array; there is nobody to release.
    if (waiter === undefined) {
      return;
    }
    waiter.resolve(value);
  }

  /**
   * Releases every waiter, in arrival order, with the same `value`.
   * @param value Value delivered to each released waiter.
   */
  public dequeueAll(value: T): void {
    const released = this.queue;
    this.queue = [];
    for (const waiter of released) {
      waiter.resolve(value);
    }
  }
}
/** An object that exposes a single `dispose` operation. */
export interface IDisposable {
  /** Performs the object's dispose action; the return value is ignored. */
  dispose(): void;
}
/** Adapts a plain callback to the `IDisposable` interface. */
class Disposable implements IDisposable {
  /**
   * @param _dispose Callback run on every call to `dispose`. It is invoked as
   *   a method of this object, so it must not rely on its own `this` binding
   *   (pass an arrow function or a bound function).
   */
  constructor(private readonly _dispose: () => void) {}

  /** Invokes the wrapped callback; each call invokes it again. */
  public dispose(): void {
    this._dispose();
  }
}
/**
 * Asynchronous mutual-exclusion lock. Callers that cannot take the lock
 * immediately are queued and receive it in FIFO order.
 */
export class AsyncLock {
  private readonly queue = new AsyncWaitQueue<IDisposable>();
  // The arrow function keeps `this` bound to the lock. Passing `this.unlock`
  // directly would run `unlock` with `this` set to the Disposable instance.
  private readonly key: IDisposable = new Disposable(() => this.unlock());
  private _isLocked = false;

  /** `true` while some caller holds the lock. */
  get isLocked(): boolean {
    return this._isLocked;
  }

  /**
   * Acquires the lock, waiting in line if it is already held.
   * @returns A promise for the key; disposing the key releases the lock.
   *   Every holder receives the same key object.
   */
  public lockAsync(): Promise<IDisposable> {
    if (!this._isLocked) {
      this._isLocked = true;
      return Promise.resolve(this.key);
    }
    return this.queue.enqueue();
  }

  /**
   * Releases the lock. If callers are waiting, ownership passes straight to
   * the oldest one and the lock stays marked as held; otherwise it becomes free.
   */
  public unlock(): void {
    if (this.queue.length === 0) {
      this._isLocked = false;
      return;
    }
    this.queue.dequeue(this.key);
  }
}
/**
 * Asynchronous counting semaphore. `waitAsync` consumes one slot or queues
 * the caller; `release` hands slots to queued callers first and adds any
 * remainder to the available count.
 */
export class AsyncSemaphore {
  private readonly queue = new AsyncWaitQueue<void>();
  private readonly key: IDisposable = new Disposable(() => this.release());

  /** @param _count Number of slots initially available. */
  constructor(private _count: number) {}

  /** Number of slots currently available. */
  public get count(): number {
    return this._count;
  }

  /**
   * Takes one slot, waiting in FIFO order when none is available.
   * @returns A promise fulfilled once the caller owns a slot.
   */
  public waitAsync(): Promise<void> {
    // `> 0` rather than `!== 0`: a negative count must never grant a slot.
    if (this._count > 0) {
      --this._count;
      return Promise.resolve();
    }
    return this.queue.enqueue();
  }

  /**
   * Returns `count` slots. Queued waiters are released first, one slot each;
   * whatever is left over is added to the available count.
   * @param count Number of slots to release; must be a non-negative integer.
   * @throws RangeError If `count` is negative or not an integer. Such a value
   *   would otherwise release waiters without any slot backing them.
   */
  public release(count: number = 1): void {
    if (!Number.isInteger(count) || count < 0) {
      throw new RangeError("release count must be a non-negative integer.");
    }
    let remaining = count;
    while (remaining > 0 && this.queue.length !== 0) {
      this.queue.dequeue(undefined);
      --remaining;
    }
    this._count += remaining;
  }

  /**
   * Takes one slot and returns a key whose `dispose` releases one slot.
   * @returns A promise for the key. The same key object is returned to every
   *   caller, and each `dispose` call releases a slot.
   */
  public async lockAsync(): Promise<IDisposable> {
    await this.waitAsync();
    return this.key;
  }

  /** Releases a single slot; equivalent to `release()`. */
  public unlock(): void {
    this.release();
  }
}
/**
 * Asynchronous manual-reset event. Once set, it stays set and every waiter
 * completes immediately until `reset` is called.
 */
export class AsyncManualResetEvent {
  private future: Future<void>;

  /** @param isSet Whether the event starts in the set state. */
  constructor(isSet: boolean = false) {
    this.future = new Future<void>();
    if (isSet) {
      this.future.resolve(undefined);
    }
  }

  /** `true` while the event is in the set state. */
  public get isSet(): boolean {
    return this.future.isCompleted;
  }

  /**
   * @returns The promise of the current underlying future; it is fulfilled
   *   once the event is set.
   */
  public waitAsync(): Promise<void> {
    return this.future.promise;
  }

  /** Puts the event in the set state, releasing all current waiters. */
  public set(): void {
    this.future.resolve(undefined);
  }

  /**
   * Returns the event to the unset state. Has no effect when it is not set.
   * Promises handed out before the reset keep their already-fulfilled state.
   */
  public reset(): void {
    // A completed future cannot be reused, so swap in a fresh one.
    if (this.future.isCompleted) {
      this.future = new Future<void>();
    }
  }
}
/**
 * Asynchronous condition variable tied to an `AsyncLock`. A waiter releases
 * the lock while it waits and takes it again before its wait completes.
 */
export class AsyncConditionVariable {
  // Must be created here: every method dereferences the queue.
  private readonly queue = new AsyncWaitQueue<void>();

  /** @param asyncLock Lock that `waitAsync` releases and later re-takes. */
  constructor(private readonly asyncLock: AsyncLock) {}

  /** Wakes the oldest waiter, if there is one. */
  public notify(): void {
    if (this.queue.length !== 0) {
      this.queue.dequeue(undefined);
    }
  }

  /** Wakes every current waiter. */
  public notifyAll(): void {
    this.queue.dequeueAll(undefined);
  }

  /**
   * Registers the caller as a waiter and then unlocks the associated lock.
   * The caller is expected to hold that lock, since it is unlocked
   * unconditionally.
   * @returns A promise fulfilled after the waiter has been notified and the
   *   lock has been taken again.
   */
  public waitAsync(): Promise<void> {
    // Enqueue before unlocking so a notification issued by the next lock
    // holder cannot be missed.
    const reacquired = this.waitAndRetakeLockAsync(this.queue.enqueue());
    this.asyncLock.unlock();
    return reacquired;
  }

  /** Waits for `signal`, then waits to take the lock again. */
  private async waitAndRetakeLockAsync(signal: Promise<void>): Promise<void> {
    await signal;
    await this.asyncLock.lockAsync();
  }
}
/**
 * Pairs one `AsyncLock` with one `AsyncConditionVariable` built on that lock
 * and forwards every operation to them.
 */
export class AsyncMonitor {
  private readonly lock: AsyncLock;
  private readonly conditionVariable: AsyncConditionVariable;

  constructor() {
    this.lock = new AsyncLock();
    this.conditionVariable = new AsyncConditionVariable(this.lock);
  }

  /**
   * Forwards to the lock's `lockAsync`.
   * @returns The promise returned by the lock.
   */
  public enterAsync(): Promise<IDisposable> {
    return this.lock.lockAsync();
  }

  /** Forwards to the lock's `unlock`. */
  public leave(): void {
    this.lock.unlock();
  }

  /**
   * Forwards to the condition variable's `waitAsync`.
   * @returns The promise returned by the condition variable.
   */
  public waitAsync(): Promise<void> {
    return this.conditionVariable.waitAsync();
  }

  /** Forwards to the condition variable's `notify`. */
  public pulse(): void {
    this.conditionVariable.notify();
  }

  /** Forwards to the condition variable's `notifyAll`. */
  public pulseAll(): void {
    this.conditionVariable.notifyAll();
  }
}
/**
 * Asynchronous FIFO queue for producers and consumers. Producers wait while
 * the queue is full, consumers wait while it is empty, and the queue can be
 * marked complete so that no further items are accepted.
 */
export class AsyncProducerConsumerQueue<T> {
  private readonly maxItems: number;
  private readonly items: T[];
  private readonly lock: AsyncLock;
  private readonly completedOrNotEmpty: AsyncConditionVariable;
  private readonly completedOrNotFull: AsyncConditionVariable;
  private _isCompleted = false;

  /**
   * @param maxItems Capacity of the queue. Zero (the default) or a negative
   *   value means the queue is unbounded.
   * @param items Initial contents; the array is copied, not retained.
   */
  constructor(maxItems: number = 0, items: T[] = []) {
    this.maxItems = maxItems;
    this.items = items.slice();
    this.lock = new AsyncLock();
    this.completedOrNotEmpty = new AsyncConditionVariable(this.lock);
    this.completedOrNotFull = new AsyncConditionVariable(this.lock);
  }

  /** `true` once `completeAddingAsync` has run. */
  public get isCompleted(): boolean {
    return this._isCompleted;
  }

  /** `true` when the queue holds no items. */
  public get isEmpty(): boolean {
    return this.items.length === 0;
  }

  /** `true` when a bounded queue has reached (or was seeded beyond) capacity. */
  public get isFull(): boolean {
    // A non-positive capacity means "unbounded"; comparing with `===` would
    // make a default-constructed, empty queue report itself as full.
    return this.maxItems > 0 && this.items.length >= this.maxItems;
  }

  /** Marks the queue complete and wakes every waiting producer and consumer. */
  public async completeAddingAsync(): Promise<void> {
    const key = await this.lock.lockAsync();
    try {
      this._isCompleted = true;
      this.completedOrNotEmpty.notifyAll();
      this.completedOrNotFull.notifyAll();
    } finally {
      key.dispose();
    }
  }

  /**
   * Appends `item`, waiting while the queue is full.
   * @param item Item to append.
   * @throws Error (as a rejection) if the queue has been marked complete.
   */
  public async enqueueAsync(item: T): Promise<void> {
    const key = await this.lock.lockAsync();
    try {
      // Re-check after every wake-up: the state may have changed again
      // before this producer got the lock back.
      while (this.isFull && !this.isCompleted) {
        await this.completedOrNotFull.waitAsync();
      }
      if (this.isCompleted) {
        throw new Error("Enqueue failed; the producer/consumer queue has completed adding.");
      }
      this.items.push(item);
      this.completedOrNotEmpty.notify();
    } finally {
      key.dispose();
    }
  }

  /**
   * Waits until the queue has an item or has been marked complete.
   * @returns `true` if an item is available, `false` if the queue is complete
   *   and empty.
   */
  public async outputAvailableAsync(): Promise<boolean> {
    const key = await this.lock.lockAsync();
    try {
      while (this.isEmpty && !this.isCompleted) {
        await this.completedOrNotEmpty.waitAsync();
      }
      return !this.isEmpty;
    } finally {
      key.dispose();
    }
  }

  /**
   * Removes and returns the oldest item, waiting while the queue is empty.
   * @returns The removed item.
   * @throws Error (as a rejection) if the queue is complete and empty.
   */
  public async dequeueAsync(): Promise<T> {
    const key = await this.lock.lockAsync();
    try {
      while (this.isEmpty && !this.isCompleted) {
        await this.completedOrNotEmpty.waitAsync();
      }
      if (this.isCompleted && this.isEmpty) {
        throw new Error("Dequeue failed; the producer/consumer queue has completed adding and is empty.");
      }
      // The checks above guarantee at least one item, so `shift` cannot
      // return undefined here.
      const item = this.items.shift() as T;
      this.completedOrNotFull.notify();
      return item;
    } finally {
      key.dispose();
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment