Created
July 6, 2020 16:19
-
-
Save nomisRev/294e14eb213dfe5aadb54a4a8c0a578a to your computer and use it in GitHub Desktop.
Suspended Queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package arrow.fx.coroutines | |
import arrow.fx.coroutines.Queue.BackpressureStrategy | |
import arrow.fx.coroutines.Queue.BackpressureStrategy.* | |
/**
 * [Dequeue] allows peeking and taking values from a [Queue], but doesn't allow offering values to the [Queue].
 * You can use [Dequeue] to restrict certain functions or layers of your applications to only consume values.
 *
 * ```kotlin:ank:playground
 * import arrow.fx.coroutines.*
 *
 * //sampleStart
 * suspend fun consumeInts(e: Dequeue<Int>, n: Int): Unit =
 *   repeat(n) { println("I took ${e.take()}") }
 *
 * suspend fun main(): Unit {
 *   val queue = Queue.unbounded<Int>()
 *   queue.offerAll(1, 2, 3)
 *   consumeInts(queue, 3)
 * }
 * //sampleEnd
 * ```
 *
 * @see Queue in the case your functions or layers are allowed to take and offer.
 * @see Enqueue in the case your functions or layers are only allowed to offer values.
 */
interface Dequeue<A> {

  /**
   * Takes and removes a value from the [Queue], or semantically blocks until a value becomes available.
   *
   * ```kotlin:ank
   * import arrow.fx.coroutines.*
   *
   * //sampleStart
   * suspend fun main(): Unit {
   *   val queue = Queue.unbounded<Int>()
   *   queue.offer(1) // Without this offer, the `take` below would suspend forever.
   *   val res = queue.take()
   *   println(res)
   * }
   * //sampleEnd
   * ```
   *
   * @see [peek] for a function that doesn't remove the value from the [Queue].
   * @see [tryTake] for a function that does not semantically block but returns immediately with a nullable value.
   */
  suspend fun take(): A

  /**
   * Attempts to take a value from the [Queue] if one is available, this method is guaranteed not to semantically block.
   * It returns immediately with either `null` when no value is available, or the value that was taken.
   *
   * ```kotlin:ank:playground
   * import arrow.fx.coroutines.*
   *
   * //sampleStart
   * suspend fun main(): Unit {
   *   val queue = Queue.unbounded<Int>()
   *   val none = queue.tryTake()
   *   queue.offer(1)
   *   val one = queue.tryTake()
   *   val none2 = queue.tryTake()
   *   println("none: $none, one $one, none2: $none2")
   * }
   * //sampleEnd
   * ```
   *
   * @see [take] for function that semantically blocks until a value becomes available.
   * @see [tryPeek] for a function that attempts to peek a value from the [Queue] without removing it.
   */
  suspend fun tryTake(): A?

  /**
   * Peeks a value from the [Queue] or semantically blocks until a value becomes available.
   * In contrast to [take], [peek] does not remove the value from the [Queue].
   *
   * ```kotlin:ank
   * import arrow.fx.coroutines.*
   *
   * //sampleStart
   * suspend fun main(): Unit {
   *   val queue = Queue.unbounded<Int>()
   *   queue.offer(1) // Without this offer, the `peek` below would suspend forever.
   *   val res = queue.peek()
   *   val res2 = queue.peek() // We can peek again since it doesn't remove the value
   *   println("res: $res, res2: $res2")
   * }
   * //sampleEnd
   * ```
   *
   * @see [take] for function that semantically blocks until a value becomes available and removes it from the [Queue].
   * @see [tryPeek] for a function that does not semantically block but returns immediately with a nullable value.
   */
  suspend fun peek(): A

  /**
   * Tries to peek a value from the [Queue]. Returns immediately with either `null` or the first value.
   * In contrast to [tryTake], [tryPeek] does not remove the value from the [Queue].
   *
   * ```kotlin:ank:playground
   * import arrow.fx.coroutines.*
   *
   * //sampleStart
   * suspend fun main(): Unit {
   *   val queue = Queue.unbounded<Int>()
   *   val none = queue.tryPeek()
   *   queue.offer(1)
   *   val one = queue.tryPeek()
   *   val one2 = queue.tryPeek()
   *   println("none: $none, one $one, one2: $one2")
   * }
   * //sampleEnd
   * ```
   *
   * @see [peek] for a function that semantically blocks until a value becomes available.
   * @see [tryTake] for a function that attempts to take a value from the [Queue] while removing it.
   */
  suspend fun tryPeek(): A?

  /**
   * Immediately returns all available values in the [Queue], and empties the [Queue].
   * It returns an [emptyList] when no values are available.
   *
   * ```kotlin:ank:playground
   * import arrow.fx.coroutines.*
   *
   * //sampleStart
   * suspend fun main(): Unit {
   *   val queue = Queue.unbounded<Int>()
   *   queue.offerAll(1, 2, 3, 4)
   *   val values = queue.takeAll()
   *   val empty = queue.takeAll()
   *   println("values: $values, empty: $empty")
   * }
   * //sampleEnd
   * ```
   *
   * For a [BackpressureStrategy.Bounded], this also includes all blocking offers that are waiting to be added in the [Queue].
   *
   * @see [peekAll] for a function that doesn't remove the values from the [Queue].
   */
  suspend fun takeAll(): List<A>

  /**
   * Immediately returns all available values in the [Queue], without emptying the [Queue].
   * It returns an [emptyList] when no values are available.
   *
   * ```kotlin:ank:playground
   * import arrow.fx.coroutines.*
   *
   * //sampleStart
   * suspend fun main(): Unit {
   *   val queue = Queue.unbounded<Int>()
   *   queue.offerAll(1, 2, 3, 4)
   *   val values = queue.peekAll()
   *   val values2 = queue.peekAll()
   *   println("values: $values, values2: $values2")
   * }
   * //sampleEnd
   * ```
   *
   * For a [BackpressureStrategy.Bounded], this also includes all blocking offers that are waiting to be added in the [Queue].
   *
   * @see [takeAll] for a function that also removes all values from the [Queue].
   */
  suspend fun peekAll(): List<A>
}
/**
 * [Enqueue] allows offering values to a [Queue], but doesn't allow taking values from the [Queue].
 * You can use [Enqueue] to restrict certain functions or layers of your applications to only produce values.
 *
 * ```kotlin:ank:playground
 * import arrow.fx.coroutines.*
 *
 * //sampleStart
 * suspend fun produceInts(e: Enqueue<Int>, max: Int): Unit =
 *   (0..max).forEach { i -> e.offer(i) }
 *
 * suspend fun main(): Unit {
 *   val queue = Queue.unbounded<Int>()
 *   produceInts(queue, 10)
 *   val res = queue.takeAll()
 *   println(res)
 * }
 * //sampleEnd
 * ```
 *
 * @see Queue in the case your functions or layers are allowed to take and offer.
 * @see Dequeue in the case your functions or layers are only allowed to peek or take values.
 */
interface Enqueue<A> {

  /**
   * Offers a value to the [Queue], and behaves differently depending on the [Queue.BackpressureStrategy].
   *
   *  - Semantically blocks until room available in [Queue], check [Queue.bounded] for an example.
   *
   *  - Returns immediately and slides values through the [Queue], check [Queue.sliding] for an example.
   *
   *  - Returns immediately and drops values from the [Queue], check [Queue.dropping] for an example.
   *
   *  - Returns immediately and always offers to the [Queue], check [Queue.unbounded] for an example.
   *
   * @see [tryOffer] for a function that always returns immediately, and returns `true` if the value was successfully put into the [Queue].
   */
  suspend fun offer(a: A)

  /**
   * Tries to offer a value to the [Queue], it ignores the [Queue.BackpressureStrategy]
   * and returns false if the [Queue.BackpressureStrategy] does not have room for the value.
   *
   * Use [tryOffer] if you do not want to block or lose a value and return immediately.
   */
  suspend fun tryOffer(a: A): Boolean

  /**
   * Tries to offer all values of [a] to the [Queue] without semantically blocking.
   *
   * @return `true` when the values were accepted, `false` when the [Queue] reported no room for them.
   * @see [offerAll] for the variant that follows the [Queue.BackpressureStrategy].
   */
  suspend fun tryOfferAll(a: Iterable<A>): Boolean

  /** Vararg convenience overload: forwards the values, as a [List], to the [Iterable] based [tryOfferAll]. */
  suspend fun tryOfferAll(vararg a: A): Boolean =
    tryOfferAll(a.toList())

  /**
   * Offers all values of [a] to the [Queue], following the [Queue.BackpressureStrategy] in the same way as [offer].
   *
   * @see [tryOfferAll] for the variant that returns immediately with a [Boolean].
   */
  suspend fun offerAll(a: Iterable<A>)

  /** Vararg convenience overload: forwards the values, as a [List], to the [Iterable] based [offerAll]. */
  suspend fun offerAll(vararg a: A) {
    offerAll(a.toList())
  }
}
/**
 * Lightweight, suspending [Queue] for values of [A].
 *
 * A [Queue] can be used using 4 different back-pressure strategies:
 *
 *  - [bounded]: Offering to a bounded queue at capacity will cause the fiber making
 *   the call to be suspended until the queue has space to receive the offer value
 *
 *  - [dropping]: Offering to a dropping queue at capacity will cause the offered
 *   value to be discarded
 *
 *  - [sliding]: Offering to a sliding queue at capacity will cause the value at the
 *   front of the queue to be discarded to make room for the offered value
 *
 *  - [unbounded]: An unbounded queue has no notion of capacity and is bound only by
 *   exhausting the memory limits of the runtime
 */
interface Queue<A> : Dequeue<A>, Enqueue<A> {

  /**
   * Immediately returns the current size of values in the [Queue].
   * Can be a negative number when there are takers but no values available.
   */
  suspend fun size(): Int

  /** Internal model that represents the [Queue] strategies. */
  sealed class BackpressureStrategy {
    data class Bounded(val capacity: Int) : BackpressureStrategy()
    data class Sliding(val capacity: Int) : BackpressureStrategy()
    data class Dropping(val capacity: Int) : BackpressureStrategy()
    object Unbounded : BackpressureStrategy()
  }

  companion object {

    /**
     * Validates the requested [capacity] and hands it back unchanged.
     *
     * @throws IllegalArgumentException when [capacity] is not greater than 0.
     */
    private fun ensureCapacity(capacity: Int): Int {
      require(capacity > 0) { "Queue must have a capacity greater than 0" }
      return capacity
    }

    /**
     * Create a [Queue] with [BackpressureStrategy.Bounded].
     *
     * Offering to a bounded queue at capacity will cause the fiber making
     * the call to be suspended until the queue has space to receive the offered value.
     *
     * ```kotlin:ank:playground
     * import arrow.fx.coroutines.*
     *
     * suspend fun main(): Unit {
     *   val capacity = 2
     *   val q = Queue.bounded<Int>(capacity)
     *   q.offer(42)
     *   q.offer(43)
     *   // A third `q.offer(44)` here would exceed the capacity and suspend until a value is taken.
     *   val fortyTwo = q.take()
     *   q.offer(44) // There is room again, so this offer returns immediately.
     *   val fortyThree = q.take()
     *   val fortyFour = q.take()
     *   println(listOf(fortyTwo, fortyThree, fortyFour))
     * }
     * ```
     *
     * @throws IllegalArgumentException when [capacity] is not greater than 0.
     */
    suspend fun <A> bounded(capacity: Int): Queue<A> =
      SuspendingQueue<A>(Bounded(ensureCapacity(capacity)), SuspendingQueue.State.empty())

    /**
     * Create a [Queue] with [BackpressureStrategy.Sliding].
     *
     * Offering to a sliding queue at capacity will cause the value at the
     * front of the queue to be discarded to make room for the offered value.
     *
     * ```kotlin:ank:playground
     * import arrow.fx.coroutines.*
     *
     * suspend fun main(): Unit {
     *   val capacity = 2
     *   val q = Queue.sliding<Int>(capacity)
     *   q.offer(42)
     *   q.offer(43)
     *   q.offer(44) // <-- This `offer` exceeds the capacity, causing the oldest value to be removed
     *   val fortyThree = q.take()
     *   val fortyFour = q.take()
     *   q.offer(45)
     *   val fortyFive = q.take()
     *   println(listOf(fortyThree, fortyFour, fortyFive))
     * }
     * ```
     *
     * @throws IllegalArgumentException when [capacity] is not greater than 0.
     */
    // NOTE(review): unlike the other factories this one is not `suspend`; the signature is kept as-is for callers.
    fun <A> sliding(capacity: Int): Queue<A> =
      SuspendingQueue<A>(Sliding(ensureCapacity(capacity)), SuspendingQueue.State.empty())

    /**
     * Create a [Queue] with [BackpressureStrategy.Dropping].
     *
     * Offering to a dropping queue at capacity will cause the offered value to be discarded.
     *
     * ```kotlin:ank:playground
     * import arrow.fx.coroutines.*
     *
     * suspend fun main(): Unit {
     *   val capacity = 2
     *   val q = Queue.dropping<Int>(capacity)
     *   q.offer(42)
     *   q.offer(43)
     *   q.offer(44) // <-- This `offer` exceeds the capacity and will be dropped immediately
     *   val fortyTwo = q.take()
     *   val fortyThree = q.take()
     *   q.offer(45)
     *   val fortyFive = q.take()
     *   println(listOf(fortyTwo, fortyThree, fortyFive))
     * }
     * ```
     *
     * @throws IllegalArgumentException when [capacity] is not greater than 0.
     */
    suspend fun <A> dropping(capacity: Int): Queue<A> =
      SuspendingQueue<A>(Dropping(ensureCapacity(capacity)), SuspendingQueue.State.empty())

    /**
     * Create a [Queue] with [BackpressureStrategy.Unbounded].
     *
     * An unbounded queue has no notion of capacity and is bound only by exhausting the memory limits of the runtime.
     *
     * ```kotlin:ank:playground
     * import arrow.fx.coroutines.*
     *
     * suspend fun main(): Unit {
     *   val q = Queue.unbounded<Int>()
     *   q.offer(42)
     *   // ...
     *   q.offer(42000000)
     *   val res = q.take()
     *   println(res)
     * }
     * ```
     */
    suspend fun <A> unbounded(): Queue<A> =
      SuspendingQueue<A>(Unbounded, SuspendingQueue.State.empty())
  }
}
/** Callback that completes a pending `offer`, `take` or `peek` with its [Result]. */
private typealias Callback<A> = (Result<A>) -> Unit

/**
 * [Queue] implementation that keeps its whole [State] in one atomic reference.
 *
 * Every operation reads the current [State], computes the next one and publishes it with
 * `compareAndSet`; when the update loses a race the operation simply starts over.
 * Callbacks of waiting takers, peekers and offerers are only invoked after the update succeeded.
 *
 * @param strategy decides what happens to offers when there is no room.
 * @param initial the state to start from, or `null` to start from [State.empty].
 */
internal class SuspendingQueue<A> internal constructor(
  private val strategy: BackpressureStrategy,
  initial: State<A>?
) : Queue<A> {

  private val state: AtomicRefW<State<A>> = AtomicRefW(initial ?: State.empty())

  /** Callback for scheduled `offerAll` values whose completion nobody is waiting for. */
  private val noOpCallback: Callback<Unit> =
    { Unit }

  /**
   * Negative number of waiting takers in [State.Deficit],
   * otherwise the number of buffered values plus the number of offers waiting for room.
   */
  override suspend fun size(): Int =
    when (val curr = state.value) {
      is State.Deficit -> -curr.takes.size
      // Count every buffered value, not just one: a Surplus can hold up to `capacity` values.
      is State.Surplus -> curr.values.length() + curr.offers.size
    }

  override suspend fun tryOffer(a: A): Boolean =
    unsafeTryOffer(a)

  /** Tries the non-blocking path first and only registers a cancellable, waiting offer when that fails. */
  override suspend fun offer(a: A): Unit {
    val didPut = unsafeTryOffer(a, false)
    return if (didPut) Unit else cancellableF { cb -> unsafeOffer(a, cb) }
  }

  /** Tries the non-blocking path first and only registers cancellable, waiting offers when that fails. */
  override suspend fun offerAll(a: Iterable<A>): Unit {
    val didPut = unsafeTryOfferAll(a, false)
    return if (didPut) Unit else cancellableF { cb -> unsafeOfferAll(a, cb) }
  }

  override suspend fun tryOfferAll(a: Iterable<A>): Boolean =
    unsafeTryOfferAll(a)

  override suspend fun tryTake(): A? =
    when (val a = unsafeTryTake()) {
      Maybe.None -> null
      is Maybe.Just -> a.value
    }

  /** Takes a value when one is available, otherwise registers a cancellable taker and suspends. */
  override suspend fun take(): A =
    when (val a = unsafeTryTake()) {
      Maybe.None -> cancellableF(::unsafeTake)
      is Maybe.Just -> a.value
    }

  override suspend fun takeAll(): List<A> =
    unsafeTakeAll()

  override suspend fun tryPeek(): A? =
    when (val s = unsafeTryPeek()) {
      Maybe.None -> null
      is Maybe.Just -> s.value
    }

  /** Peeks a value when one is available, otherwise registers a cancellable peeker and suspends. */
  override suspend fun peek(): A =
    when (val s = unsafeTryPeek()) {
      Maybe.None -> cancellableF(::unsafePeek)
      is Maybe.Just -> s.value
    }

  override suspend fun peekAll(): List<A> =
    unsafePeekAll()

  /**
   * Offers [a] without waiting.
   *
   * In [State.Surplus] the decision is delegated to [unsafeOfferAllSurplusForStrategy].
   * In [State.Deficit] the value goes to the first taker (if any) and to all peekers;
   * without a taker it becomes the single buffered value of a new [State.Surplus].
   *
   * @return `true` when the value was handled, `false` when there was no room for it.
   */
  private tailrec suspend fun unsafeTryOffer(a: A, tryStrategy: Boolean = true): Boolean =
    when (val current = state.value) {
      is State.Surplus ->
        unsafeOfferAllSurplusForStrategy(listOf(a), tryStrategy, current) ?: unsafeTryOffer(a, tryStrategy)
      is State.Deficit -> {
        val taker: Callback<A>? = current.takes.values.firstOrNull()
        val update: State<A> =
          if (taker == null) { // If no takers or values, we can safely store a single value in Surplus.
            State.Surplus(IQueue(a), emptyMap())
          } else { // Else we need to remove the first taker from the current state
            val rest = current.takes.entries.drop(1)
            if (rest.isEmpty()) current.copy(emptyMap(), emptyMap())
            else State.Deficit(emptyMap(), rest.toMap())
          }
        if (!state.compareAndSet(current, update)) {
          unsafeTryOffer(a, tryStrategy) // Update failed, recurse
        } else if (taker != null || current.peeks.isNotEmpty()) {
          callTakeAndPeeks(a, taker, current.peeks.values) // Update succeeded, call callbacks with value
        } else true // Update succeeded, no callbacks need to be called.
      }
    }

  /**
   * Offers all of [aas] without waiting.
   *
   * With more values than takers the decision is delegated to [unsafeOfferAllDeficitForStrategy];
   * otherwise every value is handed to a taker and the served takers are removed from the state.
   *
   * @return `true` when the values were handled, `false` when there was no room for them.
   */
  private tailrec suspend fun unsafeTryOfferAll(aas: Iterable<A>, tryStrategy: Boolean = true): Boolean =
    when (val current = state.value) {
      is State.Surplus ->
        unsafeOfferAllSurplusForStrategy(aas, tryStrategy, current) ?: unsafeTryOfferAll(aas, tryStrategy)
      is State.Deficit -> {
        if (aas.count() > current.takes.values.size) {
          unsafeOfferAllDeficitForStrategy(aas, tryStrategy, current) ?: unsafeTryOfferAll(aas, tryStrategy)
        } else {
          val update =
            if (aas.count() == current.takes.values.size) State.empty<A>()
            else current.copy(peeks = emptyMap(), takes = current.takes.entries.drop(aas.count()).toMap())
          if (!state.compareAndSet(current, update)) {
            unsafeTryOfferAll(aas, tryStrategy)
          } else if (current.takes.isNotEmpty() || current.peeks.isNotEmpty()) {
            streamAllPeeksAndTakers(aas, current.peeks.values, current.takes.values)
          } else true
        }
      }
    }

  /**
   * Registers [a] as a waiting offer; [onPut] is invoked once the value has been accepted.
   *
   * @return a [CancelToken] that removes the waiting offer again, or [CancelToken.unit]
   * when the value was accepted right away.
   */
  private tailrec suspend fun unsafeOffer(a: A, onPut: Callback<Unit>): CancelToken =
    when (val current = state.value) {
      is State.Surplus -> {
        val id = Token()
        val newMap = current.offers + Pair(id, Pair(a, onPut))
        if (state.compareAndSet(current, State.Surplus(current.values, newMap))) {
          CancelToken { unsafeCancelOffer(listOf(id)) }
        } else unsafeOffer(a, onPut)
      }
      is State.Deficit -> {
        val first = current.takes.values.firstOrNull()
        val update: State<A> =
          if (first == null) {
            State.Surplus(IQueue(a), emptyMap())
          } else {
            val rest = current.takes.entries.drop(1)
            if (rest.isEmpty()) current.copy(emptyMap(), emptyMap())
            else State.Deficit(emptyMap(), rest.toMap())
          }
        if (state.compareAndSet(current, update)) {
          if (first != null || current.peeks.isNotEmpty()) callTakeAndPeeks(a, first, current.peeks.values)
          onPut(Result.success(Unit))
          CancelToken.unit
        } else unsafeOffer(a, onPut)
      }
    }

  /**
   * Pairs every value with a fresh [Token] so it can wait in [State.Surplus.offers].
   * Only the last value carries [onPut]: once the last value is offered, the `offerAll` completes.
   * Every value is scheduled exactly once. [values] must not be empty.
   */
  private fun pendingOffers(
    values: List<A>,
    onPut: Callback<Unit>
  ): List<Pair<Token, Pair<A, Callback<Unit>>>> =
    values.dropLast(1).map { value -> Token() to Pair(value, noOpCallback) } +
      (Token() to Pair(values.last(), onPut))

  /**
   * Registers the values of [a] that do not fit as waiting offers; [onPut] is invoked once
   * the last of them has been accepted.
   *
   * @return a [CancelToken] that removes the waiting offers again, or [CancelToken.unit]
   * when all values were accepted right away.
   */
  private tailrec suspend fun unsafeOfferAll(a: Iterable<A>, onPut: Callback<Unit>): CancelToken =
    when (val current = state.value) {
      is State.Surplus -> {
        val newOffers = pendingOffers(a.toList(), onPut)
        val update: State<A> = State.Surplus(current.values, current.offers + newOffers)
        if (state.compareAndSet(current, update)) CancelToken { unsafeCancelOffer(newOffers.map { it.first }) }
        else unsafeOfferAll(a, onPut)
      }
      is State.Deficit -> {
        val capacity = (strategy as? Bounded)?.capacity ?: 0
        val offered = a.count()
        val takers = current.takes.size
        if (offered > takers && offered - takers > capacity) {
          // Serve the takers, buffer up to `capacity` values and schedule whatever is left.
          val leftOver = a.drop(takers)
          val newOffers = pendingOffers(leftOver.drop(capacity), onPut)
          val update = State.Surplus<A>(IQueue(leftOver.take(capacity)), newOffers.toMap())
          if (state.compareAndSet(current, update)) {
            // Stream the full input so waiting peekers see the first value even without takers.
            streamAllPeeksAndTakers(a, current.peeks.values, current.takes.values)
            CancelToken { unsafeCancelOffer(newOffers.map { it.first }) }
          } else unsafeOfferAll(a, onPut) // recurse
        } else {
          val update: State<A> = when {
            offered > takers -> State.Surplus<A>(IQueue(a.drop(takers)), emptyMap())
            offered == takers -> State.empty<A>()
            // Fewer values than takers: stay in Deficit and keep the takers that were not served.
            else -> current.copy(peeks = emptyMap(), takes = current.takes.entries.drop(offered).toMap())
          }
          if (state.compareAndSet(current, update)) {
            streamAllPeeksAndTakers(a, current.peeks.values, current.takes.values)
            onPut(Result.success(Unit))
            CancelToken.unit
          } else unsafeOfferAll(a, onPut) // recurse
        }
      }
    }

  /** Removes the waiting offers registered under [ids]; a no-op in [State.Deficit]. */
  private tailrec fun unsafeCancelOffer(ids: List<Token>): Unit =
    when (val current = state.value) {
      is State.Surplus -> {
        val update = current.copy(offers = current.offers - ids)
        if (state.compareAndSet(current, update)) Unit
        else unsafeCancelOffer(ids)
      }
      is State.Deficit -> Unit
    }

  /**
   * Takes the first buffered value without waiting.
   * When an offer is waiting, its value is moved into the buffer and the offer is notified.
   */
  private tailrec suspend fun unsafeTryTake(): Maybe<A> =
    when (val current = state.value) {
      is State.Surplus -> {
        val (head, tail) = current.values.dequeue()
        if (current.offers.isEmpty()) {
          val update: State<A> =
            if (tail.isEmpty()) State.empty<A>()
            else current.copy(values = tail)
          if (state.compareAndSet(current, update)) Maybe.Just(head)
          else unsafeTryTake()
        } else {
          val (ax, notify) = current.offers.values.first()
          val xs = current.offers.entries.drop(1)
          val update: State<A> = State.Surplus(tail.enqueue(ax), xs.toMap())
          if (state.compareAndSet(current, update)) {
            notify(Result.success(Unit)) // Notify offer that it finished
            Maybe.Just(head)
          } else unsafeTryTake() // compareAndSet failed try again
        }
      }
      is State.Deficit -> Maybe.None
    }

  /**
   * Completes [onTake] with the first buffered value, or registers it as a waiting taker.
   *
   * @return a [CancelToken] that unregisters the taker, or [CancelToken.unit] when a value was available.
   */
  private tailrec suspend fun unsafeTake(onTake: Callback<A>): CancelToken =
    when (val current = state.value) {
      is State.Surplus -> {
        val (head, tail) = current.values.dequeue()
        if (current.offers.isEmpty()) {
          val update: State<A> =
            if (tail.isEmpty()) State.empty<A>()
            else current.copy(values = tail)
          if (state.compareAndSet(current, update)) {
            onTake(Result.success(head)) // Call takers callback with value
            CancelToken.unit
          } else {
            unsafeTake(onTake) // Update failed, recurse
          }
        } else {
          val (ax, notify) = current.offers.values.first()
          // Drop the offer that is promoted into the buffer; keeping it would offer its value twice.
          val xs = current.offers.entries.drop(1)
          val update: State<A> = State.Surplus(tail.enqueue(ax), xs.toMap())
          if (state.compareAndSet(current, update)) {
            notify(Result.success(Unit)) // Notify offer that it finished,
            onTake(Result.success(head)) // Call takers callback with value
            CancelToken.unit // Nothing to cancel
          } else unsafeTake(onTake) // Update failed, recurse
        }
      }
      is State.Deficit -> {
        val id = Token()
        val newQueue = current.takes + Pair(id, onTake)
        val update: State<A> = State.Deficit(current.peeks, newQueue)
        if (state.compareAndSet(current, update)) CancelToken { unsafeCancelTake(id) }
        else unsafeTake(onTake)
      }
    }

  /** Unregisters the waiting taker registered under [id]; a no-op in [State.Surplus]. */
  private tailrec fun unsafeCancelTake(id: Token): Unit =
    when (val current = state.value) {
      is State.Deficit -> {
        val newMap = current.takes - id
        val update = State.Deficit<A>(current.peeks, newMap)
        if (state.compareAndSet(current, update)) Unit
        else unsafeCancelTake(id)
      }
      is State.Surplus -> Unit
    }

  /**
   * Empties the queue: returns all buffered values followed by the values of all waiting offers,
   * and notifies those offers that they finished.
   */
  private tailrec suspend fun unsafeTakeAll(): List<A> =
    when (val current = state.value) {
      is State.Surplus -> {
        val waiting = current.offers.values
        if (state.compareAndSet(current, State.empty<A>())) {
          // Tell all outstanding offers that they're finished; nothing happens when there are none.
          waiting.forEach { (_, notify) -> notify(Result.success(Unit)) }
          current.values.toList() + waiting.map { it.first }
        } else unsafeTakeAll()
      }
      is State.Deficit -> emptyList()
    }

  /** Returns all buffered values followed by the values of all waiting offers, without changing the state. */
  private suspend fun unsafePeekAll(): List<A> =
    when (val current = state.value) {
      is State.Deficit -> emptyList()
      is State.Surplus -> {
        val all = current.values.toList()
        val allOffered = current.offers.values.map { it.first }
        all + allOffered
      }
    }

  /** Returns the first buffered value without removing it, or [Maybe.None] in [State.Deficit]. */
  private suspend fun unsafeTryPeek(): Maybe<A> =
    when (val current = state.value) {
      is State.Surplus -> Maybe.Just(current.values.first())
      is State.Deficit -> Maybe.None
    }

  /**
   * Completes [onPeek] with the first buffered value, or registers it as a waiting peeker.
   *
   * @return a [CancelToken] that unregisters the peeker, or [CancelToken.unit] when a value was available.
   */
  private tailrec suspend fun unsafePeek(onPeek: Callback<A>): CancelToken =
    when (val current = state.value) {
      is State.Surplus -> {
        onPeek(Result.success(current.values.first()))
        CancelToken.unit
      }
      is State.Deficit -> {
        val id = Token()
        val newReads = current.peeks + Pair(id, onPeek)
        val update: State<A> = State.Deficit(newReads, current.takes)
        if (state.compareAndSet(current, update)) CancelToken { unsafeCancelRead(id) }
        else unsafePeek(onPeek)
      }
    }

  /** Unregisters the waiting peeker registered under [id]; a no-op in [State.Surplus]. */
  private tailrec fun unsafeCancelRead(id: Token): Unit =
    when (val current = state.value) {
      is State.Deficit -> {
        val newMap = current.peeks - id
        val update: State<A> = State.Deficit(newMap, current.takes)
        if (state.compareAndSet(current, update)) Unit
        else unsafeCancelRead(id)
      }
      is State.Surplus -> Unit
    }

  /**
   * A Queue can be in two states
   * [Deficit]: Contains two maps of registered id & take/peek callbacks waiting
   * for a value to become available.
   *
   * [Surplus]: Contains a queue of values and a map of registered id & offer callbacks waiting to
   * offer once there is room.
   */
  sealed class State<out A> {
    data class Deficit<A>(
      val peeks: Map<Token, Callback<A>>,
      val takes: Map<Token, Callback<A>>
    ) : State<A>()

    data class Surplus<A>(
      val values: IQueue<A>,
      val offers: Map<Token, Pair<A, Callback<Unit>>>
    ) : State<A>()

    companion object {
      private val empty: Deficit<Any?> = Deficit(emptyMap(), emptyMap())

      /** The shared [Deficit] without takers or peekers; it holds no values, so it is reused for every [A]. */
      @Suppress("UNCHECKED_CAST")
      fun <A> empty(): Deficit<A> = empty as Deficit<A>
    }
  }

  /**
   * Unsafely handle offer at State.Surplus.
   *
   * Return:
   *  - `true` when handled
   *  - `false` when offer should be scheduled (only used for bounded).
   *    or when [tryStrategy] is true to signal no room in the [Queue].
   *  - null when needs to recurse and try again
   */
  private suspend fun unsafeOfferAllSurplusForStrategy(
    a: Iterable<A>,
    tryStrategy: Boolean,
    surplus: State.Surplus<A>
  ): Boolean? =
    when (strategy) {
      is Bounded ->
        when {
          surplus.values.length() + a.count() > strategy.capacity -> false
          state.compareAndSet(surplus, surplus.copy(surplus.values.enqueue(a))) -> true
          else -> null
        }
      is Sliding -> {
        val nextQueue =
          if (surplus.values.length() + (a.count() - 1) < strategy.capacity) surplus.values.enqueue(a)
          else surplus.values.drop(a.count()).enqueue(a)
        when {
          surplus.values.length() >= strategy.capacity && tryStrategy -> false
          state.compareAndSet(surplus, surplus.copy(values = nextQueue)) -> true
          else -> null
        }
      }
      is Dropping -> {
        val nextQueue =
          if (surplus.values.length() + (a.count() - 1) < strategy.capacity) surplus.values.enqueue(a)
          else surplus.values
        when {
          surplus.values.length() >= strategy.capacity && tryStrategy -> false
          state.compareAndSet(surplus, surplus.copy(values = nextQueue)) -> true
          else -> null
        }
      }
      is Unbounded ->
        if (!state.compareAndSet(surplus, surplus.copy(values = surplus.values.enqueue(a)))) null
        else true
    }

  /**
   * Unsafely handle an offer of more values than there are takers at State.Deficit.
   *
   * Return:
   *  - `true` when handled: takers and peekers were served and the remaining values were buffered
   *  - `false` when the values left after serving the takers exceed the capacity and either the
   *    strategy is bounded (the offer should be scheduled) or [tryStrategy] is true
   *  - null when needs to recurse and try again
   */
  private suspend fun unsafeOfferAllDeficitForStrategy(
    a: Iterable<A>,
    tryStrategy: Boolean,
    deficit: State.Deficit<A>
  ): Boolean? =
    when (strategy) {
      is Bounded ->
        when {
          // We need to atomically offer to deficit AND schedule offers according to capacity.
          a.count() > deficit.takes.size && (a.count() - deficit.takes.size) > strategy.capacity -> false
          a.count() > deficit.takes.size -> { // Offer to deficit and update to Surplus, capacity check above
            val update = State.Surplus<A>(IQueue(a.drop(deficit.takes.size)), emptyMap())
            if (state.compareAndSet(deficit, update)) {
              streamAllPeeksAndTakers(a, deficit.peeks.values, deficit.takes.values)
            } else null // recurse
          }
          else -> null // updates failed, recurse
        }
      is Sliding -> {
        val afterTakers = a.count() - deficit.takes.size
        if (afterTakers > strategy.capacity && tryStrategy) false
        else {
          // Over capacity: only the newest `capacity` values survive.
          val newQueue =
            if (afterTakers > strategy.capacity) {
              IQueue(a.drop(deficit.takes.size).drop(afterTakers - strategy.capacity))
            } else IQueue(a.drop(deficit.takes.size))
          val update = State.Surplus<A>(newQueue, emptyMap())
          if (state.compareAndSet(deficit, update)) {
            streamAllPeeksAndTakers(a, deficit.peeks.values, deficit.takes.values)
          } else null // recurse
        }
      }
      is Dropping -> {
        val afterTakers = a.count() - deficit.takes.size
        if (afterTakers > strategy.capacity && tryStrategy) false
        else {
          // Over capacity: only the oldest `capacity` values survive.
          val newQueue =
            if (afterTakers > strategy.capacity) IQueue(a.drop(deficit.takes.size).take(strategy.capacity))
            else IQueue(a.drop(deficit.takes.size))
          val update = State.Surplus<A>(newQueue, emptyMap())
          if (state.compareAndSet(deficit, update)) {
            streamAllPeeksAndTakers(a, deficit.peeks.values, deficit.takes.values)
          } else null // recurse
        }
      }
      is Unbounded -> {
        val update = State.Surplus<A>(IQueue(a.drop(deficit.takes.size)), emptyMap())
        if (state.compareAndSet(deficit, update)) {
          streamAllPeeksAndTakers(a, deficit.peeks.values, deficit.takes.values)
        } else null
      }
    }

  /**
   * Completes every peeker with the first value of [a] (if any) and pairs the values of [a]
   * with [takes] in order, one value per taker.
   *
   * @return always `true`, so it can be used as the "handled" result of an offer.
   */
  private suspend fun streamAllPeeksAndTakers(
    a: Iterable<A>,
    peeks: Iterable<Callback<A>>,
    takes: Iterable<Callback<A>>
  ): Boolean {
    a.firstOrNull()?.let { first ->
      val res = Result.success(first)
      peeks.forEach { cb -> cb(res) }
    }
    // `zip` stops at the shorter side, so surplus values or surplus takers are left alone.
    a.zip(takes) { value, cb -> cb(Result.success(value)) }
    return true
  }

  /**
   * Completes every peeker, and then [take] when present, with [a].
   *
   * @return always `true`, so it can be used as the "handled" result of an offer.
   */
  private suspend fun callTakeAndPeeks(a: A, take: Callback<A>?, peeks: Iterable<Callback<A>>): Boolean {
    val value = Result.success(a)
    peeks.callAll(value)
    if (take != null) take(value)
    return true
  }

  // For streaming a value to a whole `reads` collection
  private suspend fun Iterable<Callback<A>>.callAll(value: Result<A>): Unit =
    forEach { cb -> cb(value) }

  companion object {
    /** Creates an empty queue with the [Unbounded] strategy. */
    // NOTE(review): the type parameter `F` is unused; it is kept so existing call sites keep compiling.
    suspend fun <F, A> empty(): SuspendingQueue<A> =
      SuspendingQueue(Unbounded, null)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment