@pchiusano
Last active December 30, 2015 22:29
Combinator for retrying a `Process` multiple times, delaying between attempts
// FYI: some comments below refer to an old version of this gist: https://gist.github.com/pchiusano/7894696/12201b92db57dff8ed6689fc55c15c3f1a136f86
package scalaz.stream

import scalaz.\/
import scalaz.concurrent.Task

object retries {

  def dropWhileUnlessAtEnd[I](f: I => Boolean): Process1[I,I] = {
    def go(prev: Option[I]): Process1[I,I] =
      process1.awaitOption[I].flatMap {
        // `prev` was the last element, emit it unconditionally
        case None => Process.emitAll(prev.toList)
        // `prev` wasn't the last element, emit it only if it tests false vs predicate
        case some => Process.emitAll(prev.toList.dropWhile(f)) ++ go(some)
      }
    go(None)
  }

  def retry[A](schedule: Process[Task, Unit])(p: Process[Task, A]): Process[Task, A] = {
    // step will have either left(err) (if failed) or right(None) (if succeeded) as its last element
    val step: Process[Task, Throwable \/ Option[A]] = p.terminated.attempt()
    val retries: Process[Task, Throwable \/ Option[A]] = schedule.flatMap(_ => step)
    // if we get a `None` on the right, _.isDefined will be false, and we've had a successful attempt, so stop
    retries.takeWhile(_.fold(_ => true, _.isDefined))
           // all but last error is ignored
           .pipe(dropWhileUnlessAtEnd(_.isLeft))
           .flatMap(_.fold(Process.fail, o => Process.emitAll(o.toList)))
  }
}
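
The pipeline above can be hard to follow without running it, so here is a rough model of it on plain Scala lists. This is only a sketch of one reading of the combinators, not scalaz-stream code: `RetryModel`, `Step`, and the list-based signatures are hypothetical, `Either[String, _]` stands in for `Throwable \/ _`, and each attempt is written out by hand as the values it would emit.

```scala
object RetryModel {
  // Hypothetical list-based stand-in for the Process1 of the same name:
  // every element matching `f` is dropped, except that the final element
  // is always kept.
  def dropWhileUnlessAtEnd[I](xs: List[I])(f: I => Boolean): List[I] =
    if (xs.isEmpty) Nil else xs.init.filterNot(f) :+ xs.last

  // One attempt, modelled as the values it would emit: Right(Some(a)) per
  // output, then Right(None) on success or Left(err) on failure.
  type Step[A] = List[Either[String, Option[A]]]

  // Mirrors the shape of `retry`: concatenate the attempts, stop at the first
  // Right(None), ignore all but the last error, then unwrap.
  def retry[A](attempts: List[Step[A]]): Either[String, List[A]] = {
    val taken = attempts.flatten.takeWhile(_.fold(_ => true, _.isDefined))
    val kept  = dropWhileUnlessAtEnd(taken)(_.isLeft)
    kept.foldLeft[Either[String, List[A]]](Right(Nil)) {
      case (Right(acc), Right(o)) => Right(acc ++ o.toList)
      case (Right(_), Left(err))  => Left(err)
      case (failed, _)            => failed
    }
  }

  def main(args: Array[String]): Unit = {
    // First attempt fails, second succeeds: the early error is ignored.
    println(retry[Int](List(List(Left("e1")), List(Right(Some(1)), Right(None)))))
    // Every attempt fails: only the last error survives.
    println(retry[Int](List(List(Left("e1")), List(Left("e2")))))
  }
}
```

In this model an error only reaches the caller when it is the very last element, which matches the "all but last error is ignored" comment in the gist.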
@matthughes

Paul, it seems that in the successful case the process is repeated.

The `None` case is reached when either 1) the retries are exhausted or 2) the signal is exhausted, and the signal is exhausted when an attempt succeeds. But the `None` case runs `step` again, so a successful first attempt still ends up as `step ++ step`. If `step` represents a web service call, you always make two calls, even when the first try succeeds.

This gist demonstrates it: https://gist.github.com/matthughes/273a5ddf813ca382b697. The behavior seems to depend on how you construct the `Process`, though: one kind of process repeats and another mysteriously doesn't.
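
One way to check for this kind of duplicate call is to count how often the attempt actually runs. The sketch below does that with plain Scala instead of scalaz-stream; `CallCount` and `retryCounting` are hypothetical names, and the lazy `Iterator` is only a stand-in for a schedule that triggers one attempt per tick.

```scala
object CallCount {
  // Hypothetical stand-in for a retry combinator: run `attempt` at most
  // `ticks` times, lazily, and stop at the first success. Returns the last
  // outcome, or None if `ticks` is zero.
  def retryCounting[A](ticks: Int)(attempt: () => Either[String, A]): Option[Either[String, A]] = {
    // Iterator.fill takes its element by name, so `attempt()` only runs
    // when `next()` is called.
    val outcomes = Iterator.fill(ticks)(attempt())
    var last: Option[Either[String, A]] = None
    while (outcomes.hasNext && !last.exists(_.isRight)) last = Some(outcomes.next())
    last
  }

  def main(args: Array[String]): Unit = {
    var calls = 0
    val result = retryCounting[String](3) { () => calls += 1; Right("ok") }
    // A retry combinator that succeeds on the first try should report one call.
    println(s"result = $result, calls = $calls")
  }
}
```

The same counter wrapped around a real `step` would show whether a successful first attempt is followed by a second, unwanted invocation.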

@pchiusano
Author

Okay, I think this might do it:

  def dropWhileUnlessAtEnd[I](f: I => Boolean): Process1[I,I] = {
    def go(prev: Option[I]): Process1[I,I] =
      process1.awaitOption[I].flatMap {
        // `prev` was the last element, emit it unconditionally
        case None => Process.emitAll(prev.toList)
        // `prev` wasn't the last element, emit it only if it tests false vs predicate
        case some => Process.emitAll(prev.toList.dropWhile(f)) ++ go(some)
      }
    go(None)
  }

  def retry[A](schedule: Process[Task, Unit])(p: Process[Task, A]): Process[Task, A] = {
    val step: Process[Task, Throwable \/ Option[A]] = p.terminated.attempt()
    val retries: Process[Task, Throwable \/ Option[A]] = schedule.flatMap(_ => step)
    retries.takeWhile(_.fold(_ => true, _.isDefined))
           .pipe(dropWhileUnlessAtEnd(_.isLeft))
           .flatMap(_.fold(Process.fail, o => Process.emitAll(o.toList)))
  }
