import monix.eval.Task
import monix.reactive._
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.language.postfixOps
import scala.util.Random
import scala.concurrent.duration._
def retryOnFailure(times: Int, source: Task[Int]): Task[Int] =
.flatMap(p => if (Random.nextInt(10) < 8) Task.raiseError(new Exception) else Task(p))
.onErrorHandleWith { err =>
if (times <= 0) Task.raiseError(err) else {
retryOnFailure(times - 1, source).delayExecution(1 second)
val f = Observable.fromIterable(0 to 5)
.mapEval(p =>{println(s"Processing $p");p})
.delayExecution(1 second)
.mapEval(p => retryOnFailure(3, Task(p)).attempt)
.collect {
case Right(evt) => evt
case Left(_) => println("Number of attempts exceeded, irrecoverable")
Await.ready(f, Duration.Inf)
