Skip to content

Instantly share code, notes, and snippets.

@nomisRev
Created July 6, 2020 16:19
Show Gist options
  • Save nomisRev/294e14eb213dfe5aadb54a4a8c0a578a to your computer and use it in GitHub Desktop.
Save nomisRev/294e14eb213dfe5aadb54a4a8c0a578a to your computer and use it in GitHub Desktop.
Suspended Queue
package arrow.fx.coroutines
import arrow.fx.coroutines.Queue.BackpressureStrategy
import arrow.fx.coroutines.Queue.BackpressureStrategy.*
/**
* [Dequeue] allows peeking and taking values from a [Queue], but doesn't allow offering values to the [Queue].
* You can use [Dequeue] to restrict certain functions or layers of your applications to only consume values.
*
* NOTE(review): the sample below is still written against the `IO`-wrapped API (`Dequeue<ForIO, Int>`, `IO.fx`),
* whereas this interface takes a single type parameter and exposes `suspend` functions. It needs to be ported.
*
* ```kotlin:ank:playground
* import arrow.fx.*
* import arrow.fx.typeclasses.*
* import arrow.fx.extensions.fx
* import kotlin.coroutines.EmptyCoroutineContext
*
* //sampleStart
* suspend fun main(args: Array<String>): Unit = IO.fx {
* fun consumeInts(e: Dequeue<ForIO, Int>, max: Int): IOOf<Unit> =
* (0..max).toList().parTraverse(EmptyCoroutineContext) { i ->
* IO.sleep(i * 10.milliseconds).followedBy(
* e.take().effectMap { println("I took $it") }
* )
* }.void()
*
* val queue = !Queue.unbounded<Int>()
* !consumeInts(queue, 1000).fork()
* !IO.sleep(4.seconds)
* }.suspended()
* //sampleEnd
* ```
*
* @see Queue in the case your functions or layers are allowed to take and offer.
* @see Enqueue in the case your functions or layers are only allowed to offer values.
* */
interface Dequeue<A> {
/**
* Takes and removes a value from the [Queue], or semantically blocks until a value becomes available.
*
* NOTE(review): this sample relies on the older `IO` API (`fork`, `!` binding) and needs to be ported.
*
* ```kotlin:ank
* import arrow.fx.*
* import arrow.fx.extensions.fx
*
* //sampleStart
* suspend fun main(args: Array<String>): Unit = IO.fx {
* val queue = !Queue.unbounded<Int>()
* val (join, _) = !queue.take().fork()
* !queue.offer(1) // Removing this offer makes, !join block forever.
* val res = !join // Join the blocking take, after we offered a value
* !effect { println(res) }
* }.suspended()
* //sampleEnd
* ```
*
* @see [peek] for a function that doesn't remove the value from the [Queue].
* @see [tryTake] for a function that does not semantically block but returns immediately with a nullable value.
*/
suspend fun take(): A
/**
* Attempts to take a value from the [Queue] if one is available, this method is guaranteed not to semantically block.
* It returns immediately with either the taken value, or `null` when no value is available.
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.*
*
* //sampleStart
* suspend fun main() {
*   val queue = Queue.unbounded<Int>()
*   val none = queue.tryTake()
*   queue.offer(1)
*   val one = queue.tryTake()
*   val none2 = queue.tryTake()
*   println("none: $none, one $one, none2: $none2")
* }
* //sampleEnd
* ```
*
* @see [take] for function that semantically blocks until a value becomes available.
* @see [tryPeek] for a function that attempts to peek a value from the [Queue] without removing it.
*/
suspend fun tryTake(): A?
/**
* Peeks a value from the [Queue] or semantically blocks until a value becomes available.
* In contrast to [take], [peek] does not remove the value from the [Queue].
*
* NOTE(review): this sample relies on the older `IO` API (`fork`, `!` binding) and needs to be ported.
*
* ```kotlin:ank
* import arrow.fx.*
* import arrow.fx.extensions.fx
*
* //sampleStart
* suspend fun main(args: Array<String>): Unit = IO.fx {
* val queue = !Queue.unbounded<Int>()
* val (join, _) = !queue.peek().fork()
* !queue.offer(1) // Removing this offer makes, !join block forever.
* val res = !join // Join the blocking peek, after we offered a value
* val res2 = !queue.peek() // We can peek again since it doesn't remove the value
* !effect { println("res: $res, res2: $res2") }
* }.suspended()
* //sampleEnd
* ```
*
* @see [take] for function that semantically blocks until a value becomes available and removes it from the [Queue].
* @see [tryPeek] for a function that does not semantically block but returns immediately with a nullable value.
*/
suspend fun peek(): A
/**
* Tries to peek a value from the [Queue]. Returns immediately with either the value, or `null` when none is available.
* In contrast to [tryTake], [tryPeek] does not remove the value from the [Queue].
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.*
*
* //sampleStart
* suspend fun main() {
*   val queue = Queue.unbounded<Int>()
*   val none = queue.tryPeek()
*   queue.offer(1)
*   val one = queue.tryPeek()
*   val one2 = queue.tryPeek()
*   println("none: $none, one $one, one2: $one2")
* }
* //sampleEnd
* ```
*
* @see [peek] for a function that semantically blocks until a value becomes available.
* @see [tryTake] for a function that attempts to take a value from the [Queue] while removing it.
*/
suspend fun tryPeek(): A?
/**
* Immediately returns all available values in the [Queue], and empties the [Queue].
* It returns an [emptyList] when no values are available.
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.*
*
* //sampleStart
* suspend fun main() {
*   val queue = Queue.unbounded<Int>()
*   queue.offerAll(1, 2, 3, 4)
*   val values = queue.takeAll()
*   val empty = queue.takeAll()
*   println("values: $values, empty: $empty")
* }
* //sampleEnd
* ```
*
* For a [BackpressureStrategy.Bounded], this also includes all blocking offers that are waiting to be added in the [Queue].
*
* @see [peekAll] for a function that doesn't remove the values from the [Queue].
*/
suspend fun takeAll(): List<A>
/**
* Immediately returns all available values in the [Queue], without emptying the [Queue].
* It returns an [emptyList] when no values are available.
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.*
*
* //sampleStart
* suspend fun main() {
*   val queue = Queue.unbounded<Int>()
*   queue.offerAll(1, 2, 3, 4)
*   val values = queue.peekAll()
*   val values2 = queue.peekAll()
*   println("values: $values, values2: $values2")
* }
* //sampleEnd
* ```
*
* For a [BackpressureStrategy.Bounded], this also includes all blocking offers that are waiting to be added in the [Queue].
*
* @see [takeAll] for a function that also removes all values from the [Queue].
*/
suspend fun peekAll(): List<A>
}
/**
 * [Enqueue] is the write-only side of a [Queue]: values can be offered through it, but never taken.
 * Hand out an [Enqueue] to the functions or layers of an application that should only produce values.
 *
 * ```kotlin:ank:playground
 * import arrow.fx.coroutines.*
 *
 * //sampleStart
 * suspend fun produceInts(e: Enqueue<Int>, max: Int): Unit =
 *   e.offerAll(0..max)
 *
 * suspend fun main() {
 *   val queue = Queue.unbounded<Int>()
 *   produceInts(queue, 1000)
 *   val res = queue.takeAll()
 *   println(res)
 * }
 * //sampleEnd
 * ```
 *
 * @see Queue in the case your functions or layers are allowed to take and offer.
 * @see Dequeue in the case your functions or layers are only allowed to peek or take values.
 */
interface Enqueue<A> {

  /**
   * Offers a value to the [Queue]. What happens when there is no room depends on the
   * [Queue.BackpressureStrategy]:
   *
   * - Semantically blocks until room is available in the [Queue], check [Queue.bounded] for an example.
   *
   * - Returns immediately and slides values through the [Queue], check [Queue.sliding] for an example.
   *
   * - Returns immediately and drops values from the [Queue], check [Queue.dropping] for an example.
   *
   * - Returns immediately and always offers to the [Queue], check [Queue.unbounded] for an example.
   *
   * @see [tryOffer] for a variant that always returns immediately, and returns `true` if the value was
   * successfully put into the [Queue].
   */
  suspend fun offer(a: A): Unit

  /**
   * Tries to offer a value to the [Queue] without applying the [Queue.BackpressureStrategy]:
   * it returns `false` when the strategy has no room for the value.
   *
   * Use [tryOffer] if you want to return immediately, without blocking and without losing a value.
   */
  suspend fun tryOffer(a: A): Boolean

  /** Batch counterpart of [tryOffer] for every value in [a]. */
  suspend fun tryOfferAll(a: Iterable<A>): Boolean

  /** Vararg convenience: copies the arguments into a list and delegates to the [Iterable] overload. */
  suspend fun tryOfferAll(vararg a: A): Boolean {
    val values = a.toList()
    return tryOfferAll(values)
  }

  /** Batch counterpart of [offer] for every value in [a]. */
  suspend fun offerAll(a: Iterable<A>): Unit

  /** Vararg convenience: copies the arguments into a list and delegates to the [Iterable] overload. */
  suspend fun offerAll(vararg a: A): Unit {
    val values = a.toList()
    offerAll(values)
  }
}
/**
 * Lightweight, suspending [Queue] for values of [A].
 *
 * A [Queue] can be created with 4 different back-pressure strategies:
 *
 * - [bounded]: Offering to a bounded queue at capacity will cause the caller
 * to be suspended until the queue has space to receive the offered value
 *
 * - [dropping]: Offering to a dropping queue at capacity will cause the offered
 * value to be discarded
 *
 * - [sliding]: Offering to a sliding queue at capacity will cause the value at the
 * front of the queue to be discarded to make room for the offered value
 *
 * - [unbounded]: An unbounded queue has no notion of capacity and is bound only by
 * exhausting the memory limits of the runtime
 */
interface Queue<A> : Dequeue<A>, Enqueue<A> {

  /**
   * Immediately returns the current size of values in the [Queue].
   * Can be a negative number when there are takers but no values available.
   */
  suspend fun size(): Int

  companion object {

    /** Validates that [capacity] is strictly positive and hands it back unchanged. */
    private fun ensureCapacity(capacity: Int): Int {
      require(capacity > 0) { "Queue must have a capacity greater than 0" }
      return capacity
    }

    /**
     * Create a [Queue] with [BackpressureStrategy.Bounded].
     *
     * Offering to a bounded queue at capacity will cause the caller
     * to be suspended until the queue has space to receive the offered value.
     *
     * ```kotlin:ank:playground
     * import arrow.fx.coroutines.*
     *
     * suspend fun main() {
     *   val capacity = 2
     *   val q = Queue.bounded<Int>(capacity)
     *   q.offer(42)
     *   q.offer(43)
     *   val accepted = q.tryOffer(44) // <-- At capacity: `tryOffer` returns false, a plain `offer` would suspend here
     *   val fortyTwo = q.take()
     *   val fortyThree = q.take()
     *   println("accepted: $accepted, taken: ${listOf(fortyTwo, fortyThree)}")
     * }
     * ```
     *
     * @throws IllegalArgumentException when [capacity] is not greater than 0.
     */
    suspend fun <A> bounded(capacity: Int): Queue<A> {
      val strategy = Bounded(ensureCapacity(capacity))
      return SuspendingQueue<A>(strategy, SuspendingQueue.State.empty())
    }

    /**
     * Create a [Queue] with [BackpressureStrategy.Sliding].
     *
     * Offering to a sliding queue at capacity will cause the value at the
     * front of the queue to be discarded to make room for the offered value.
     *
     * ```kotlin:ank:playground
     * import arrow.fx.coroutines.*
     *
     * suspend fun main() {
     *   val capacity = 2
     *   val q = Queue.sliding<Int>(capacity)
     *   q.offer(42)
     *   q.offer(43)
     *   q.offer(44) // <-- This `offer` exceeds the capacity, causing the oldest value to be removed
     *   val fortyThree = q.take()
     *   val fortyFour = q.take()
     *   q.offer(45)
     *   val fortyFive = q.take()
     *   println(listOf(fortyThree, fortyFour, fortyFive))
     * }
     * ```
     *
     * @throws IllegalArgumentException when [capacity] is not greater than 0.
     */
    fun <A> sliding(capacity: Int): Queue<A> {
      val strategy = Sliding(ensureCapacity(capacity))
      return SuspendingQueue<A>(strategy, SuspendingQueue.State.empty())
    }

    /**
     * Create a [Queue] with [BackpressureStrategy.Dropping].
     *
     * Offering to a dropping queue at capacity will cause the offered value to be discarded.
     *
     * ```kotlin:ank:playground
     * import arrow.fx.coroutines.*
     *
     * suspend fun main() {
     *   val capacity = 2
     *   val q = Queue.dropping<Int>(capacity)
     *   q.offer(42)
     *   q.offer(43)
     *   q.offer(44) // <-- This `offer` exceeds the capacity and will be dropped immediately
     *   val fortyTwo = q.take()
     *   val fortyThree = q.take()
     *   q.offer(45)
     *   val fortyFive = q.take()
     *   println(listOf(fortyTwo, fortyThree, fortyFive))
     * }
     * ```
     *
     * @throws IllegalArgumentException when [capacity] is not greater than 0.
     */
    suspend fun <A> dropping(capacity: Int): Queue<A> {
      val strategy = Dropping(ensureCapacity(capacity))
      return SuspendingQueue<A>(strategy, SuspendingQueue.State.empty())
    }

    /**
     * Create a [Queue] with [BackpressureStrategy.Unbounded].
     *
     * An unbounded queue has no notion of capacity and is bound only by exhausting the memory limits of the runtime.
     *
     * ```kotlin:ank:playground
     * import arrow.fx.coroutines.*
     *
     * suspend fun main() {
     *   val q = Queue.unbounded<Int>()
     *   q.offer(42)
     *   // ...
     *   q.offer(42000000)
     *   val res = q.take()
     *   println(res)
     * }
     * ```
     */
    suspend fun <A> unbounded(): Queue<A> {
      return SuspendingQueue<A>(Unbounded, SuspendingQueue.State.empty())
    }
  }

  /** Internal model that represents the back-pressure strategies a [Queue] can be created with. */
  sealed class BackpressureStrategy {
    data class Bounded(val capacity: Int) : BackpressureStrategy()
    data class Sliding(val capacity: Int) : BackpressureStrategy()
    data class Dropping(val capacity: Int) : BackpressureStrategy()
    object Unbounded : BackpressureStrategy()
  }
}
/** Callback shape used for pending takers, peekers and offers: a function receiving the [Result] of the operation. */
private typealias Callback<A> = (Result<A>) -> Unit
internal class SuspendingQueue<A> internal constructor(private val strategy: BackpressureStrategy, initial: State<A>?) :
Queue<A> {
private val state: AtomicRefW<State<A>> = AtomicRefW(initial ?: State.empty())
/**
 * Reads the current state once and derives the size from it:
 *
 * - in [State.Deficit] the result is the negated number of registered takers (so `0` when nobody waits),
 * - in [State.Surplus] the result is the number of stored values plus the number of offers still waiting for room.
 *
 * The Surplus size used to be hard-coded as `1 + offers`, which is only right when exactly one value is stored.
 */
override suspend fun size(): Int =
  when (val curr = state.value) {
    is State.Deficit -> -curr.takes.size
    is State.Surplus -> curr.values.length() + curr.offers.size
  }
/** Delegates to [unsafeTryOffer] with its default `tryStrategy`, so the caller never suspends waiting for room. */
override suspend fun tryOffer(a: A): Boolean {
  return unsafeTryOffer(a)
}
/**
 * First attempts a direct offer with `tryStrategy = false`. Only when that reports `false`
 * does the call suspend, registering the value as a pending offer through [unsafeOffer].
 */
override suspend fun offer(a: A): Unit =
  when {
    unsafeTryOffer(a, false) -> Unit
    else -> cancellableF { cb -> unsafeOffer(a, cb) }
  }
/**
 * First attempts to offer the whole batch directly with `tryStrategy = false`. Only when that reports
 * `false` does the call suspend, registering the values as pending offers through [unsafeOfferAll].
 */
override suspend fun offerAll(a: Iterable<A>): Unit =
  when {
    unsafeTryOfferAll(a, false) -> Unit
    else -> cancellableF { cb -> unsafeOfferAll(a, cb) }
  }
/** Delegates to [unsafeTryOfferAll] with its default `tryStrategy`, so the caller never suspends waiting for room. */
override suspend fun tryOfferAll(a: Iterable<A>): Boolean {
  return unsafeTryOfferAll(a)
}
/** Takes a value when one is available and returns it, otherwise returns `null` without suspending. */
override suspend fun tryTake(): A? {
  val taken = unsafeTryTake()
  return if (taken is Maybe.Just) taken.value else null
}
/** Takes a value when one is available; otherwise suspends and registers a taker through [unsafeTake]. */
override suspend fun take(): A {
  val taken = unsafeTryTake()
  return if (taken is Maybe.Just) taken.value else cancellableF(::unsafeTake)
}
/** Delegates to [unsafeTakeAll]. */
override suspend fun takeAll(): List<A> {
  return unsafeTakeAll()
}
/** Returns the value found by [unsafeTryPeek] when there is one, otherwise `null`; never suspends. */
override suspend fun tryPeek(): A? {
  val peeked = unsafeTryPeek()
  return if (peeked is Maybe.Just) peeked.value else null
}
/** Returns the value found by [unsafeTryPeek] when there is one; otherwise suspends and registers a peeker through [unsafePeek]. */
override suspend fun peek(): A {
  val peeked = unsafeTryPeek()
  return if (peeked is Maybe.Just) peeked.value else cancellableF(::unsafePeek)
}
/** Delegates to [unsafePeekAll]. */
override suspend fun peekAll(): List<A> {
  return unsafePeekAll()
}
/**
 * Tries to place a single value without registering a pending offer.
 *
 * - In [State.Surplus] the decision is left to [unsafeOfferAllSurplusForStrategy]; a `null` answer means
 *   the compare-and-set lost a race and the attempt is repeated.
 * - In [State.Deficit] the value goes to the first registered taker (if any) and to every registered peeker;
 *   without a taker it becomes the single stored value of a new [State.Surplus].
 *
 * @param tryStrategy forwarded to [unsafeOfferAllSurplusForStrategy].
 * @return `true` once the value was stored or handed over, `false` when the strategy reported no room.
 */
private tailrec suspend fun unsafeTryOffer(a: A, tryStrategy: Boolean = true): Boolean =
  when (val snapshot = state.value) {
    is State.Deficit -> {
      val firstTaker: Callback<A>? = snapshot.takes.values.firstOrNull()
      val next: State<A> =
        if (firstTaker != null) {
          // A taker consumes the value: remove it from the state. Peekers are cleared as well,
          // because all of them are notified with this value below.
          val remainingTakers = snapshot.takes.entries.drop(1)
          if (remainingTakers.isEmpty()) snapshot.copy(emptyMap(), emptyMap())
          else State.Deficit(emptyMap(), remainingTakers.toMap())
        } else {
          // Nobody consumes the value, so it becomes the only stored value.
          State.Surplus(IQueue(a), emptyMap())
        }
      when {
        // Lost the race against another update: retry against the fresh state.
        !state.compareAndSet(snapshot, next) -> unsafeTryOffer(a, tryStrategy)
        // Update succeeded: hand the value to the taker and/or the peekers.
        firstTaker != null || snapshot.peeks.isNotEmpty() -> callTakeAndPeeks(a, firstTaker, snapshot.peeks.values)
        // Update succeeded and nobody was waiting.
        else -> true
      }
    }
    is State.Surplus ->
      unsafeOfferAllSurplusForStrategy(listOf(a), tryStrategy, snapshot) ?: unsafeTryOffer(a, tryStrategy)
  }
/**
 * Tries to place a batch of values without registering pending offers.
 *
 * - An empty batch is a no-op that reports success. Without this guard, an empty batch offered to a
 *   [State.Deficit] holding peekers replaced the state with the empty state and never called those
 *   peekers, leaving them suspended forever.
 * - In [State.Surplus] the decision is left to [unsafeOfferAllSurplusForStrategy].
 * - In [State.Deficit] with more values than takers, [unsafeOfferAllDecifitForStrategy] decides what
 *   happens to the values left after the takers are served.
 * - In [State.Deficit] with at most as many values as takers, the first takers are served and removed,
 *   and all peekers are served with the first value and removed.
 *
 * A `null` answer from a strategy helper, or a failed compare-and-set, repeats the attempt.
 *
 * @param tryStrategy forwarded to the strategy helpers.
 * @return `true` once every value was stored or handed over, `false` when the strategy reported no room.
 */
private tailrec suspend fun unsafeTryOfferAll(aas: Iterable<A>, tryStrategy: Boolean = true): Boolean {
  // Offering nothing must not touch the state (see KDoc).
  if (aas.none()) return true
  // Counted once per attempt; `count()` may be linear for a non-Collection Iterable.
  val offered = aas.count()
  return when (val current = state.value) {
    is State.Surplus ->
      unsafeOfferAllSurplusForStrategy(aas, tryStrategy, current) ?: unsafeTryOfferAll(aas, tryStrategy)
    is State.Deficit -> {
      val waiting = current.takes.size
      if (offered > waiting) {
        unsafeOfferAllDecifitForStrategy(aas, tryStrategy, current) ?: unsafeTryOfferAll(aas, tryStrategy)
      } else {
        val update =
          if (offered == waiting) State.empty<A>()
          else current.copy(peeks = emptyMap(), takes = current.takes.entries.drop(offered).toMap())
        // 1 <= offered <= waiting here, so at least one taker is served on success.
        if (state.compareAndSet(current, update)) {
          streamAllPeeksAndTakers(aas, current.peeks.values, current.takes.values)
        } else {
          unsafeTryOfferAll(aas, tryStrategy) // Update failed, recurse
        }
      }
    }
  }
}
/**
 * Registers a single value as a pending offer, or delivers it right away when the state allows.
 *
 * - In [State.Surplus] the value and [onPut] are stored under a fresh [Token]; the returned token
 *   removes that entry again through [unsafeCancelOffer].
 * - In [State.Deficit] the value goes to the first taker and all peekers (or becomes the single stored
 *   value when nobody takes), [onPut] is completed immediately and there is nothing to cancel.
 *
 * A failed compare-and-set repeats the attempt against the fresh state.
 */
private tailrec suspend fun unsafeOffer(a: A, onPut: Callback<Unit>): CancelToken =
  when (val current = state.value) {
    is State.Surplus -> {
      val token = Token()
      val pending = current.offers + Pair(token, Pair(a, onPut))
      if (!state.compareAndSet(current, State.Surplus(current.values, pending))) unsafeOffer(a, onPut)
      else CancelToken { unsafeCancelOffer(listOf(token)) }
    }
    is State.Deficit -> {
      val firstTaker = current.takes.values.firstOrNull()
      val next: State<A> =
        if (current.takes.isEmpty()) {
          State.Surplus(IQueue(a), emptyMap())
        } else {
          val remainingTakers = current.takes.entries.drop(1)
          if (remainingTakers.isEmpty()) current.copy(emptyMap(), emptyMap())
          else State.Deficit(emptyMap(), remainingTakers.toMap())
        }
      if (!state.compareAndSet(current, next)) unsafeOffer(a, onPut)
      else {
        // Deliver to whoever was waiting first, then complete the offer itself.
        if (firstTaker != null || current.peeks.isNotEmpty()) {
          callTakeAndPeeks(a, firstTaker, current.peeks.values)
        }
        onPut(Result.success(Unit))
        CancelToken.unit
      }
    }
  }
/**
 * Registers a batch of values as pending offers, or delivers them right away when the state allows.
 * [onPut] is completed once the last value of the batch has been accepted.
 *
 * Fixes compared to the previous version:
 * - The last value was scheduled twice (once with a no-op callback, once with [onPut]); every value is
 *   now scheduled exactly once and only the last one carries [onPut].
 * - In the overflow branch peekers were only served when a taker was waiting too; they are now always
 *   served with the first value, like in the non-overflow branch.
 * - When the state turned into a [State.Deficit] with at least as many takers as values, the surplus
 *   takers were dropped and an empty [State.Surplus] was created; the remaining takers are now kept.
 * - An empty batch completes immediately instead of failing on `last()`.
 *
 * A failed compare-and-set repeats the attempt against the fresh state.
 */
private tailrec suspend fun unsafeOfferAll(a: Iterable<A>, onPut: Callback<Unit>): CancelToken {
  if (a.none()) {
    onPut(Result.success(Unit)) // Nothing to offer, nothing to wait for.
    return CancelToken.unit
  }
  return when (val current = state.value) {
    is State.Surplus -> {
      val newOffers = pendingOffers(a.toList(), onPut)
      val ids = newOffers.map { it.first }
      val update: State<A> = State.Surplus(current.values, current.offers + newOffers)
      if (state.compareAndSet(current, update)) CancelToken { unsafeCancelOffer(ids) }
      else unsafeOfferAll(a, onPut)
    }
    is State.Deficit -> {
      val capacity = (strategy as? Bounded)?.capacity ?: 0
      val offered = a.count()
      val waiting = current.takes.size
      if (offered > waiting && (offered - waiting) > capacity) {
        // Takers get the first values, `capacity` values are stored, the rest is scheduled.
        val leftOver = a.drop(waiting)
        val values = leftOver.take(capacity)
        val newOffers = pendingOffers(leftOver.drop(capacity), onPut)
        val update = State.Surplus<A>(IQueue(values), newOffers.toMap())
        if (state.compareAndSet(current, update)) {
          // Passing the whole batch serves the same takers (only the first `waiting` values are
          // zipped with them) and also hands the first value to the peekers.
          streamAllPeeksAndTakers(a, current.peeks.values, current.takes.values)
          // NOTE(review): the offers scheduled above cannot be cancelled through this token - confirm intended.
          CancelToken.unit
        } else unsafeOfferAll(a, onPut) // recurse
      } else {
        val update: State<A> = when {
          // Everything left after serving the takers fits into the queue.
          offered > waiting -> State.Surplus<A>(IQueue(a.drop(waiting)), emptyMap())
          // Every taker is served and no value is left over.
          offered == waiting -> State.empty<A>()
          // More takers than values: keep the takers that could not be served.
          else -> current.copy(peeks = emptyMap(), takes = current.takes.entries.drop(offered).toMap())
        }
        if (state.compareAndSet(current, update)) {
          streamAllPeeksAndTakers(a, current.peeks.values, current.takes.values)
          onPut(Result.success(Unit))
          CancelToken.unit
        } else unsafeOfferAll(a, onPut) // recurse
      }
    }
  }
}

/** Pairs every value with a fresh [Token]; only the last value carries [onPut], the others a no-op callback. */
private fun pendingOffers(values: List<A>, onPut: Callback<Unit>): List<Pair<Token, Pair<A, Callback<Unit>>>> =
  values.mapIndexed { index, value ->
    Pair(Token(), Pair(value, if (index == values.lastIndex) onPut else noOpCallback))
  }
/**
 * Removes the pending offers registered under [ids]. Only a [State.Surplus] holds offers, so a
 * [State.Deficit] is left untouched. A failed compare-and-set repeats the attempt.
 */
private tailrec fun unsafeCancelOffer(ids: List<Token>): Unit =
  when (val snapshot = state.value) {
    is State.Deficit -> Unit
    is State.Surplus -> {
      val pruned = snapshot.copy(offers = snapshot.offers - ids)
      if (!state.compareAndSet(snapshot, pruned)) unsafeCancelOffer(ids)
      else Unit
    }
  }
/**
 * Takes the head value when the queue is in [State.Surplus], without ever registering a taker.
 *
 * When an offer is pending, its value is enqueued behind the remaining values, the offer is removed
 * from the state and its callback is completed. Returns [Maybe.None] in [State.Deficit].
 * A failed compare-and-set repeats the attempt.
 */
private tailrec suspend fun unsafeTryTake(): Maybe<A> =
  when (val current = state.value) {
    is State.Deficit -> Maybe.None
    is State.Surplus -> {
      val (head, tail) = current.values.dequeue()
      val firstOffer = current.offers.values.firstOrNull()
      if (firstOffer == null) {
        val next: State<A> = if (tail.isEmpty()) State.empty<A>() else current.copy(values = tail)
        if (!state.compareAndSet(current, next)) unsafeTryTake()
        else Maybe.Just(head)
      } else {
        val (offered, notify) = firstOffer
        val remainingOffers = current.offers.entries.drop(1)
        val next: State<A> = State.Surplus(tail.enqueue(offered), remainingOffers.toMap())
        if (!state.compareAndSet(current, next)) unsafeTryTake() // compareAndSet failed, try again
        else {
          notify(Result.success(Unit)) // Tell the offer that its value made it into the queue
          Maybe.Just(head)
        }
      }
    }
  }
/**
 * Completes [onTake] with the head value when the queue is in [State.Surplus], otherwise registers
 * [onTake] as a waiting taker.
 *
 * - [State.Surplus] without pending offers: the head is removed and handed to [onTake].
 * - [State.Surplus] with pending offers: additionally the first pending offer is promoted. Its value is
 *   enqueued, its entry is removed from `offers` and its callback is completed. The entry used to stay
 *   in `offers` (`drop(0)`), so the same value was enqueued again and its callback fired again on a later take.
 * - [State.Deficit]: [onTake] is stored under a fresh [Token]; the returned token unregisters it through
 *   [unsafeCancelTake].
 *
 * A failed compare-and-set repeats the attempt against the fresh state.
 */
private tailrec suspend fun unsafeTake(onTake: Callback<A>): CancelToken =
  when (val current = state.value) {
    is State.Surplus -> {
      val (head, tail) = current.values.dequeue()
      if (current.offers.isEmpty()) {
        val update: State<A> =
          if (tail.isEmpty()) State.empty<A>()
          else current.copy(values = tail)
        if (state.compareAndSet(current, update)) {
          onTake(Result.success(head)) // Call takers callback with value
          CancelToken.unit
        } else {
          unsafeTake(onTake) // Update failed, recurse
        }
      } else {
        val (ax, notify) = current.offers.values.first()
        // The promoted offer must leave the pending offers, exactly as in unsafeTryTake.
        val xs = current.offers.entries.drop(1)
        val update: State<A> = State.Surplus(tail.enqueue(ax), xs.toMap())
        if (state.compareAndSet(current, update)) {
          notify(Result.success(Unit)) // Notify offer that it finished,
          onTake(Result.success(head)) // Call takers callback with value
          CancelToken.unit // Nothing to cancel
        } else unsafeTake(onTake) // Update failed, recurse
      }
    }
    is State.Deficit -> {
      val id = Token()
      val newQueue = current.takes + Pair(id, onTake)
      val update: State<A> = State.Deficit(current.peeks, newQueue)
      if (state.compareAndSet(current, update)) CancelToken { unsafeCancelTake(id) }
      else unsafeTake(onTake)
    }
  }
/**
 * Unregisters the taker stored under [id]. Only a [State.Deficit] holds takers, so a
 * [State.Surplus] is left untouched. A failed compare-and-set repeats the attempt.
 */
private tailrec fun unsafeCancelTake(id: Token): Unit =
  when (val snapshot = state.value) {
    is State.Surplus -> Unit
    is State.Deficit -> {
      val pruned = snapshot.copy(takes = snapshot.takes - id)
      if (!state.compareAndSet(snapshot, pruned)) unsafeCancelTake(id)
      else Unit
    }
  }
/**
 * Empties the queue in one step and returns everything it held.
 *
 * In [State.Surplus] the state is reset to the empty state; on success every pending offer is
 * completed and the result is the stored values followed by the values of those pending offers.
 * In [State.Deficit] there is nothing to take and the result is an empty list.
 * A failed compare-and-set repeats the attempt.
 */
private tailrec suspend fun unsafeTakeAll(): List<A> =
  when (val current = state.value) {
    is State.Deficit -> emptyList()
    is State.Surplus -> {
      val stored = current.values.toList()
      val pending = current.offers.values
      if (!state.compareAndSet(current, State.empty<A>())) unsafeTakeAll()
      else {
        // Tell every outstanding offer that it is finished, then return stored + offered values.
        pending.forEach { (_, notify) -> notify(Result.success(Unit)) }
        stored + pending.map { (value, _) -> value }
      }
    }
  }
/**
 * Snapshot of everything the queue holds, without changing the state: the stored values followed by
 * the values of the pending offers. Empty while the queue is in [State.Deficit].
 */
private suspend fun unsafePeekAll(): List<A> {
  val current = state.value
  if (current !is State.Surplus) return emptyList()
  val stored = current.values.toList()
  val offered = current.offers.values.map { it.first }
  return stored + offered
}
/** Returns the first stored value without removing it, or [Maybe.None] while the queue is in [State.Deficit]. */
private suspend fun unsafeTryPeek(): Maybe<A> {
  val current = state.value
  return if (current is State.Surplus) Maybe.Just(current.values.first()) else Maybe.None
}
/**
 * Completes [onPeek] with the first stored value when the queue is in [State.Surplus]; the state is
 * not modified. In [State.Deficit] the callback is registered as a peeker under a fresh [Token], and
 * the returned token unregisters it through [unsafeCancelRead].
 * A failed compare-and-set repeats the attempt.
 */
private tailrec suspend fun unsafePeek(onPeek: Callback<A>): CancelToken =
  when (val current = state.value) {
    is State.Deficit -> {
      val token = Token()
      val next: State<A> = State.Deficit(current.peeks + Pair(token, onPeek), current.takes)
      if (!state.compareAndSet(current, next)) unsafePeek(onPeek)
      else CancelToken { unsafeCancelRead(token) }
    }
    is State.Surplus -> {
      onPeek(Result.success(current.values.first()))
      CancelToken.unit
    }
  }
/**
 * Unregisters the peeker stored under [id]. Only a [State.Deficit] holds peekers, so a
 * [State.Surplus] is left untouched. A failed compare-and-set repeats the attempt.
 */
private tailrec fun unsafeCancelRead(id: Token): Unit =
  when (val snapshot = state.value) {
    is State.Surplus -> Unit
    is State.Deficit -> {
      val pruned: State<A> = snapshot.copy(peeks = snapshot.peeks - id)
      if (!state.compareAndSet(snapshot, pruned)) unsafeCancelRead(id)
      else Unit
    }
  }
/**
 * A Queue is always in one of two states:
 *
 * [Deficit]: holds no values, only two maps of registered id & callback, one for peeks and one for
 * takes, waiting for a value to become available.
 *
 * [Surplus]: holds a queue of values and a map of registered id & (value, callback) offers that are
 * waiting to be added to the queue.
 */
sealed class State<out A> {

  /** No values available; [peeks] and [takes] are the callbacks waiting for one. */
  data class Deficit<A>(
    val peeks: Map<Token, Callback<A>>,
    val takes: Map<Token, Callback<A>>
  ) : State<A>()

  /** Values available in [values]; [offers] are values (with their completion callbacks) not yet added. */
  data class Surplus<A>(
    val values: IQueue<A>,
    val offers: Map<Token, Pair<A, Callback<Unit>>>
  ) : State<A>()

  companion object {
    // One shared instance: a Deficit without any callbacks holds nothing of type A.
    private val emptyDeficit: Deficit<Any?> = Deficit(emptyMap(), emptyMap())

    /** The empty state, i.e. a [Deficit] without peekers or takers (the shared instance, cast unchecked). */
    fun <A> empty(): Deficit<A> = emptyDeficit as Deficit<A>
  }
}
/**
* Unsafely handles an offer of one or more values while the queue is in [State.Surplus],
* according to the configured [strategy]. At most one compare-and-set is attempted per call.
*
* Return:
* - `true` when the state was updated (this includes the Dropping case that keeps the queue unchanged)
* - `false` when the values were not accepted: for Bounded when the batch does not fit (the caller may
* schedule the offer), for Sliding and Dropping when the queue is at capacity and [tryStrategy] is true
* - `null` when the compare-and-set failed and the caller needs to recurse and try again
*
* @param a values to offer
* @param tryStrategy when true, Sliding and Dropping report `false` for a full queue instead of sliding/dropping
* @param surplus the state snapshot the decision and the compare-and-set are based on
*/
private suspend fun unsafeOfferAllSurplusForStrategy(
a: Iterable<A>,
tryStrategy: Boolean,
surplus: State.Surplus<A>
): Boolean? =
when (strategy) {
is Bounded ->
when {
// The batch is accepted as a whole or not at all; [tryStrategy] plays no role here.
surplus.values.length() + a.count() > strategy.capacity -> false
state.compareAndSet(surplus, surplus.copy(surplus.values.enqueue(a))) -> true
else -> null
}
is Sliding -> {
// If the batch fits it is appended; otherwise as many values as are offered are removed
// from the queue first (presumably the oldest ones - depends on IQueue.drop, verify).
val nextQueue = if (surplus.values.length() + (a.count() - 1) < strategy.capacity) surplus.values.enqueue(a)
else surplus.values.drop(a.count()).enqueue(a)
when {
surplus.values.length() >= strategy.capacity && tryStrategy -> false
state.compareAndSet(surplus, surplus.copy(values = nextQueue)) -> true
else -> null
}
}
is Dropping -> {
// If the batch fits it is appended; otherwise the queue stays as it is and the whole batch is discarded.
val nextQueue = if (surplus.values.length() + (a.count() - 1) < strategy.capacity) surplus.values.enqueue(a)
else surplus.values
when {
surplus.values.length() >= strategy.capacity && tryStrategy -> false
state.compareAndSet(surplus, surplus.copy(values = nextQueue)) -> true
else -> null
}
}
is Unbounded ->
// No capacity check: always append.
if (!state.compareAndSet(surplus, surplus.copy(values = surplus.values.enqueue(a)))) null
else true
}
/**
* Unsafely handles an offer of several values while the queue is in [State.Deficit], according to the
* configured [strategy]. The only visible caller invokes it when more values are offered than takers
* are waiting.
*
* On a successful compare-and-set the state becomes a [State.Surplus] holding the values left after the
* first `deficit.takes.size` values, trimmed to the capacity where the strategy requires it, and
* [streamAllPeeksAndTakers] hands values to the waiting peekers and takers.
*
* Return:
* - `true` when the state was updated and the callbacks were called
* - `false` when the values were not accepted: for Bounded when the left-over values exceed the capacity,
* for Sliding and Dropping when they exceed the capacity and [tryStrategy] is true
* - `null` when the compare-and-set failed and the caller needs to recurse and try again
*
* @param a values to offer
* @param tryStrategy when true, Sliding and Dropping report `false` instead of discarding values
* @param deficit the state snapshot the decision and the compare-and-set are based on
*/
private suspend fun unsafeOfferAllDecifitForStrategy(
a: Iterable<A>,
tryStrategy: Boolean,
deficit: State.Deficit<A>
): Boolean? =
when (strategy) {
is Bounded ->
when {
// We need to atomically offer to deficit AND schedule offers according to capacity.
a.count() > deficit.takes.size && (a.count() - deficit.takes.size) > strategy.capacity -> false
a.count() > deficit.takes.size -> { // Offer to deficit and update to Surplus, capacity check above
val update = State.Surplus<A>(IQueue(a.drop(deficit.takes.size)), emptyMap())
if (state.compareAndSet(deficit, update)) streamAllPeeksAndTakers(
a,
deficit.peeks.values,
deficit.takes.values
)
else null // recurse
}
// Only reached when there are not more values than takers; nothing is updated in that case.
else -> null // updates failed, recurse
}
is Sliding -> {
// Number of values left once every waiting taker got one.
val afterTakers = a.count() - deficit.takes.size
if (afterTakers > strategy.capacity && tryStrategy) false
else {
// Too many left-over values: skip the first ones so that only the last `capacity` values are stored.
val newQueue = if (afterTakers > strategy.capacity) IQueue(
a.drop(deficit.takes.size).drop(afterTakers - strategy.capacity)
)
else IQueue(a.drop(deficit.takes.size))
val update = State.Surplus<A>(newQueue, emptyMap())
if (state.compareAndSet(deficit, update)) streamAllPeeksAndTakers(
a,
deficit.peeks.values,
deficit.takes.values
)
else null // recurse
}
}
is Dropping -> {
// Number of values left once every waiting taker got one.
val afterTakers = a.count() - deficit.takes.size
if (afterTakers > strategy.capacity && tryStrategy) false
else {
// Too many left-over values: only the first `capacity` values are stored, the rest is discarded.
val newQueue = if (afterTakers > strategy.capacity) IQueue(a.drop(deficit.takes.size).take(strategy.capacity))
else IQueue(a.drop(deficit.takes.size))
val update = State.Surplus<A>(newQueue, emptyMap())
if (state.compareAndSet(deficit, update)) streamAllPeeksAndTakers(
a,
deficit.peeks.values,
deficit.takes.values
)
else null // recurse
}
}
is Unbounded -> {
// No capacity: everything left after the takers is stored.
val update = State.Surplus<A>(IQueue(a.drop(deficit.takes.size)), emptyMap())
if (state.compareAndSet(deficit, update)) streamAllPeeksAndTakers(a, deficit.peeks.values, deficit.takes.values)
else null
}
}
/**
 * Distributes offered values to the waiting callbacks:
 * every peeker receives the first value (when that value is non-null), and the takers receive the
 * values pairwise in iteration order: first taker the first value, second taker the second value, and
 * so on, until either side runs out.
 *
 * @return always `true`.
 */
private suspend fun streamAllPeeksAndTakers(
  a: Iterable<A>,
  peeks: Iterable<Callback<A>>,
  takes: Iterable<Callback<A>>
): Boolean {
  val first = a.firstOrNull()
  if (first != null) {
    val peeked = Result.success(first)
    for (peeker in peeks) peeker(peeked)
  }
  val servedTakers = takes.take(a.count())
  for ((value, taker) in a.zip(servedTakers)) {
    taker(Result.success(value))
  }
  return true
}
/**
 * Hands a single value first to every peeker and then to the taker, if there is one.
 *
 * @return always `true`.
 */
private suspend fun callTakeAndPeeks(a: A, take: Callback<A>?, peeks: Iterable<Callback<A>>): Boolean {
  val delivered = Result.success(a)
  peeks.forEach { peeker -> peeker(delivered) }
  take?.invoke(delivered)
  return true
}
// Invokes every callback of the receiver, in iteration order, with the same value.
private suspend fun Iterable<Callback<A>>.callAll(value: Result<A>): Unit {
  for (callback in this) callback(value)
}
// Callback that ignores its result; used for offers nobody waits on individually.
private val noOpCallback: Callback<Unit> = { _ -> Unit }
companion object {
  /**
   * Creates an empty queue with the [Unbounded] strategy; passing `null` as initial state makes the
   * constructor fall back to the empty state.
   *
   * NOTE(review): the type parameter `F` is not used by this function - looks like a leftover, confirm
   * before removing because callers may pass it explicitly.
   */
  suspend fun <F, A> empty(): SuspendingQueue<A> {
    return SuspendingQueue(Unbounded, null)
  }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment