Skip to content

Instantly share code, notes, and snippets.

@ignoramous
Last active October 31, 2021 18:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ignoramous/f6ad24214e52c4ba2f28d366810344d1 to your computer and use it in GitHub Desktop.
Save ignoramous/f6ad24214e52c4ba2f28d366810344d1 to your computer and use it in GitHub Desktop.
Adaptive LIFO ring buffer
// SPDX-License-Identifier: CC0-1.0
// constant-time ring-buffer: https://news.ycombinator.com/item?id=14106577
/**
 * Fixed-capacity request queue backed by a ring buffer.
 *
 * Empty slots hold `null`. `size` relies on that sentinel to tell a full
 * buffer from an empty one when head and tail coincide, so `fillerFn` must
 * never return `null`.
 *
 * `pop`, `rpop` and `latestEmptyTime` are getters with side effects: merely
 * reading `queue.pop` / `queue.rpop` removes an element, and reading
 * `queue.latestEmptyTime` may refresh `emptyTime`.
 */
class ReqQueue {
  /**
   * @param {number} cap - number of slots; must be a positive integer
   * @param {Function} fillerFn - called as fillerFn(queue, d) by push(); its
   *   return value is what gets stored in the buffer
   * @param {Function} serverFn - called with the request taken by fifo()/lifo()
   * @param {Function} cleanerFn - called with each request removed by
   *   evict() or drain()
   * @throws {RangeError} when cap is not a positive integer (the index
   *   arithmetic is modulo cap and would otherwise yield NaN)
   */
  constructor(cap, fillerFn, serverFn, cleanerFn) {
    if (!Number.isInteger(cap) || cap <= 0) {
      throw new RangeError("ReqQueue capacity must be a positive integer, got " + String(cap))
    }
    this.capacity = cap
    // a ring buffer; null marks an empty slot
    this.rb = new Array(this.capacity)
    this.rb.fill(null)
    // fillIdx points to the current empty accept slot
    this.fillIdx = this.cur(0)
    // cleanIdx points to the yet-to-be cleaned slot
    // always trails fillIdx
    this.cleanIdx = this.cur(0)
    // filler creates a req
    this.filler = fillerFn
    // server accepts a req
    this.server = serverFn
    // cleaner cleans-up a req
    this.cleaner = cleanerFn
    // notes req buffer last empty time
    this.emptyTime = Date.now()
  }

  /**
   * Swaps in a fresh, empty buffer and resets both indices and emptyTime.
   * @returns {Array} the previous buffer, including its null (empty) slots
   */
  rewind() {
    const previous = this.rb
    this.rb = new Array(this.capacity)
    this.rb.fill(null)
    this.fillIdx = this.cur(0)
    this.cleanIdx = this.cur(0)
    this.emptyTime = Date.now()
    return previous
  }

  /** @returns {number} index following i, wrapping at capacity */
  next(i) {
    return (this.capacity + i + 1) % this.capacity
  }

  /** @returns {number} i normalised into the range of valid slot indices */
  cur(i) {
    return (this.capacity + i) % this.capacity
  }

  /** @returns {number} index preceding i, wrapping at capacity */
  prev(i) {
    return (this.capacity + i - 1) % this.capacity
  }

  /** Index of the slot the next push() writes to. */
  get head() {
    return this.cur(this.fillIdx)
  }

  /** Index of the slot the next pop reads from. */
  get tail() {
    return this.cur(this.cleanIdx)
  }

  incrTail() {
    this.cleanIdx = this.next(this.cleanIdx)
  }

  incrHead() {
    this.fillIdx = this.next(this.fillIdx)
  }

  decrHead() {
    this.fillIdx = this.prev(this.fillIdx)
  }

  get full() {
    return this.size === this.capacity
  }

  get empty() {
    return this.size === 0
  }

  /**
   * Refreshes emptyTime to the current time if the queue is empty right now,
   * then returns emptyTime.
   * @returns {number} timestamp as produced by Date.now()
   */
  get latestEmptyTime() {
    if (this.empty) {
      this.emptyTime = Date.now()
    }
    return this.emptyTime
  }

  /** @returns {number} count of occupied slots, between 0 and capacity */
  get size() {
    const h = this.head // current empty accept slot
    const t = this.tail // current cleaned slot
    const c = this.capacity // full
    const z = 0 // empty
    if (h === t) {
      // if h and t overlap, and the current slot is
      // not empty (null), then rb is full, as h has
      // essentially wrapped around and caught up with
      // t, which hasn't cleaned up anything yet. but:
      // if the current slot is empty (null), then t
      // has cleaned up everything and caught up with h
      // making rb empty
      return (this.rb[h] === null) ? z : c
    } else if (h > t) {
      return h - t
    } else {
      // head has wrapped around past the end of the buffer
      return c + h - t
    }
  }

  /**
   * Getter with side effects: removes and returns the oldest element (at
   * tail). When the queue is empty it records emptyTime and returns null.
   */
  get pop() {
    if (this.empty) {
      this.emptyTime = Date.now()
      return null
    }
    const out = this.rb[this.tail]
    this.rb[this.tail] = null
    this.incrTail()
    return out
  }

  /**
   * Getter with side effects: removes and returns the newest element (the
   * one just before head). When the queue is empty it records emptyTime and
   * returns null.
   */
  get rpop() {
    if (this.empty) {
      this.emptyTime = Date.now()
      return null
    }
    // head points at the next free slot, so step back onto the last filled one
    this.decrHead()
    const out = this.rb[this.head]
    this.rb[this.head] = null
    return out
  }

  /** Removes the oldest element, if any, and hands it to the cleaner. */
  evict() {
    logd("evict, head/tail/size", this.head, this.tail, this.size)
    const out = this.pop
    if (out === null) return
    this.cleaner(out)
  }

  /** Removes the oldest element, if any, and hands it to the server. */
  fifo() {
    const first = this.pop
    if (first === null) return
    logd("fifo, head/tail/size", this.head, this.tail, this.size)
    this.server(first)
  }

  /** Removes the newest element, if any, and hands it to the server. */
  lifo() {
    const last = this.rpop
    if (last === null) return
    logd("lifo, head/tail/size", this.head, this.tail, this.size)
    this.server(last)
  }

  /**
   * Stores filler(this, d) at head. If the buffer is full, the oldest
   * element is evicted (and cleaned) first to make room.
   */
  push(d) {
    if (this.full) {
      this.evict()
    }
    this.rb[this.head] = this.filler(this, d)
    this.incrHead()
  }

  /** True when more than 75% of the slots are occupied. */
  get highload() {
    const cap75 = this.capacity * 0.75
    return this.size > cap75
  }

  /** Serves one element: newest-first under high load, oldest-first otherwise. */
  poll() {
    // adaptive lifo: https://queue.acm.org/detail.cfm?id=2839461
    if (this.highload) {
      return this.lifo()
    } else {
      return this.fifo()
    }
  }

  /** Empties the queue, passing every truthy stored element to the cleaner. */
  drain() {
    for (const r of this.rewind()) {
      if (r) this.cleaner(r)
    }
  }
}
/**
 * Builds the request record stored in the queue for a connection.
 *
 * The request gets a deadline (`ttl`) of now + upperMs, unless the queue's
 * latestEmptyTime lies more than upperMs in the past, in which case the
 * shorter lowerMs is used instead.
 *
 * @param {object} queue - anything exposing a `latestEmptyTime` timestamp
 * @param {*} conn - the connection to wrap
 * @returns {{conn: *, ttl: number}} the shaped request
 */
function shape(queue, conn) {
  // controlled delay: https://queue.acm.org/detail.cfm?id=2839461
  const startedAt = Date.now()
  const staleBefore = startedAt - upperMs
  const ttlMs = queue.latestEmptyTime < staleBefore ? lowerMs : upperMs
  const shaped = { conn: conn, ttl: startedAt + ttlMs }
  logd("shaping", shaped, "timeout", ttlMs)
  return shaped
}
/**
 * Accepts a connection. In this file it is a stub: the connection is only
 * passed to logd, and the actual serving is left as a placeholder.
 *
 * @param {*} conn - the connection being accepted
 */
function accept(conn) {
  // serve conn (placeholder, nothing beyond logging happens here)
  const event = "accepting"
  logd(event, conn)
}
/**
 * Dispatches a queued request: requests whose `ttl` deadline has already
 * passed are handed to close(), all others have their connection accepted.
 *
 * @param {{conn: *, ttl: number}} req - the request to dispatch
 * @returns {*} whatever close(req) or accept(req.conn) returns
 */
function serve(req) {
  const expired = Date.now() > req.ttl
  return expired ? close(req) : accept(req.conn)
}
/**
 * Closes a request. In this file it is a stub: the request is only passed
 * to logd, and closing the underlying connection is left as a placeholder.
 *
 * @param {*} req - the request being closed
 */
function close(req) {
  // close req.conn (placeholder, nothing beyond logging happens here)
  const event = "closing"
  logd(event, req)
}
/**
 * Debug logger: forwards all of its arguments to console.debug, but only
 * while the module-level `debug` flag is truthy.
 *
 * @param {...*} args - values to log
 */
function logd(...args) {
  if (debug) console.debug(...args)
}
// Toggles diagnostic output: logd() only forwards to console.debug while this is truthy.
let debug = false
// Shorter request timeout (ms); shape() picks it when the queue's
// latestEmptyTime lies more than upperMs in the past.
const lowerMs = 500
// Default request timeout (ms); also the look-back window that shape()
// compares the queue's latestEmptyTime against.
const upperMs = 5000
/**
 * Smoke test: pushes the numbers 1..999 through a 100-slot ReqQueue wired
 * to shape/serve/close, polling after every push except each fifth one,
 * then drains the queue. State is reported through logd before and after
 * the drain.
 */
function naivetest() {
  const capacity = 100
  const queue = new ReqQueue(capacity, shape, serve, close)
  let n = 1
  while (n < 1000) {
    queue.push(n)
    // every 5th request is *not* polled
    const skipPoll = n % 5 === 0
    if (!skipPoll) queue.poll()
    n += 1
  }
  logd("head/tail/size/load", queue.head, queue.tail, queue.size, queue.highload)
  queue.drain()
  logd("fin head/tail/size/load", queue.head, queue.tail, queue.size, queue.highload)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment