-
-
Save leonkaihao/bc1f27043b55829f47520dc21756c02e to your computer and use it in GitHub Desktop.
Go-like channels in TypeScript.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Subject } from 'rxjs'; | |
/**
 * A promise bundled with the functions that settle it, so that the code that
 * creates the promise and the code that resolves or rejects it can live in
 * different places.
 */
export class Deferred<T> {
  /** Promise settled by `resolve` or `reject`. */
  promise: Promise<T>;

  /**
   * Fulfils `promise` with `value`.
   *
   * The parameter is required, matching the signature of the Promise
   * executor's own `resolve` (TypeScript 4.1+). Calling `resolve()` with no
   * argument still type-checks when `T` is `void`.
   */
  resolve!: (value: T | PromiseLike<T>) => void;

  /** Rejects `promise` with `reason`. */
  reject!: (reason?: unknown) => void;

  constructor() {
    // The executor runs synchronously inside `new Promise`, so both settle
    // functions are assigned before the constructor returns; this is what
    // makes the definite-assignment assertions (`!`) above safe.
    this.promise = new Promise<T>((resolve, reject) => {
      this.resolve = resolve;
      this.reject = reject;
    });
  }
}
/**
 * Lifecycle states of a channel.
 *
 * This is a numeric enum: members are numbered from 0 in declaration order,
 * so `INIT` is 0 and `CLOSED` is 1.
 */
export enum CHANNEL_STATUS {
  /** The channel has not been closed yet. */
  INIT,
  /** The channel has been closed. */
  CLOSED,
}
/**
 * A Go-like channel: a bounded FIFO queue whose `send` blocks (asynchronously)
 * while the buffer is full and whose `recv` blocks while it is empty.
 *
 * A Channel instance goes through a single lifecycle, from construction to
 * `close()`. After closing it cannot be reused; the only thing that still
 * works is receiving the data left in the channel.
 */
export class Channel<T> {
  private status = CHANNEL_STATUS.INIT;
  // Payload handed to close(); readable afterwards through getInfo().
  private info: any = '';
  // Emits the close payload once, then completes.
  private closeEvent$: Subject<any> = new Subject();

  /**
   * @param capacity Number of values that can be buffered before `send` starts
   *   to block. With a capacity of 0 every `send` waits for a matching `recv`.
   * @param values Buffered values, oldest first.
   * @param sends Senders blocked because the buffer was full, oldest first.
   * @param recvs Receivers blocked because nothing was available, oldest first.
   */
  public constructor(
    public readonly capacity = 1,
    private readonly values: T[] = [],
    private readonly sends: { value: T; signal: Deferred<void> }[] = [],
    private readonly recvs: Deferred<T>[] = []
  ) {}

  /**
   * Puts a value into the channel. Awaiting the returned promise blocks the
   * sender while the buffer is full.
   *
   * @param value The value to deliver.
   * @returns A promise that resolves once the value has been handed to a
   *   receiver or placed in the buffer.
   * @throws Error `channel is closed` if the channel is already closed, and
   *   `sender closed` if the channel is closed while this sender is blocked.
   */
  public async send(value: T): Promise<void> {
    if (this.status === CHANNEL_STATUS.CLOSED) {
      throw Error(`channel is closed`);
    }

    // A waiting receiver takes priority over the buffer: receivers only wait
    // when the buffer is empty, so handing over directly preserves FIFO order.
    const receiver = this.recvs.shift();
    if (receiver !== undefined) {
      receiver.resolve(value);
      return;
    }

    // No waiting receiver: buffer the value if there is a vacancy.
    if (this.values.length < this.capacity) {
      this.values.push(value);
      return;
    }

    // Buffer is full: park the value and wait until a receiver makes room.
    const signal = new Deferred<void>();
    this.sends.push({ value, signal });
    await signal.promise;
  }

  /**
   * Takes the oldest value out of the channel. Awaiting the returned promise
   * blocks the receiver while the channel is empty.
   *
   * @returns The oldest value. `undefined` is returned as-is when it was sent
   *   as a value.
   * @throws Error `channel is closed` if the channel is closed and empty, and
   *   `receiver closed` if the channel is closed while this receiver is blocked.
   */
  public async recv(): Promise<T> {
    // Buffered values are the oldest ones, so serve them first.
    if (this.values.length > 0) {
      // The length check guarantees an element exists. It cannot be tested
      // against `undefined` because `undefined` may be a legitimate T.
      const value = this.values.shift() as T;

      // The read freed a slot: move the oldest blocked sender(s) into the
      // buffer and release them. Without this, a later send() could slip its
      // value in ahead of a sender that has been waiting longer.
      while (this.values.length < this.capacity) {
        const blocked = this.sends.shift();
        if (blocked === undefined) {
          break;
        }
        this.values.push(blocked.value);
        // No-op if close() already rejected this sender.
        blocked.signal.resolve();
      }
      return value;
    }

    // Buffer is empty: take the value straight from a blocked sender, if any.
    const blocked = this.sends.shift();
    if (blocked !== undefined) {
      blocked.signal.resolve();
      return blocked.value;
    }

    // Only an empty and closed channel produces a receiving error.
    if (this.status === CHANNEL_STATUS.CLOSED) {
      throw Error(`channel is closed`);
    }

    // Nothing available: wait until a sender hands a value over.
    const signal = new Deferred<T>();
    this.recvs.push(signal);
    return signal.promise;
  }

  /**
   * Closes the channel. Calling it again on a closed channel does nothing.
   *
   * Close listeners are notified first, then every blocked sender and
   * receiver is rejected. Values already in the channel stay receivable.
   *
   * @param info Optional payload passed to close listeners and later
   *   returned by `getInfo()`.
   */
  public close(info?: any): void {
    if (this.status === CHANNEL_STATUS.CLOSED) {
      return;
    }
    this.info = info;
    this.status = CHANNEL_STATUS.CLOSED;
    this.closeEvent$.next(info);
    // The close event fires exactly once; completing the subject lets it
    // release its subscribers instead of holding them indefinitely.
    this.closeEvent$.complete();
    this.clear();
  }

  /**
   * Reports whether the channel has been closed. Useful for code that has
   * not subscribed to the close event.
   */
  public isClosed(): boolean {
    return this.status === CHANNEL_STATUS.CLOSED;
  }

  /**
   * Subscribes to the close event.
   *
   * @param close Callback invoked with the `info` passed to `close()`. A
   *   callback registered after the channel was closed is not invoked.
   */
  public onClose(close: Function): void {
    this.closeEvent$.subscribe(info => {
      close(info);
    });
  }

  /** Returns the payload that was passed to `close()`. */
  public getInfo(): any {
    return this.info;
  }

  /** Rejects everybody who is still waiting on the channel. */
  private clear(): void {
    // Blocked senders are rejected but deliberately left in the queue: they
    // carry data, and that data remains receivable after close.
    for (const sendItem of this.sends) {
      sendItem.signal.reject(Error('sender closed'));
    }

    // Blocked receivers are rejected and removed.
    for (const recvItem of this.recvs.splice(0)) {
      recvItem.reject(Error('receiver closed'));
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment