Skip to content

Instantly share code, notes, and snippets.

Created May 31, 2017 19:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/a2392aec161542cf231a332632017c87 to your computer and use it in GitHub Desktop.
Save anonymous/a2392aec161542cf231a332632017c87 to your computer and use it in GitHub Desktop.
the description for this gist
import monix.execution.Scheduler.Implicits.global
val N = 100000
val mvar = MVar.empty[Int]
//Puts 'n', 'n+1', ..., 'N-1' to 'mvar'
def produce(n: Int): Task[Unit] =
if (n < N)
mvar.put(n).flatMap(_ => produce(n + 1))
else
Task.now(())
//Takes 'N-c' values from 'mvar' and sums them. Fails if cannot take in 100 ms.
def consume(sum: Long, c: Int): Task[Long] =
if (c < N) {
//Fails if producer is not able to put value in approx. 100 ms
mvar.take
.timeout(100.millisecond)
.flatMap(v => consume(v + sum, c + 1))
} else {
Task.now(sum)
}
val consumer = consume(0, 0).runAsync
val producer = produce(0).runOnComplete {
case Success(_) => println("Producer finished.")
case Failure(ex) => println(s"Producer failed: $ex")
}
consumer.onComplete(r => println(s"Consumer result: $r"))
Await.result(consumer, 5.seconds)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment