Skip to content

Instantly share code, notes, and snippets.

@richard-gibson
Created October 20, 2019 19:51
Show Gist options
  • Save richard-gibson/4bb12fac9fcd8179bf4e485fa006b16d to your computer and use it in GitHub Desktop.
Save richard-gibson/4bb12fac9fcd8179bf4e485fa006b16d to your computer and use it in GitHub Desktop.
Example of multiple producers using a single Arrow Effects Queue
import arrow.Kind
import arrow.core.Either
import arrow.fx.IO
import arrow.fx.ForIO
import arrow.fx.Queue
import arrow.fx.Timer
import arrow.fx.fix
import arrow.fx.handleErrorWith
import arrow.fx.QueueShutdown
import arrow.fx.extensions.fx
import arrow.fx.extensions.io.concurrent.concurrent
import arrow.fx.typeclasses.Concurrent
import arrow.fx.typeclasses.Duration
import arrow.fx.typeclasses.milliseconds
import arrow.fx.typeclasses.seconds
/**
* Example usage of bounded queue built using Arrow Ref and Promise
* Takers suspended when nothing to consume from Queue
* Producers suspended when queue capacity reached
*/
fun main() {
multiProducerMultiConsumer
.followedBy(multiProducerMultiConsumerShutdown)
.unsafeRunSync()
}
// Output
// --- multiProducerMultiConsumer ---
// consumer 1 taking 7
// consumer 2 taking 10
// producer 1 offering: 10
// producer 3 offering: 7
// consumer 1 taking 5
// producer 2 offering: 5
// consumer 2 taking 7
// producer 3 offering: 7
// producer 1 offering: 10
// consumer 1 taking 10
// cancelling fo1
// cancelling fo2
// cancelling fo3
// cancelling fc1
// cancelling fc2
// --- multiProducerMultiConsumerShutdown ---
// consumer 1 taking 10
// consumer 2 taking 7
// producer 1 offering: 10
// producer 3 offering: 7
// consumer 1 taking 5
// consumer 2 taking 10
// producer 3 offering: 7
// producer 1 offering: 10
// producer 2 offering: 5
// consumer 1 taking 7
// consumer 2 taking 7
// producer 1 offering: 10
// producer 3 offering: 7
// consumer 1 taking 10
// Queue shut down
// Exception in thread "main" arrow.fx.QueueShutdown
fun <T> IO.Companion.boundedQueue(capacity: Int) =
Queue.bounded<ForIO, T>(capacity, IO.concurrent())
fun putStrLn(s: String) = IO.later { println(s) }
fun <F, A, B> Kind<F, A>.repeatEvery(duration: Duration, C: Concurrent<F>): Kind<F, B> = C.run {
val leftUnit = { _: A -> Either.Left(Unit) }
val stepResult: (Unit) -> Kind<F, Either<Unit, B>> = {
Timer(C).sleep(duration).flatMap {
this@repeatEvery.map(leftUnit)
}
}
tailRecM(Unit, stepResult)
}
fun <A, B> IO<A>.repeatEvery(duration: Duration) = repeatEvery<ForIO, A, B>(duration, IO.concurrent()).fix()
fun <A> offerAfter(latency: Duration, label: String, a: A, queue: Queue<ForIO, A>): IO<Unit> = IO.fx {
!queue.offer(a)
!putStrLn("$label: $a")
}.repeatEvery(latency)
fun <A> consumeAfter(latency: Duration, label: String, queue: Queue<ForIO, A>): IO<Unit> = IO.fx {
val m = !queue.take()
!putStrLn("$label $m")
}.repeatEvery(latency)
// need to write to return IO unit
fun <A> IO<A>.handleShutdown(): IO<A> =
handleErrorWith {
when (it) {
is QueueShutdown ->
putStrLn("Shutdown called").followedBy(this)
else -> IO.raiseError(it)
}
}
val multiProducerMultiConsumer = IO.fx {
!putStrLn("--- multiProducerMultiConsumer ---")
val context = dispatchers().default()
val queue = !IO.boundedQueue<Int>(10000)
val fo1 = !offerAfter(1.seconds, "producer 1 offering", 10, queue).fork(context)
val fo2 = !offerAfter(2.seconds, "producer 2 offering", 5, queue).fork(context)
val fo3 = !offerAfter(1.seconds, "producer 3 offering", 7, queue).fork(context)
val fc1 = !consumeAfter(500.milliseconds, "consumer 1 taking", queue).fork(context)
val fc2 = !consumeAfter(700.milliseconds, "consumer 2 taking", queue).fork(context)
!timer().sleep(3.seconds)
!putStrLn("cancelling fo1")
!fo1.cancel()
!putStrLn("cancelling fo2")
!fo2.cancel()
!putStrLn("cancelling fo3")
!fo3.cancel()
!putStrLn("cancelling fc1")
!fc1.cancel()
!putStrLn("cancelling fc2")
!fc2.cancel()
}
val multiProducerMultiConsumerShutdown = IO.fx {
!putStrLn("--- multiProducerMultiConsumerShutdown ---")
val context = dispatchers().default()
val queue = !IO.boundedQueue<Int>(10000)
val fo1 = !offerAfter(1.seconds, "producer 1 offering", 10, queue).fork(context).handleShutdown()
!offerAfter(2.seconds, "producer 2 offering", 5, queue).fork(context)
!offerAfter(1.seconds, "producer 3 offering", 7, queue).fork(context)
!consumeAfter(500.milliseconds, "consumer 1 taking", queue).fork(context)
!consumeAfter(700.milliseconds, "consumer 2 taking", queue).fork(context)
!timer().sleep(3.seconds)
!queue.shutdown()
!putStrLn("Queue shut down")
!fo1.join()
!putStrLn("After join")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment