Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import monix.eval.Task
import monix.reactive._
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.language.postfixOps
import scala.util.Random
import scala.concurrent.duration._
/** Runs `source` with an artificially injected failure and retries on error.
  *
  * Each run of the task fails with a plain `Exception` whenever
  * `Random.nextInt(10) < 8`; otherwise it yields the value produced by `source`.
  * On failure, "retrying" is printed and the whole procedure is attempted again
  * after a one second delay, until the retry budget is used up.
  *
  * @param times  number of retries still allowed; when it is `<= 0` the error is re-raised
  * @param source the task whose result is passed through on success
  * @return a task yielding the value of `source`, or failing with the last error
  */
def retryOnFailure(times: Int, source: Task[Int]): Task[Int] = {
  // Wrap the source so that a successful value may still be turned into a failure.
  val flaky = source.flatMap { value =>
    val simulatedFailure = Random.nextInt(10) < 8
    if (simulatedFailure) Task.raiseError(new Exception) else Task(value)
  }

  flaky.onErrorHandleWith {
    // Retry budget exhausted: propagate the error unchanged.
    case failure if times <= 0 => Task.raiseError(failure)
    case _ =>
      println("retrying")
      retryOnFailure(times - 1, source).delayExecution(1.second)
  }
}
// Demo pipeline: pushes the numbers 0 to 5 through `retryOnFailure`, reports the
// elements whose retries were exhausted, and blocks until the stream completes.

// `runToFuture` requires an implicit Scheduler; none of the file-level imports
// brings one into scope, so it is imported here next to its only use.
import monix.execution.Scheduler.Implicits.global

val f = Observable.fromIterable(0 to 5)
  // Task.eval suspends the println until the task runs; Task.now is intended
  // for values that are already computed, not for side effects.
  .mapEval(p => Task.eval { println(s"Processing $p"); p })
  .delayExecution(1.second)
  .mapEval { p =>
    // `attempt` materialises the failure so one bad element does not terminate
    // the stream. The failure is logged here rather than inside `collect`, so
    // `collect` stays a pure filter and the element type stays Int instead of
    // widening to Any.
    retryOnFailure(3, Task(p)).attempt.map {
      case Right(evt) => Some(evt)
      case Left(_) =>
        println("Number of attempts exceeded, irrecoverable")
        None
    }
  }
  .collect { case Some(evt) => evt }
  .completedL
  .runToFuture

// Blocking is acceptable here: this is the outer edge of a script and the
// stream must finish before the program exits.
Await.ready(f, Duration.Inf)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.