A Go-like buffered channel for JavaScript/TypeScript, with backpressure.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * A Go-like buffered channel with backpressure.
 *
 * Values are stored in a fixed-size ring buffer. Each slot carries a `lap`
 * counter that encodes its state relative to the two cursors:
 *
 * - the send cursor starts on lap 0 and advances by 2 on every wrap-around;
 * - the receive cursor starts on lap 1 and advances by 2 on every wrap-around;
 * - a slot's lap is bumped by 1 after each write and after each read.
 *
 * A slot is therefore writable when its lap equals the send cursor's lap and
 * readable when its lap equals the receive cursor's lap.
 *
 * Closing follows Go semantics: after `close()` no further values may be
 * sent, values already buffered can still be received, and once the buffer
 * is drained every receive yields `ClosedSymbol`.
 *
 * NOTE(review): `ClosedSymbol` is not declared in this block; it is expected
 * to be provided elsewhere in the module.
 */
class BufferedChan<T> {
  /** Number of slots in the ring buffer. */
  cap: number;
  /** Send cursor as `[position, lap]`. */
  sendx = new Uint32Array(2);
  /** Receive cursor as `[position, lap]`. */
  recvx = new Uint32Array(2);
  /** Ring buffer slots. */
  buf: { lap: number, val: T | null }[];
  /** True once `close()` has been called. */
  closed = false;
  // Pending senders/receivers: resolve callbacks of parked `send()` / `recv()` calls.
  sendq: Array<() => void> = [];
  recvq: Array<() => void> = [];

  /**
   * @param cap Number of slots to allocate. Ignored when `buffer` is given.
   * @param buffer Optional pre-built slot array to use as the ring buffer;
   *   its length becomes the capacity.
   * @throws RangeError if the resulting capacity is not a positive integer.
   */
  constructor(cap: number, buffer?: Array<{ lap: number, val: T | null }>) {
    if (buffer) {
      if (buffer.length < 1) {
        throw new RangeError("BufferedChan buffer must contain at least one slot");
      }
      this.cap = buffer.length;
      this.buf = buffer;
    } else {
      // A channel without slots would fail on the first slot lookup, so reject it up front.
      if (!Number.isInteger(cap) || cap < 1) {
        throw new RangeError("BufferedChan capacity must be a positive integer");
      }
      this.cap = cap;
      this.buf = Array.from({ length: cap }, () => ({ val: null, lap: 0 }));
    }
    // Recvr starts on lap 1
    this.recvx[1] = 1;
  }

  /** Returns true when the slot under the send cursor is not yet writable. */
  full() {
    const pos = this.sendx[0];
    const lap = this.sendx[1];
    const eLap = this.buf[pos].lap;
    return (lap - eLap) > 0;
  }

  /** Returns true when the slot under the receive cursor holds no unread value. */
  empty() {
    const pos = this.recvx[0];
    const lap = this.recvx[1];
    const eLap = this.buf[pos].lap;
    return (lap - eLap) > 0;
  }

  /**
   * Attempts to enqueue `val` without waiting.
   *
   * @returns true if the value was stored, false if the buffer is full.
   * @throws Error if the channel has been closed.
   */
  trySend(val: T): boolean {
    if (this.closed) {
      throw new Error("send on closed channel");
    }
    const pos = this.sendx[0];
    const lap = this.sendx[1];
    const e = this.buf[pos];
    if (e.lap !== lap) {
      // Chan is full
      return false;
    }
    // The element is ready for writing on this lap.
    this.advance(this.sendx, pos, lap);
    e.val = val;
    // Make the element available for reading.
    e.lap += 1;
    return true;
  }

  /**
   * Attempts to dequeue a value without waiting.
   *
   * @returns the next buffered value; `ClosedSymbol` if the buffer is empty
   *   and the channel is closed; `null` if the buffer is empty and the
   *   channel is still open. A `null` that was itself sent is
   *   indistinguishable from "empty" here; `recv()` does not have that
   *   ambiguity.
   */
  tryRecv(): T | null | (typeof ClosedSymbol) {
    if (this.empty()) {
      // Only report closure once every buffered value has been handed out.
      return this.closed ? ClosedSymbol : null;
    }
    return this.take();
  }

  /**
   * Resolves when send completes. If buffer is full will wait.
   *
   * Rejects with an Error if the channel is closed, including when it is
   * closed while this call is waiting for space.
   */
  async send(val: T) {
    while (true) {
      if (this.trySend(val)) {
        // Send succeeded; unblock the longest-waiting receiver, if any.
        this.recvq.shift()?.();
        return;
      }
      // Full channel: park until a receiver frees a slot or the channel closes.
      await new Promise<void>((resolve) => {
        this.sendq.push(resolve);
      });
    }
  }

  /**
   * Resolves with the next value, waiting while the buffer is empty.
   * Resolves with `ClosedSymbol` once the channel is closed and drained.
   */
  async recv(): Promise<T | typeof ClosedSymbol> {
    while (true) {
      // Test emptiness directly rather than comparing the value with null,
      // so that a sent `null` is delivered like any other value.
      if (!this.empty()) {
        const val = this.take();
        // Recv succeeded; unblock the longest-waiting sender, if any.
        this.sendq.shift()?.();
        return val;
      }
      if (this.closed) {
        return ClosedSymbol;
      }
      // Empty channel: park until a sender stores a value or the channel closes.
      await new Promise<void>((resolve) => {
        this.recvq.push(resolve);
      });
    }
  }

  /** Yields received values until the channel is closed and drained. */
  async *[Symbol.asyncIterator]() {
    while (true) {
      const val = await this.recv();
      if (val === ClosedSymbol) {
        return;
      }
      yield val;
    }
  }

  /**
   * No more writes. Buffered values remain receivable.
   *
   * Wakes every parked receiver (so it can drain or observe the closure) and
   * every parked sender (so its `send()` rejects instead of hanging).
   * Calling `close()` more than once has no further effect.
   */
  close() {
    if (this.closed) {
      return;
    }
    this.closed = true;
    // Detach the queues before invoking callbacks so each waiter is woken exactly once.
    const waiters = [...this.recvq.splice(0), ...this.sendq.splice(0)];
    for (const wake of waiters) {
      wake();
    }
  }

  /** Moves a `[position, lap]` cursor one slot forward, bumping the lap by 2 on wrap-around. */
  private advance(cursor: Uint32Array, pos: number, lap: number): void {
    if (pos + 1 < this.cap) {
      cursor[0] = pos + 1;
    } else {
      cursor[0] = 0;
      cursor[1] = lap + 2;
    }
  }

  /** Removes and returns the value under the receive cursor. Caller must ensure `!this.empty()`. */
  private take(): T {
    const pos = this.recvx[0];
    const lap = this.recvx[1];
    const e = this.buf[pos];
    this.advance(this.recvx, pos, lap);
    // A readable slot always holds a value written by trySend, so it is a T
    // (which may itself legitimately be null when T includes null).
    const val = e.val as T;
    // Drop the reference so a consumed value is not retained by the ring.
    e.val = null;
    // Make the element available for writing on the sender's next lap.
    e.lap += 1;
    return val;
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment