Skip to content

Instantly share code, notes, and snippets.

@StephenCleary
Created August 18, 2017 04:48
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save StephenCleary/ba50b2da419c03b9cba1d20cb4654d5e to your computer and use it in GitHub Desktop.
Save StephenCleary/ba50b2da419c03b9cba1d20cb4654d5e to your computer and use it in GitHub Desktop.
AsyncEx... for TypeScript
class Future<T> {
private resolver: (value: T) => void; // readonly
private rejecter: (error: Error) => void; // readonly
private _isCompleted: boolean = false;
constructor() {
this.promise = new Promise((resolve, reject) => {
this.resolver = resolve;
this.rejecter = reject;
});
}
public readonly promise: Promise<T>;
public get isCompleted(): boolean {
return this._isCompleted;
}
public resolve(value: T): void {
if (this._isCompleted) {
return;
}
this._isCompleted = true;
this.resolver(value);
}
public reject(error: Error): void {
if (this._isCompleted) {
return;
}
this._isCompleted = true;
this.rejecter(error);
}
}
class AsyncWaitQueue<T> {
private queue: Future<T>[] = [];
public get length(): number {
return this.queue.length;
}
public enqueue(): Promise<T> {
let future = new Future<T>();
this.queue.push(future);
return future.promise;
}
public dequeue(value: T): void {
let future = this.queue.shift();
future.resolve(value);
}
public dequeueAll(value: T): void {
this.queue.forEach(x => x.resolve(value));
this.queue = [];
}
}
export interface IDisposable {
dispose();
}
class Disposable implements IDisposable {
constructor(private readonly _dispose: () => void) { }
public dispose() {
this._dispose();
}
}
export class AsyncLock {
private readonly queue: AsyncWaitQueue<IDisposable> = new AsyncWaitQueue<IDisposable>();
private readonly key: IDisposable = new Disposable(this.unlock);
private _isLocked: boolean = false;
get isLocked(): boolean {
return this._isLocked;
}
public lockAsync(): Promise<IDisposable> {
if (!this._isLocked) {
this._isLocked = true;
return Promise.resolve(this.key);
}
return this.queue.enqueue();
}
public unlock(): void {
if (this.queue.length === 0) {
this._isLocked = false;
return;
}
this.queue.dequeue(this.key);
}
}
export class AsyncSemaphore {
private readonly queue: AsyncWaitQueue<void> = new AsyncWaitQueue<void>();
private readonly key: IDisposable = new Disposable(() => this.release());
constructor(private _count: number) {
}
public get count(): number {
return this._count;
}
public waitAsync(): Promise<void> {
if (this._count !== 0) {
--this._count;
return Promise.resolve(null);
}
return this.queue.enqueue();
}
public release(count: number = 1) {
if (count === 0) {
return;
}
while (count !== 0 && this.queue.length !== 0) {
this.queue.dequeue(null);
--count;
}
this._count += count;
}
public async lockAsync(): Promise<IDisposable> {
await this.waitAsync();
return this.key;
}
public unlock(): void {
this.release();
}
}
export class AsyncManualResetEvent {
private future: Future<void>;
constructor(isSet: boolean = false) {
this.future = new Future<void>();
if (isSet) {
this.future.resolve(null);
}
}
public get isSet(): boolean {
return this.future.isCompleted;
}
public waitAsync(): Promise<void> {
return this.future.promise;
}
public set(): void {
this.future.resolve(null);
}
public reset(): void {
if (this.future.isCompleted) {
this.future = new Future<void>();
}
}
}
export class AsyncConditionVariable {
private readonly queue: AsyncWaitQueue<void>;
constructor(private readonly asyncLock: AsyncLock) { }
public notify(): void {
if (this.queue.length !== 0) {
this.queue.dequeue(null);
}
}
public notifyAll(): void {
this.queue.dequeueAll(null);
}
public waitAsync(): Promise<void> {
let future = this.waitAndRetakeLockAsync(this.queue.enqueue());
this.asyncLock.unlock();
return future;
}
private async waitAndRetakeLockAsync(signal: Promise<void>): Promise<void> {
await signal;
await this.asyncLock.lockAsync();
}
}
export class AsyncMonitor {
private readonly lock: AsyncLock;
private readonly conditionVariable: AsyncConditionVariable;
constructor() {
this.lock = new AsyncLock();
this.conditionVariable = new AsyncConditionVariable(this.lock);
}
public enterAsync(): Promise<IDisposable> {
return this.lock.lockAsync();
}
public leave(): void {
this.lock.unlock();
}
public waitAsync(): Promise<void> {
return this.conditionVariable.waitAsync();
}
public pulse(): void {
this.conditionVariable.notify();
}
public pulseAll(): void {
this.conditionVariable.notifyAll();
}
}
export class AsyncProducerConsumerQueue<T> {
private readonly maxItems: number;
private readonly items: T[];
private readonly lock: AsyncLock;
private readonly completedOrNotEmpty: AsyncConditionVariable;
private readonly completedOrNotFull: AsyncConditionVariable;
private _isCompleted: boolean = false;
constructor(maxItems: number = 0, items: T[] = []) {
this.maxItems = maxItems;
this.items = items.slice();
this.lock = new AsyncLock();
this.completedOrNotEmpty = new AsyncConditionVariable(this.lock);
this.completedOrNotFull = new AsyncConditionVariable(this.lock);
}
public get isCompleted(): boolean {
return this._isCompleted;
}
public get isEmpty(): boolean {
return this.items.length === 0;
}
public get isFull(): boolean {
return this.items.length === this.maxItems;
}
public async completeAddingAsync(): Promise<void> {
let key = await this.lock.lockAsync();
try {
this._isCompleted = true;
this.completedOrNotEmpty.notifyAll();
this.completedOrNotFull.notifyAll();
} finally {
key.dispose();
}
}
public async enqueueAsync(item: T): Promise<void> {
let key = await this.lock.lockAsync();
try {
// Wait for the queue to be not full.
while (this.isFull && !this.isCompleted) {
await this.completedOrNotFull.waitAsync();
}
// If the queue has been marked complete, then abort.
if (this.isCompleted)
throw new Error("Enqueue failed; the producer/consumer queue has completed adding.");
this.items.push(item);
this.completedOrNotEmpty.notify();
} finally {
key.dispose();
}
}
public async outputAvailableAsync(): Promise<boolean> {
let key = await this.lock.lockAsync();
try {
while (this.isEmpty && !this.isCompleted) {
await this.completedOrNotEmpty.waitAsync();
}
return !this.isEmpty;
} finally {
key.dispose();
}
}
public async dequeueAsync(): Promise<T> {
let key = await this.lock.lockAsync();
try {
while (this.isEmpty && !this.isCompleted) {
await this.completedOrNotEmpty.waitAsync();
}
if (this.isCompleted && this.isEmpty) {
throw new Error("Dequeue failed; the producer/consumer queue has completed adding and is empty.");
}
var item = this.items.shift();
this.completedOrNotFull.notify();
return item;
} finally {
key.dispose();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment