Skip to content

Instantly share code, notes, and snippets.

@MarcoPolo
Created November 11, 2022 15:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MarcoPolo/27195ca0d69700e01131f01124ff49fb to your computer and use it in GitHub Desktop.
Save MarcoPolo/27195ca0d69700e01131f01124ff49fb to your computer and use it in GitHub Desktop.
A Go-like buffered channel for JavaScript, with backpressure.
/**
 * A Go-style buffered channel with backpressure.
 *
 * Values live in a fixed-size ring buffer. Every slot carries a `lap` stamp:
 * a slot may be written when its stamp equals the send cursor's lap and read
 * when it equals the receive cursor's lap. Each access bumps the slot's stamp
 * by one, and each cursor adds two to its lap whenever it wraps around the
 * ring, so senders and receivers strictly alternate on every slot.
 *
 * Close semantics follow Go: after `close()` no further value is accepted,
 * values that were already buffered can still be received, and only once the
 * buffer is drained do receivers observe `ClosedSymbol`.
 */
class BufferedChan<T> {
  /** Number of slots in the ring buffer. */
  cap: number
  /** Send cursor as `[slot index, lap]`. */
  sendx = new Uint32Array(2)
  /** Receive cursor as `[slot index, lap]`. */
  recvx = new Uint32Array(2)
  /** Ring buffer slots. */
  buf: { lap: number, val: T | null }[]
  /** Set by `close()`; once true no new value is accepted. */
  closed = false
  // Pending senders/receivers (wake-up callbacks, oldest first)
  sendq: Array<() => void> = []
  recvq: Array<() => void> = []

  /**
   * @param cap Number of slots to allocate. Ignored when `buffer` is given.
   * @param buffer Optional pre-built slot array to use as the ring; its length
   *   becomes the capacity.
   * @throws RangeError if the resulting capacity would be zero or `cap` is not
   *   a positive integer (such a channel could never transfer a value).
   */
  constructor(cap: number, buffer?: Array<{ lap: number, val: T | null }>) {
    if (buffer) {
      if (buffer.length === 0) {
        throw new RangeError("BufferedChan buffer must contain at least one slot")
      }
      this.cap = buffer.length
      this.buf = buffer
    } else {
      if (!Number.isInteger(cap) || cap <= 0) {
        throw new RangeError("BufferedChan capacity must be a positive integer")
      }
      this.cap = cap
      this.buf = Array.from({ length: cap }, () => ({ val: null, lap: 0 }))
    }
    // Recvr starts on lap 1
    this.recvx[1] = 1
  }

  /** True when the slot under the send cursor is not writable yet. */
  full(): boolean {
    // Equality (rather than a signed difference) stays correct when the
    // 32-bit lap counters wrap around.
    return this.buf[this.sendx[0]].lap !== this.sendx[1]
  }

  /** True when the slot under the receive cursor holds no unread value. */
  empty(): boolean {
    return this.buf[this.recvx[0]].lap !== this.recvx[1]
  }

  /**
   * Attempts to enqueue `val` without waiting.
   *
   * @returns `true` if the value was stored; `false` if the buffer is full or
   *   the channel has been closed.
   */
  trySend(val: T): boolean {
    if (this.closed) {
      // No more writes after close().
      return false
    }
    const pos = this.sendx[0]
    const lap = this.sendx[1]
    const slot = this.buf[pos]
    if (slot.lap !== lap) {
      // Chan is full
      return false
    }
    // The element is ready for writing on this lap; advance the cursor.
    if (pos + 1 < this.cap) {
      this.sendx[0] = pos + 1
    } else {
      this.sendx[0] = 0
      // The Uint32Array store wraps modulo 2^32.
      this.sendx[1] = lap + 2
    }
    slot.val = val
    // Make the element available for reading. Wrap like the cursors do so
    // the stamps keep matching after 2^32.
    slot.lap = (slot.lap + 1) >>> 0
    // A value just became available: release the longest-waiting receiver.
    this.wakeOne(this.recvq)
    return true
  }

  /**
   * Attempts to dequeue a value without waiting.
   *
   * @returns the next buffered value; `null` if the channel is empty but still
   *   open; `ClosedSymbol` if it is closed and fully drained. If `T` itself
   *   admits `null`, check `empty()` first or use `recv()` to tell a `null`
   *   payload from an empty channel.
   */
  tryRecv(): T | null | (typeof ClosedSymbol) {
    if (this.empty()) {
      return this.closed ? ClosedSymbol : null
    }
    return this.take()
  }

  /**
   * Resolves when send completes. If buffer is full will wait.
   *
   * @throws Error (as a rejection) if the channel is closed before the value
   *   could be stored, including when `close()` happens while waiting.
   */
  async send(val: T): Promise<void> {
    while (!this.trySend(val)) {
      if (this.closed) {
        throw new Error("send on closed channel")
      }
      // full channel: park until a receiver frees a slot or close() runs.
      await new Promise<void>((resolve) => {
        this.sendq.push(resolve)
      })
    }
  }

  /**
   * Resolves with the next value, waiting while the channel is empty.
   *
   * @returns the value, or `ClosedSymbol` once the channel is closed and every
   *   buffered value has been received.
   */
  async recv(): Promise<T | typeof ClosedSymbol> {
    // Emptiness is tested via the slot stamp, not via the payload, so a
    // `null` payload is delivered like any other value.
    while (this.empty()) {
      if (this.closed) {
        return ClosedSymbol
      }
      // empty channel: park until a sender stores a value or close() runs.
      await new Promise<void>((resolve) => {
        this.recvq.push(resolve)
      })
    }
    return this.take()
  }

  /** Yields received values until the channel is closed and drained. */
  async *[Symbol.asyncIterator]() {
    while (true) {
      const val = await this.recv()
      if (val === ClosedSymbol) {
        return
      }
      yield val
    }
  }

  /**
   * No more writes. Buffered values stay receivable. Every parked receiver
   * and sender is released: receivers drain what is left and then observe
   * `ClosedSymbol`, senders reject.
   */
  close(): void {
    this.closed = true
    for (const wake of this.recvq.splice(0)) {
      wake()
    }
    for (const wake of this.sendq.splice(0)) {
      wake()
    }
  }

  /** Removes and returns the value under the receive cursor. Caller must have checked `!empty()`. */
  private take(): T {
    const pos = this.recvx[0]
    const lap = this.recvx[1]
    const slot = this.buf[pos]
    if (pos + 1 < this.cap) {
      this.recvx[0] = pos + 1
    } else {
      this.recvx[0] = 0
      this.recvx[1] = lap + 2
    }
    // A readable slot always holds a value written by trySend.
    const val = slot.val as T
    // Drop the reference so the buffer does not keep the value alive.
    slot.val = null
    // Make the element available for writing on the sender's next lap.
    slot.lap = (slot.lap + 1) >>> 0
    // A slot just became free: release the longest-waiting sender.
    this.wakeOne(this.sendq)
    return val
  }

  /** Releases the oldest waiter in `queue`, if there is one. */
  private wakeOne(queue: Array<() => void>): void {
    const wake = queue.shift()
    if (wake !== undefined) {
      wake()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment