Created
February 13, 2022 01:10
-
-
Save djspiewak/0623e6a11b0e22acc0eb4e7496e5283e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// ported with love from http://psy-lob-saw.blogspot.com/2014/04/notes-on-concurrent-ring-buffer-queue.html
/**
 * Fixed-capacity, array-backed ring-buffer queue coordinated through atomic counters.
 *
 * Every slot of `buffer` has a companion entry in `sequenceBuffer`, initialised to the
 * slot's own index. `tail` and `head` hand out monotonically increasing tickets to
 * producers and consumers respectively (claimed with `compareAndSet`):
 *
 *  - a producer holding ticket `t` may write its slot when the slot's sequence equals `t`;
 *    after writing it bumps the sequence to `t + 1`;
 *  - a consumer holding ticket `h` may read its slot when the slot's sequence equals
 *    `h + 1`; after reading it sets the sequence to `h + bound`, which is the ticket of
 *    the producer that will reuse the slot on the next lap.
 *
 * Neither `put` nor `take` blocks: when the slot they need is not in the expected state
 * they throw `FailureSignal`.
 *
 * NOTE(review): `bound` is presumably expected to be positive. A negative value fails in
 * the array constructors and zero makes `project` divide by zero -- confirm that callers
 * never pass such values.
 *
 * @param bound number of slots in the ring
 * @tparam A element type; elements are stored as `AnyRef` and cast back on `take`
 */
private[effect] final class UnsafeBounded[A](bound: Int) {
  private[this] val buffer = new Array[AnyRef](bound)

  private[this] val sequenceBuffer = new AtomicLongArray(bound)
  private[this] val head = new AtomicLong(0)
  private[this] val tail = new AtomicLong(0)

  // slot i is initially writable by the producer holding ticket i
  0.until(bound).foreach(i => sequenceBuffer.set(i, i.toLong))

  // element counter maintained alongside the ring; updated after each completed put/take
  private[this] val length = new AtomicInteger(0)

  /** Renders the raw contents of the backing array (vacated slots show up as `null`). */
  def debug(): String = buffer.mkString("[", ", ", "]")

  /** Current value of the element counter. */
  def size(): Int = length.get()

  /**
   * Appends `data` to the queue.
   *
   * @param data the element to enqueue
   * @throws FailureSignal when the slot at the tail has not yet been released for the
   *                       current lap (its sequence is behind the tail ticket)
   */
  def put(data: A): Unit = {
    // Claims a tail ticket and returns it.
    @tailrec
    def loop(): Long = {
      val currentTail = tail.get()
      val seq = sequenceBuffer.get(project(currentTail))
      val delta = seq - currentTail

      if (delta == 0) {
        // slot is free for this ticket; race other producers for it
        if (tail.compareAndSet(currentTail, currentTail + 1))
          currentTail
        else
          loop()
      } else if (delta < 0) {
        // slot still belongs to the previous lap: no room
        throw FailureSignal
      } else {
        // sequence is ahead of the tail we read, so that read is stale; retry
        loop()
      }
    }

    val currentTail = loop()
    val slot = project(currentTail)

    buffer(slot) = data.asInstanceOf[AnyRef]
    // publish only after the element is stored: sequence becomes currentTail + 1
    sequenceBuffer.incrementAndGet(slot)
    length.incrementAndGet()

    ()
  }

  /**
   * Removes and returns the element at the head of the queue.
   *
   * @return the dequeued element
   * @throws FailureSignal when the slot at the head has not been published yet (its
   *                       sequence is behind `head + 1`)
   */
  def take(): A = {
    // Claims a head ticket and returns it.
    @tailrec
    def loop(): Long = {
      val currentHead = head.get()
      val seq = sequenceBuffer.get(project(currentHead))
      val delta = seq - (currentHead + 1)

      if (delta == 0) {
        // slot has been published for this ticket; race other consumers for it
        if (head.compareAndSet(currentHead, currentHead + 1))
          currentHead
        else
          loop()
      } else if (delta < 0) {
        // nothing published in this slot yet
        throw FailureSignal
      } else {
        // sequence is ahead of the head we read, so that read is stale; retry
        loop()
      }
    }

    val currentHead = loop()
    val slot = project(currentHead)

    val back = buffer(slot).asInstanceOf[A]
    buffer(slot) = null // drop the reference so the queue does not retain the element
    // release the slot to the producer whose ticket is currentHead + bound
    sequenceBuffer.set(slot, currentHead + bound)
    length.decrementAndGet()

    back
  }

  /**
   * Maps a ticket to a slot index in `[0, bound)`.
   *
   * Uses `floorMod` rather than `%` so that a negative ticket (which `head`/`tail` produce
   * once they overflow `Long.MaxValue`) still yields a valid, non-negative array index.
   *
   * NOTE(review): the slot order is only continuous across that overflow when `bound` is
   * a power of two; for other bounds the sequence protocol would need extra handling.
   */
  private[this] def project(idx: Long): Int =
    java.lang.Math.floorMod(idx, bound.toLong).toInt
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment