Skip to content

Instantly share code, notes, and snippets.

@djspiewak
Created February 13, 2022 01:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save djspiewak/0623e6a11b0e22acc0eb4e7496e5283e to your computer and use it in GitHub Desktop.
Save djspiewak/0623e6a11b0e22acc0eb4e7496e5283e to your computer and use it in GitHub Desktop.
// ported with love from http://psy-lob-saw.blogspot.com/2014/04/notes-on-concurrent-ring-buffer-queue.html
/**
 * Fixed-capacity ring-buffer queue coordinated through per-slot sequence numbers.
 *
 * Each slot `i` carries a sequence number in `sequenceBuffer`:
 *   - it starts at `i`, meaning "free for the put whose claimed tail value equals `i`";
 *   - a completed `put` that claimed tail value `t` leaves it at `t + 1`, meaning
 *     "holds the element for the take whose claimed head value equals `t`";
 *   - a completed `take` that claimed head value `h` leaves it at `h + bound`, i.e. the
 *     next tail value that maps onto the same slot.
 *
 * Slots are claimed by compare-and-set on `head` / `tail`, so neither operation blocks:
 * when the slot it needs is not in the expected state, the operation throws
 * `FailureSignal` instead of waiting.
 *
 * NOTE(review): `FailureSignal` is defined outside this class; presumably callers catch
 * it to detect "full" / "empty" — confirm against the call sites.
 *
 * @param bound
 *   capacity of the queue. Expected to be positive: index projection divides by it.
 */
private[effect] final class UnsafeBounded[A](bound: Int) {
  // Element storage; a slot is null whenever it does not hold a published element.
  private[this] val buffer = new Array[AnyRef](bound)

  // Per-slot sequence numbers (see the class comment for the protocol).
  private[this] val sequenceBuffer = new AtomicLongArray(bound)

  // Monotonically increasing claim counters; projected onto slots via `project`.
  private[this] val head = new AtomicLong(0)
  private[this] val tail = new AtomicLong(0)

  // Slot i initially accepts the put that claims tail value i.
  0.until(bound).foreach(i => sequenceBuffer.set(i, i.toLong))

  // Count of published-and-not-yet-taken elements, maintained separately from head/tail.
  private[this] val length = new AtomicInteger(0)

  /** Renders the raw backing array, including `null` for unoccupied slots. */
  def debug(): String = buffer.mkString("[", ", ", "]")

  /**
   * Current value of the element counter.
   *
   * The counter is adjusted only after a put/take has finished publishing its slot, so
   * under concurrent use it can briefly lag behind the true occupancy.
   */
  def size(): Int = length.get()

  /**
   * Stores `data` in the slot at the current tail.
   *
   * @throws FailureSignal
   *   when the tail slot's sequence number is behind the tail counter, i.e. the slot has
   *   not been released by a take (the buffer is full, or a take of that slot has not
   *   finished yet).
   */
  def put(data: A): Unit = {
    // Claims a tail value and returns it, or throws FailureSignal.
    @tailrec
    def loop(): Long = {
      val currentTail = tail.get()
      val seq = sequenceBuffer.get(project(currentTail))
      val delta = seq - currentTail

      if (delta == 0) {
        // Slot is free for exactly this tail value: try to claim it.
        if (tail.compareAndSet(currentTail, currentTail + 1))
          currentTail
        else
          loop() // lost the race to another put; re-read tail
      } else if (delta < 0) {
        // Slot still belongs to an earlier lap.
        throw FailureSignal
      } else {
        // Sequence is ahead of the tail we read, so our read of tail is stale; retry.
        loop()
      }
    }

    val currentTail = loop()

    // Write the element first, then bump the sequence (currentTail -> currentTail + 1)
    // so a take only sees the slot as ready once the element is in place.
    buffer(project(currentTail)) = data.asInstanceOf[AnyRef]
    sequenceBuffer.incrementAndGet(project(currentTail))
    length.incrementAndGet()

    ()
  }

  /**
   * Removes and returns the element in the slot at the current head.
   *
   * @throws FailureSignal
   *   when the head slot's sequence number is behind `head + 1`, i.e. no element has
   *   been published there (the buffer is empty, or a put of that slot has not finished
   *   yet).
   */
  def take(): A = {
    // Claims a head value and returns it, or throws FailureSignal.
    @tailrec
    def loop(): Long = {
      val currentHead = head.get()
      val seq = sequenceBuffer.get(project(currentHead))
      val delta = seq - (currentHead + 1)

      if (delta == 0) {
        // Slot holds the element for exactly this head value: try to claim it.
        if (head.compareAndSet(currentHead, currentHead + 1))
          currentHead
        else
          loop() // lost the race to another take; re-read head
      } else if (delta < 0) {
        // Nothing published in this slot for the current lap.
        throw FailureSignal
      } else {
        // Sequence is ahead of the head we read, so our read of head is stale; retry.
        loop()
      }
    }

    val currentHead = loop()

    val back = buffer(project(currentHead)).asInstanceOf[A]
    // Clear the slot so the queue does not keep the element reachable.
    buffer(project(currentHead)) = null
    // Release the slot for the put that will claim tail value currentHead + bound.
    sequenceBuffer.set(project(currentHead), currentHead + bound)
    length.decrementAndGet()

    back
  }

  /**
   * Maps a head/tail counter value onto a slot index in `[0, bound)`.
   *
   * Uses a floored modulus rather than `%` so that a counter which has overflowed into
   * negative values still yields a valid, non-negative index (`%` would return a
   * negative remainder and the array accesses would throw).
   *
   * Note that slot order is only continuous across the overflow point itself when
   * `bound` is a power of two; for other bounds the mapping jumps at that point.
   */
  private[this] def project(idx: Long): Int =
    java.lang.Math.floorMod(idx, bound.toLong).toInt
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment