import monix.eval.Task | |
import monix.reactive._ | |
import scala.concurrent.Await | |
import scala.concurrent.duration.Duration | |
import scala.language.postfixOps | |
import scala.util.Random | |
import scala.concurrent.duration._ | |
/** Runs `source` through a deliberately flaky step and retries it on failure.
  *
  * Each attempt evaluates `source` and then fails with a plain `Exception`
  * whenever `Random.nextInt(10)` yields a value below 8; otherwise the value
  * produced by `source` is passed through unchanged.
  *
  * On failure, while `times` is still positive, "retrying" is printed and the
  * whole operation is scheduled again after a one second delay with one fewer
  * retry left. Once `times` reaches zero (or below) the last error is re-raised.
  *
  * @param times  number of retries still allowed after the current attempt
  * @param source task producing the value to push through the flaky step
  * @return a task yielding the value of `source`, or failing with the last
  *         error once the retries are used up
  */
def retryOnFailure(times: Int, source: Task[Int]): Task[Int] = {
  // Simulated unreliable step: fails whenever the random draw is below 8.
  val flaky: Task[Int] = source.flatMap { value =>
    if (Random.nextInt(10) < 8) Task.raiseError(new Exception) else Task(value)
  }

  flaky.onErrorHandleWith { failure =>
    if (times > 0) {
      println("retrying")
      // Retry from the original `source`, so the flaky step is applied once per attempt.
      retryOnFailure(times - 1, source).delayExecution(1.second)
    } else {
      Task.raiseError(failure)
    }
  }
}
// `runToFuture` needs an implicit Scheduler to execute the stream on; the
// file's top-level imports do not provide one, so bring in the default here.
import monix.execution.Scheduler.Implicits.global

// Demo pipeline: pushes the numbers 0 to 5 through `retryOnFailure`, dropping
// (and reporting) every element that still fails after its retries.
val f = Observable
  .fromIterable(0 to 5)
  // Suspend the println inside the Task so the side effect is part of the
  // task itself rather than of building a strict `Task.now` value.
  .mapEval(p => Task { println(s"Processing $p"); p })
  .delayExecution(1.second)
  // `attempt` turns a failed task into a `Left`, so one bad element
  // does not terminate the whole stream.
  .mapEval(p => retryOnFailure(3, Task(p)).attempt)
  // Report exhausted elements and map them to None, keeping the stream typed
  // as Option[Int] instead of widening it to AnyVal by emitting Unit.
  .mapEval {
    case Right(evt) => Task.now(Option(evt))
    case Left(_) =>
      Task {
        println("Number of attempts exceeded, irrecoverable")
        Option.empty[Int]
      }
  }
  .collect { case Some(evt) => evt }
  .completedL
  .runToFuture

// Block the script until the stream has finished.
Await.ready(f, Duration.Inf)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment