Created
October 20, 2019 19:51
-
-
Save richard-gibson/4bb12fac9fcd8179bf4e485fa006b16d to your computer and use it in GitHub Desktop.
Example of multiple producers using a single Arrow Effects Queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import arrow.Kind | |
import arrow.core.Either | |
import arrow.fx.IO | |
import arrow.fx.ForIO | |
import arrow.fx.Queue | |
import arrow.fx.Timer | |
import arrow.fx.fix | |
import arrow.fx.handleErrorWith | |
import arrow.fx.QueueShutdown | |
import arrow.fx.extensions.fx | |
import arrow.fx.extensions.io.concurrent.concurrent | |
import arrow.fx.typeclasses.Concurrent | |
import arrow.fx.typeclasses.Duration | |
import arrow.fx.typeclasses.milliseconds | |
import arrow.fx.typeclasses.seconds | |
/**
 * Example of a bounded queue (built using Arrow Ref and Promise) shared by
 * several producers and consumers.
 *
 * - Takers are suspended while there is nothing to consume from the queue.
 * - Producers are suspended once the queue capacity has been reached.
 *
 * Runs [multiProducerMultiConsumer] and then
 * [multiProducerMultiConsumerShutdown] as one program via `unsafeRunSync`.
 */
fun main() {
    // Compose both demos into one IO value first, then execute it at the edge.
    val demo = multiProducerMultiConsumer.followedBy(multiProducerMultiConsumerShutdown)
    demo.unsafeRunSync()
}
// Output | |
// --- multiProducerMultiConsumer --- | |
// consumer 1 taking 7 | |
// consumer 2 taking 10 | |
// producer 1 offering: 10 | |
// producer 3 offering: 7 | |
// consumer 1 taking 5 | |
// producer 2 offering: 5 | |
// consumer 2 taking 7 | |
// producer 3 offering: 7 | |
// producer 1 offering: 10 | |
// consumer 1 taking 10 | |
// cancelling fo1 | |
// cancelling fo2 | |
// cancelling fo3 | |
// cancelling fc1 | |
// cancelling fc2 | |
// --- multiProducerMultiConsumerShutdown --- | |
// consumer 1 taking 10 | |
// consumer 2 taking 7 | |
// producer 1 offering: 10 | |
// producer 3 offering: 7 | |
// consumer 1 taking 5 | |
// consumer 2 taking 10 | |
// producer 3 offering: 7 | |
// producer 1 offering: 10 | |
// producer 2 offering: 5 | |
// consumer 1 taking 7 | |
// consumer 2 taking 7 | |
// producer 1 offering: 10 | |
// producer 3 offering: 7 | |
// consumer 1 taking 10 | |
// Queue shut down | |
// Exception in thread "main" arrow.fx.QueueShutdown | |
/**
 * Describes the creation of a bounded [Queue] of [T] running in `IO`.
 *
 * @param capacity the capacity handed to `Queue.bounded`.
 * @return the effect returned by `Queue.bounded`, specialised to `ForIO`.
 */
fun <T> IO.Companion.boundedQueue(capacity: Int) =
    IO.concurrent().let { concurrentIO -> Queue.bounded<ForIO, T>(capacity, concurrentIO) }
/**
 * Wraps printing [s] (followed by a newline) to standard output in an [IO].
 *
 * Nothing is printed when this function is called; the `println` runs only
 * when the returned [IO] is executed.
 */
fun putStrLn(s: String): IO<Unit> =
    IO.later { println(s) }
/**
 * Repeats the receiver effect with a pause of [duration] before every run.
 *
 * Each iteration sleeps for [duration], runs the receiver, discards its result
 * and yields `Either.Left(Unit)` to `tailRecM`. The step never yields a
 * `Right`, so no [B] is ever produced by the loop itself; the loop only ends
 * through an error or cancellation.
 *
 * @param duration how long to sleep before each run of the receiver.
 * @param C the [Concurrent] instance used for sleeping and sequencing.
 */
fun <F, A, B> Kind<F, A>.repeatEvery(duration: Duration, C: Concurrent<F>): Kind<F, B> = C.run {
    val effect = this@repeatEvery
    tailRecM<Unit, B>(Unit) { _: Unit ->
        // Sleep first, then run the effect; always answer "Left" to keep looping.
        Timer(C).sleep(duration).flatMap { _ ->
            effect.map { _: A -> Either.Left(Unit) }
        }
    }
}
/**
 * `IO`-specialised variant of the generic `repeatEvery`: delegates to it with
 * `IO.concurrent()` and narrows the result back to a concrete [IO].
 *
 * @param duration how long to sleep before each run of the receiver.
 */
fun <A, B> IO<A>.repeatEvery(duration: Duration): IO<B> {
    val repeated = repeatEvery<ForIO, A, B>(duration, IO.concurrent())
    return repeated.fix()
}
/**
 * Producer loop: repeatedly offers [a] to [queue] and logs `"<label>: <a>"`.
 *
 * The offer-then-log step is repeated through `repeatEvery(latency)`.
 *
 * @param latency the interval passed to `repeatEvery`.
 * @param label prefix of the log line printed after each offer.
 * @param a the value offered on every iteration.
 * @param queue the queue receiving the values.
 */
fun <A> offerAfter(latency: Duration, label: String, a: A, queue: Queue<ForIO, A>): IO<Unit> {
    val offerAndLog = IO.fx {
        queue.offer(a).bind()
        putStrLn("$label: $a").bind()
    }
    return offerAndLog.repeatEvery(latency)
}
/**
 * Consumer loop: repeatedly takes a value from [queue] and logs
 * `"<label> <value>"`.
 *
 * The take-then-log step is repeated through `repeatEvery(latency)`.
 *
 * @param latency the interval passed to `repeatEvery`.
 * @param label prefix of the log line printed after each take.
 * @param queue the queue to take values from.
 */
fun <A> consumeAfter(latency: Duration, label: String, queue: Queue<ForIO, A>): IO<Unit> {
    val takeAndLog = IO.fx {
        val taken = queue.take().bind()
        putStrLn("$label $taken").bind()
    }
    return takeAndLog.repeatEvery(latency)
}
// TODO (from the original author): rewrite this so that it returns IO<Unit>.
/**
 * Reacts to a [QueueShutdown] raised by the receiver: prints
 * "Shutdown called" and then runs the receiver once more. Any other error is
 * re-raised unchanged.
 *
 * NOTE(review): the retried receiver is not wrapped in this handler again, so
 * a second [QueueShutdown] from the retry propagates to the caller.
 */
fun <A> IO<A>.handleShutdown(): IO<A> =
    handleErrorWith { error ->
        if (error is QueueShutdown) {
            putStrLn("Shutdown called").followedBy(this)
        } else {
            IO.raiseError<A>(error)
        }
    }
/**
 * Demo 1: three producers and two consumers share one bounded queue for three
 * seconds, after which every fiber is cancelled explicitly (producers first,
 * then consumers), announcing each cancellation on standard output.
 */
val multiProducerMultiConsumer = IO.fx {
    putStrLn("--- multiProducerMultiConsumer ---").bind()

    val pool = dispatchers().default()
    val sharedQueue = IO.boundedQueue<Int>(10000).bind()

    // Start the producers, each on its own fiber.
    val producer1 = offerAfter(1.seconds, "producer 1 offering", 10, sharedQueue).fork(pool).bind()
    val producer2 = offerAfter(2.seconds, "producer 2 offering", 5, sharedQueue).fork(pool).bind()
    val producer3 = offerAfter(1.seconds, "producer 3 offering", 7, sharedQueue).fork(pool).bind()

    // Start the consumers, each on its own fiber.
    val consumer1 = consumeAfter(500.milliseconds, "consumer 1 taking", sharedQueue).fork(pool).bind()
    val consumer2 = consumeAfter(700.milliseconds, "consumer 2 taking", sharedQueue).fork(pool).bind()

    // Let the fibers run for a while before tearing everything down.
    timer().sleep(3.seconds).bind()

    putStrLn("cancelling fo1").bind()
    producer1.cancel().bind()
    putStrLn("cancelling fo2").bind()
    producer2.cancel().bind()
    putStrLn("cancelling fo3").bind()
    producer3.cancel().bind()
    putStrLn("cancelling fc1").bind()
    consumer1.cancel().bind()
    putStrLn("cancelling fc2").bind()
    consumer2.cancel().bind()
}
/**
 * Demo 2: three producers and two consumers share one bounded queue for three
 * seconds, after which the queue itself is shut down instead of cancelling the
 * fibers one by one. The program then joins the first producer.
 *
 * The [QueueShutdown] handler is attached to producer 1 *before* it is forked.
 * Previously the handler wrapped the `fork` effect, so the error raised inside
 * the fiber was never handled and `fo1.join()` rethrew it: the recorded run
 * shows no "Shutdown called", no "After join", and ends with
 * `arrow.fx.QueueShutdown`. With the handler on the producer, that fiber
 * completes with "Shutdown called" and the join is expected to succeed.
 */
val multiProducerMultiConsumerShutdown = IO.fx {
    !putStrLn("--- multiProducerMultiConsumerShutdown ---")
    val context = dispatchers().default()
    val queue = !IO.boundedQueue<Int>(10000)

    // Recover from the shutdown inside the producer so its fiber ends cleanly.
    // The existing handleShutdown() helper is not used here because it re-runs
    // the failed effect, which would fail again on a queue that is shut down.
    val guardedProducer1 = offerAfter(1.seconds, "producer 1 offering", 10, queue)
        .handleErrorWith { error ->
            when (error) {
                is QueueShutdown -> putStrLn("Shutdown called")
                else -> IO.raiseError<Unit>(error)
            }
        }
    val fo1 = !guardedProducer1.fork(context)

    // The remaining fibers are fire-and-forget: they are never joined, so the
    // errors they hit after the shutdown are not observed by this program.
    !offerAfter(2.seconds, "producer 2 offering", 5, queue).fork(context)
    !offerAfter(1.seconds, "producer 3 offering", 7, queue).fork(context)
    !consumeAfter(500.milliseconds, "consumer 1 taking", queue).fork(context)
    !consumeAfter(700.milliseconds, "consumer 2 taking", queue).fork(context)

    !timer().sleep(3.seconds)
    !queue.shutdown()
    !putStrLn("Queue shut down")
    !fo1.join()
    !putStrLn("After join")
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment