Skip to content

Instantly share code, notes, and snippets.

Created April 11, 2020 06:45
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save inmyth/7e1647022370dca5b553a090ac4d2b94 to your computer and use it in GitHub Desktop.
import monix.eval.Task
import monix.reactive._
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.language.postfixOps
import scala.util.Random
import scala.concurrent.duration._
def retryOnFailure(times: Int, source: Task[Int]): Task[Int] =
.flatMap(p => if (Random.nextInt(10) < 8) Task.raiseError(new Exception) else Task(p))
.onErrorHandleWith { err =>
if (times <= 0) Task.raiseError(err) else {
retryOnFailure(times - 1, source).delayExecution(1 second)
val f = Observable.fromIterable(0 to 5)
.mapEval(p =>{println(s"Processing $p");p})
.delayExecution(1 second)
.mapEval(p => retryOnFailure(3, Task(p)).attempt)
.collect {
case Right(evt) => evt
case Left(_) => println("Number of attempts exceeded, irrecoverable")
Await.ready(f, Duration.Inf)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment