Skip to content

Instantly share code, notes, and snippets.

@leonkaihao
Forked from iwasaki-kenta/channel.ts
Last active August 6, 2020 09:34
Show Gist options
  • Save leonkaihao/bc1f27043b55829f47520dc21756c02e to your computer and use it in GitHub Desktop.
Save leonkaihao/bc1f27043b55829f47520dc21756c02e to your computer and use it in GitHub Desktop.
Go-like channels in TypeScript.
import { Subject } from 'rxjs';
/**
 * A promise whose settlement functions are exposed as instance fields, so the
 * promise can be resolved or rejected from outside the executor.
 *
 * `resolve` and `reject` are captured synchronously inside the constructor,
 * so both are usable as soon as `new Deferred()` returns. As with any promise,
 * only the first settlement has an effect.
 *
 * @typeParam T - Type of the value the promise fulfils with.
 */
export class Deferred<T> {
  /** The promise settled by {@link Deferred.resolve} / {@link Deferred.reject}. */
  promise: Promise<T>;

  /**
   * Fulfils `promise` (or makes it follow the given thenable).
   * The argument stays optional so `Deferred<void>` can be resolved with `resolve()`.
   */
  resolve!: (value?: T | PromiseLike<T>) => void;

  /** Rejects `promise` with the given reason. */
  reject!: (reason?: any) => void;

  constructor() {
    this.promise = new Promise<T>((fulfil, fail) => {
      // Since TypeScript 4.1 the executor's `resolve` requires its argument, so
      // it is no longer directly assignable to the optional-argument field type
      // under `strict`. Only the static type is adapted; the function stored is
      // the executor's own resolver.
      this.resolve = fulfil as (value?: T | PromiseLike<T>) => void;
      this.reject = fail;
    });
  }
}
/**
 * Lifecycle states tracked by a `Channel`.
 *
 * The numeric values are kept explicit so they stay stable if members are
 * ever reordered.
 */
export enum CHANNEL_STATUS {
  /** Initial state of a freshly constructed channel. */
  INIT = 0,
  /** State after `close()` has been called. */
  CLOSED = 1,
}
/**
 * Go-like channel: a bounded FIFO queue with promise-based blocking.
 *
 * - `send` resolves immediately while a receiver is waiting or the buffer has
 *   room; otherwise it stays pending until a receiver frees a slot.
 * - `recv` resolves immediately while a value is available; otherwise it stays
 *   pending until a sender delivers one.
 *
 * A channel is single-use: once closed it cannot be reopened. Values that were
 * already queued at close time can still be received.
 *
 * @typeParam T - Type of the values carried by the channel (may include `undefined`).
 */
export class Channel<T> {
  private status = CHANNEL_STATUS.INIT;
  // Payload handed to close(); an empty string until the channel is closed.
  private info: any = '';
  private closeEvent$: Subject<any> = new Subject();

  /**
   * @param capacity - Number of values that can be buffered before `send` blocks.
   * @param values - Buffered values, oldest first.
   * @param sends - Blocked senders (value + completion signal), oldest first.
   * @param recvs - Blocked receivers, oldest first.
   */
  public constructor(
    public readonly capacity = 1,
    private readonly values: T[] = [],
    private readonly sends: { value: T; signal: Deferred<void> }[] = [],
    private readonly recvs: Deferred<T>[] = []
  ) {}

  /**
   * Puts a value into the channel.
   *
   * The returned promise stays pending while the buffer is full and nobody is
   * receiving, so `await ch.send(v)` blocks the caller until there is room.
   *
   * @param value - Value to deliver.
   * @throws Error `channel is closed` if the channel was closed before the call.
   * @throws Error `sender closed` if the channel is closed while this send is blocked.
   */
  public async send(value: T): Promise<void> {
    if (this.status === CHANNEL_STATUS.CLOSED) {
      throw new Error(`channel is closed`);
    }
    // A receiver only waits when the buffer is empty, so handing the value
    // straight to the oldest waiting receiver keeps FIFO order.
    const waitingReceiver = this.recvs.shift();
    if (waitingReceiver !== undefined) {
      waitingReceiver.resolve(value);
      return;
    }
    // Nobody is waiting: buffer the value if there is a free slot.
    if (this.values.length < this.capacity) {
      this.values.push(value);
      return;
    }
    // Buffer is full: park until a receiver releases this sender.
    const signal = new Deferred<void>();
    this.sends.push({ value, signal });
    await signal.promise;
  }

  /**
   * Takes the oldest value out of the channel.
   *
   * The returned promise stays pending while the channel is open and empty.
   *
   * @returns The oldest value that was sent and not yet received.
   * @throws Error `channel is closed` if the channel is closed and has no data left.
   * @throws Error `receiver closed` if the channel is closed while this recv is blocked.
   */
  public async recv(): Promise<T> {
    // Decide by queue length, not by value, so `undefined` is a legal payload.
    if (this.values.length > 0) {
      const head = this.values.shift() as T; // non-empty, so this is a real element
      // A slot was just freed: move the oldest blocked sender's value into the
      // buffer and release it. Without this the sender stays blocked and a
      // later send() could jump ahead of it.
      if (this.values.length < this.capacity) {
        const promoted = this.sends.shift();
        if (promoted !== undefined) {
          this.values.push(promoted.value);
          promoted.signal.resolve();
        }
      }
      return head;
    }
    // Buffer is empty (e.g. capacity 0): take the value directly from the
    // oldest blocked sender and release it.
    const blockedSender = this.sends.shift();
    if (blockedSender !== undefined) {
      blockedSender.signal.resolve();
      return blockedSender.value;
    }
    // Only an empty, closed channel produces a receive error.
    if (this.status === CHANNEL_STATUS.CLOSED) {
      throw new Error(`channel is closed`);
    }
    // Nothing available: park until a sender delivers a value.
    const signal = new Deferred<T>();
    this.recvs.push(signal);
    return signal.promise;
  }

  /**
   * Closes the channel. Calling it again on a closed channel does nothing.
   *
   * Close listeners are notified with `info`. Blocked receivers are rejected
   * and removed. Blocked senders are rejected but their values stay queued,
   * so data left in the channel can still be received.
   *
   * @param info - Arbitrary payload stored for `getInfo()` and passed to close listeners.
   */
  public close(info?: any): void {
    if (this.status === CHANNEL_STATUS.CLOSED) {
      return;
    }
    this.info = info;
    this.status = CHANNEL_STATUS.CLOSED;
    this.closeEvent$.next(info);
    this.clear();
  }

  /**
   * Reports whether `close()` has been called.
   * Useful for callers that did not subscribe with `onClose` beforehand.
   */
  public isClosed(): boolean {
    return this.status === CHANNEL_STATUS.CLOSED;
  }

  /**
   * Subscribes to the close event.
   *
   * NOTE(review): a plain rxjs Subject does not replay past emissions, so a
   * listener added after `close()` is presumably never called. Use
   * `isClosed()` in that case.
   *
   * @param close - Callback invoked with the `info` value given to `close()`.
   */
  public onClose(close: Function): void {
    this.closeEvent$.subscribe(info => {
      close(info);
    });
  }

  /** Returns the payload passed to `close()` (an empty string before closing). */
  public getInfo(): any {
    return this.info;
  }

  /** Rejects everything that is still waiting on the channel. */
  private clear(): void {
    // Blocked senders are rejected but deliberately left in the queue: they
    // still carry data that recv() may hand out after the close.
    for (const pendingSend of this.sends) {
      pendingSend.signal.reject(new Error('sender closed'));
    }
    // Blocked receivers are rejected and dropped, since no value will arrive for them.
    for (const pendingRecv of this.recvs.splice(0)) {
      pendingRecv.reject(new Error('receiver closed'));
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment