Skip to content

Instantly share code, notes, and snippets.

@quelgar
Created May 23, 2022 05:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save quelgar/d6b6014eba48727062ba7d2b7ad2b328 to your computer and use it in GitHub Desktop.
Save quelgar/d6b6014eba48727062ba7d2b7ad2b328 to your computer and use it in GitHub Desktop.
A ring buffer implementation for ZIO STM
import zio.stm.*
import zio.prelude.*
/**
 * A fixed-capacity ring buffer whose operations are STM transactions.
 *
 * Elements live in a `TArray` of length `capacity`; a `State` held in a
 * `TRef` tracks the index of the oldest element (`start`), the index the
 * next `put` writes to (`end`) and the number of live elements (`size`).
 * When the buffer is full, `put` overwrites the oldest element.
 *
 * Instances are created through the factory methods on the companion object.
 */
final class TRingBuffer[A] private (array: TArray[A], stateRef: TRef[TRingBuffer.State]) {
  import TRingBuffer.*

  /** The slot following `index`, wrapping around at `capacity`. Requires `capacity > 0`. */
  private def next(index: Int): Int = (index + 1) % capacity

  /**
   * Reads the oldest element and removes it from the buffer.
   * The caller must have verified that `state.size > 0`.
   */
  private def uncheckedRead(state: State): USTM[A] =
    array(state.start) <*
      stateRef.set(state.copy(start = next(state.start), size = state.size - 1))

  /**
   * Appends `a` as the newest element. If the buffer is already full the
   * oldest element is overwritten. On a zero-capacity buffer this is a no-op,
   * since there is no slot to write to.
   */
  def put(a: A): USTM[Unit] =
    if (capacity == 0) STM.unit
    else
      stateRef.get.flatMap { current =>
        val advanced =
          if (current.size == capacity)
            // Full: the write replaces the oldest slot, so `start` moves along with `end`.
            current.copy(start = next(current.start), end = next(current.end))
          else
            current.copy(end = next(current.end), size = current.size + 1)
        array.update(current.end, _ => a) *> stateRef.set(advanced)
      }

  /** Removes and returns the oldest element, or `None` if the buffer is empty. */
  def poll: USTM[Option[A]] = stateRef.get.flatMap { current =>
    if (current.size == 0) STM.none else uncheckedRead(current).asSome
  }

  /** Removes and returns the oldest element, retrying the transaction while the buffer is empty. */
  def take: USTM[A] = stateRef.get.flatMap { current =>
    if (current.size == 0) STM.retry else uncheckedRead(current)
  }

  /** Returns the oldest element without removing it, or `None` if the buffer is empty. */
  def peekOption: USTM[Option[A]] = stateRef.get.flatMap { current =>
    if (current.size == 0) STM.none else array(current.start).asSome
  }

  /** Returns the oldest element without removing it, retrying the transaction while the buffer is empty. */
  def peek: USTM[A] = stateRef.get.flatMap { current =>
    if (current.size == 0) STM.retry else array(current.start)
  }

  /**
   * Discards the oldest element without reading it.
   *
   * @return `true` if an element was discarded, `false` if the buffer was empty
   */
  def drop: USTM[Boolean] = stateRef.get.flatMap { current =>
    if (current.size == 0) ZSTM.succeed(false)
    else
      stateRef
        .set(current.copy(start = next(current.start), size = current.size - 1))
        .as(true)
  }

  /** Maximum number of elements the buffer can hold (the length of the backing array). */
  def capacity: Int = array.size

  /** Number of elements currently in the buffer. */
  def size: USTM[Int] = stateRef.get.map(_.size)

  /**
   * Empties the buffer by resetting its state. The backing array is not
   * modified, so previously stored values stay in their slots until overwritten.
   */
  def clear: USTM[Unit] = stateRef.set(State(0, 0, 0))

  /** Whether the buffer currently holds `capacity` elements. */
  def isFull: USTM[Boolean] = size.map(_ == capacity)

  /** Whether the buffer currently holds no elements. */
  def isEmpty: USTM[Boolean] = size.map(_ == 0)

  /**
   * Returns the element `index` positions after the oldest one (0 = oldest).
   * Fails with `None` when `index` is negative or not less than the current size.
   */
  def apply(index: Int): STM[Option[Nothing], A] =
    stateRef.get.flatMap { current =>
      // Negative indices must be rejected too: they would otherwise address a
      // slot outside the live window or an invalid array index.
      if (index < 0 || index >= current.size) STM.fail(None)
      else array((current.start + index) % capacity)
    }

  /**
   * Left-folds over the live elements from oldest to newest.
   *
   * @param initial the starting accumulator
   * @param f       combines the accumulator with the next element
   */
  def fold[B](initial: B)(f: (B, A) => B): USTM[B] = stateRef.get.flatMap { currentState =>
    def help(index: Int, b: B): USTM[B] =
      if (index >= currentState.size) STM.succeed(b)
      else array((currentState.start + index) % capacity).flatMap(a => help(index + 1, f(b, a)))
    help(0, initial)
  }

  /** Sum of the live elements according to the given `Numeric` instance. */
  def sum(implicit ev: Numeric[A]): USTM[A] = fold(ev.zero)(ev.plus)
}
/** Factory methods and internal state representation for [[TRingBuffer]]. */
object TRingBuffer {

  /**
   * Bookkeeping for a ring buffer.
   *
   * @param start index of the oldest element
   * @param end   index the next `put` writes to
   * @param size  number of live elements
   */
  final private case class State(start: Int, end: Int, size: Int)

  /** Wraps `array` in a buffer whose first `size` slots, starting at index 0, are live. */
  private def wrap[A](array: TArray[A], size: Int): USTM[TRingBuffer[A]] =
    TRef.make(State(start = 0, end = 0, size = size)).map(new TRingBuffer[A](array, _))

  /**
   * Creates a full buffer containing `initial`; its capacity equals the
   * number of elements given.
   */
  def make[A](initial: A*): USTM[TRingBuffer[A]] =
    // The size is taken from the array actually built so the two cannot disagree.
    TArray.make(initial: _*).flatMap(array => wrap(array, array.size))

  /**
   * Creates a full buffer containing the elements of `initial`; its capacity
   * equals the number of elements materialised into the backing array.
   */
  def fromIterable[A](
    initial: Iterable[A]
  ): USTM[TRingBuffer[A]] =
    // Using `array.size` avoids traversing `initial` a second time.
    TArray.fromIterable(initial).flatMap(array => wrap(array, array.size))

  /**
   * Creates an empty buffer with the given capacity.
   *
   * @param capacity number of slots in the backing array
   * @param initial  placeholder value used to populate the unused slots
   */
  def empty[A](capacity: Int, initial: A): USTM[TRingBuffer[A]] =
    TArray.fromIterable(Iterable.fill(capacity)(initial)).flatMap(array => wrap(array, 0))

  /**
   * Creates an empty buffer with the given capacity, populating the unused
   * slots with the `Identity` element for `A`.
   */
  def empty[A: Identity](capacity: Int): USTM[TRingBuffer[A]] =
    empty(capacity, Identity[A].identity)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment