Skip to content

Instantly share code, notes, and snippets.

@matthughes
Created December 11, 2015 22:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save matthughes/273a5ddf813ca382b697 to your computer and use it in GitHub Desktop.
Save matthughes/273a5ddf813ca382b697 to your computer and use it in GitHub Desktop.
/**
 * A retry combinator for scalaz-stream processes, together with two small
 * drivers that exercise it (intended to be invoked from the REPL).
 */
object PaulRetry {
  import scalaz.\/
  import scalaz.concurrent.Task
  import scalaz.stream.{ async, time, Process }
  import scala.concurrent.duration._

  /** Scheduler resolved implicitly by `time.awakeEvery` in the drivers below. */
  implicit val scheduler: java.util.concurrent.ScheduledExecutorService =
    scalaz.stream.DefaultScheduler

  /**
   * Try running the process `p`, retrying in the event of failure.
   *
   * `p` is attempted once up front. While it keeps failing, it is attempted
   * again each time `retries` emits an element, with the error discarded.
   * Once `retries` is exhausted, `p` is attempted one final time and, if that
   * attempt fails too, its error is raised. As soon as any attempt completes
   * normally, no further attempts are made.
   *
   * Example: `retry(time.awakeEvery(2.minutes))(p)` waits for the next
   * 2-minute tick after each failure before trying again, indefinitely.
   * `retry(time.awakeEvery(2.minutes).take(5))(p)` does the same for five
   * ticks, then makes one last attempt and raises its error on failure.
   *
   * NOTE(review): `alive` is allocated once per call to `retry`, not once per
   * run of the returned process, so the result presumably should not be run
   * more than once after a success - confirm against callers.
   *
   * @param retries schedule of retry attempts; only its elements' timing and
   *                count matter, the values themselves are ignored
   * @param p       the process to run
   * @tparam A      element type emitted by `p`
   * @return a process emitting the output of every attempt of `p`
   */
  def retry[A](retries: Process[Task, Any])(p: Process[Task, A]): Process[Task, A] = {
    // Closed as soon as one attempt of `p` runs to normal completion.
    val alive = async.signalOf[Unit](())

    // One attempt: errors are captured on the left; the `append`ed close only
    // runs when `p` ends normally, which is what marks success.
    val step: Process[Task, Throwable \/ A] =
      p.append(Process.eval_(alive.close)).attempt()

    // `terminated` is applied to `retries` itself, *inside* `link`, so the
    // trailing `None` means "retries exhausted" only. Applying it to the
    // linked stream also produced a `None` when `alive` was closed after a
    // success, which re-ran `p` one extra time.
    step.stripW ++ link(alive.continuous)(retries.terminated).flatMap {
      // retries exhausted: last attempt, rethrow its error
      case None => step.flatMap(_.fold(Process.fail, Process.emit))
      // on other attempts, ignore the exceptions
      case Some(_) => step.stripW
    }
  }

  /**
   * Terminate `p` when the given signal `alive` terminates.
   *
   * Implemented by zipping `alive` with `p` and keeping only `p`'s elements,
   * so the result ends when either side ends.
   */
  def link[A](alive: Process[Task, Unit])(p: Process[Task, A]): Process[Task, A] =
    alive.zip(p).map(_._2)

  /**
   * Runs `retry` over an effectful `Process.eval` that prints and emits 1.
   *
   * The name and the transcript below record the behavior observed when
   * `retry` re-ran `p` after a successful first attempt. With the end-of-
   * retries marker now taken from `retries` directly, a successful first
   * attempt is expected to print only once.
   */
  def runProducesDoubleOutput(): Unit = {
    retry(time.awakeEvery(1.second).take(5))(Process.eval(Task {
      println("Running 1")
      1
    })).run.run
  }
  // Originally observed (before the fix in `retry`):
  // scala> com.ccadllc.ipdc.terminaltopology.PaulRetry.runProducesDoubleOutput
  // Running 1
  // Running 1

  /**
   * Runs `retry` over a pure `Process(1)` whose `map` prints as a side effect.
   *
   * NOTE(review): the single line of output is presumably because mapping over
   * an already-emitted pure value evaluates the function when the process is
   * constructed rather than on each attempt - confirm against scalaz-stream.
   */
  def runProducesSingleOutput(): Unit = {
    retry(time.awakeEvery(1.second).take(5))(Process(1).map { x =>
      println(s"Running $x")
      x
    }).run.run
  }
  // scala> com.ccadllc.ipdc.terminaltopology.PaulRetry.runProducesSingleOutput
  // Running 1
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment