import scalaz.{\/, Nondeterminism}
import scalaz.concurrent.Task
import scalaz.stream._

trait RetryCombinators {
  // Taken from Paul's gist: https://gist.github.com/pchiusano/7894696
  /**
   * Try running the process `p`, retrying in the event of failure.
   * Example: `retry(time.awakeEvery(2 minutes))(p)` will wait
   * 2 minutes after each failure before trying again, indefinitely.
   * Using `retry(time.awakeEvery(2 minutes).take(5))(p)` will do
   * the same, but only retry a total of five times before raising
   * the latest error.
   */
  def retry[A](retries: Process[Task, Any])(p: Process[Task, A]): Process[Task, A] = {
    val alive = async.signalOf[Unit](())
    val step: Process[Task, Throwable \/ A] =
      p.append(Process.eval_(alive.close)).attempt()
    step.stripW ++ link(alive.continuous)(retries).terminated.flatMap {
      // on our last reconnect attempt, rethrow error
      case None => step.flatMap(_.fold(Process.fail, Process.emit))
      // on other attempts, ignore the exceptions
      case Some(_) => step.stripW
    }
  }

  /** Terminate `p` when the given signal `alive` terminates. */
  def link[A](alive: Process[Task, Unit])(p: Process[Task, A]): Process[Task, A] =
    alive.zip(p).map(_._2)
}
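
// Illustrative usage of `link` (not part of the original gist; the names here are
// made up): tie a process's lifetime to a signal, so `heartbeat` halts as soon as
// `alive.close` is run elsewhere.
object linkExample extends RetryCombinators {
  import scala.concurrent.duration._
  implicit val scheduler = scalaz.stream.DefaultScheduler
  val alive = async.signalOf[Unit](())
  // ticks every 100 ms until the `alive` signal is closed
  val heartbeat = link(alive.continuous)(time.awakeEvery(100.millis))
}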
object test extends RetryCombinators {
  import scala.concurrent.duration._

  // First let's look at Task.
  // We have a task that always fails:
  val alwaysFailsTask = Task[Int] { throw new Exception("Failed") }

  // More realistic is a Task that sometimes fails depending on some outside force (a server being up, etc.):
  val sometimesFailsTask = Task[Int] {
    val randomNum = Math.random
    if (randomNum < .9) 42 else throw new Exception("Better luck next time: " + randomNum)
  }
  // Now if we run this, it's just chance as to whether we get a successful result:
  def result: Int = sometimesFailsTask.run

  // Better than `run` is `attemptRun`; the above will throw an exception if we get unlucky:
  def betterResults: Throwable \/ Int = sometimesFailsTask.attemptRun
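
  // Illustrative aside (not in the original gist): the disjunction can be folded
  // into a plain value instead of letting the exception propagate; the -1 default
  // here is an arbitrary choice.
  def resultOrDefault: Int = sometimesFailsTask.attemptRun.fold(_ => -1, identity)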
  // But what if we want to retry the task until it succeeds? We can use `Task.retry`.
  // In this example, we retry once a second *forever* until we get a success:
  val finallyCompleted = sometimesFailsTask.retry(Stream.continually(1.second))

  // We can limit that a bit by capping the number of retries at 5:
  val finallyCompletedOrGaveUp = sometimesFailsTask.retry(Stream.continually(1.second).take(5))
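
  // Illustrative aside (not in the original gist): if the capped variant runs out of
  // retries it rethrows the last error, so `attemptRun` is still the safe way to run it:
  def cappedResult: Throwable \/ Int = finallyCompletedOrGaveUp.attemptRun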
  // OK, now let's see how this maps onto Process.
  // Let's say we have a Process.range(1, 100); we'll add a debug sink so we can see values being emitted:
  val oneToHundred = Process.range(1, 100).toSource.observe(io.stdOutLines.contramap { x: Any => x.toString })

  // And we have a channel that sometimes fails, but when successful passes on its input:
  val sometimesFailsChannel = channel.lift { x: Int => sometimesFailsTask.map(_ => x) }
  // If we run this, the process will pull values through the channel until it hits its first failure and blows up.
  // Go ahead, try it for yourself:
  def probablyWillThrowChannel = (oneToHundred through sometimesFailsChannel).runLog.run
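
  // Illustrative aside (not in the original gist): `attempt()` reifies the failure, so
  // the run returns the first error on the left instead of throwing (the process still
  // halts at that point):
  def throwableAsValue = (oneToHundred through sometimesFailsChannel).attempt().runLog.run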
  // What we want is some way to retry the channel until we get a success; let's try using `finallyCompleted`:
  val finallyCompletesChannel = channel.lift { x: Int => finallyCompleted.map(_ => x) }

  // And we'll run that:
  def finallyCompletingChannelRun = (oneToHundred through finallyCompletesChannel).runLog.run
  // So that worked, kind of: it repeated each step until it was successful. But it has the (at least
  // sometimes) unwanted effect of not letting other traffic through. Each item is pulled through the
  // channel and that task is repeated until it's successful. You might want some way for other items
  // to get through while you're waiting for the failing items to retry.
  //
  // Let's try using the `retry` combinator defined above; this works with Process, not Task.
  // Here's one way we could apply it (note we need an implicit scheduler in scope):
  implicit val scheduler = scalaz.stream.DefaultScheduler
  val retryProcessAttemptOne = retry(time.awakeEvery(1.second))(oneToHundred through sometimesFailsChannel)
  /**
   * scala> test.retryProcessAttemptOne.runLog.timed(10.seconds).run
   * 1
   * 2
   * 3
   * 4
   * 5
   * 6
   * 1
   * 2
   * 3
   * 4
   * 5
   * 6
   * 7
   * 8
   * 9
   */
  // Hmm, that's not what I wanted. That retries the *entire* process, throwing away any successful transformations.
  // Let's try just retrying the channel:
  val retryProcessAttemptTwo = oneToHundred through retry(time.awakeEvery(1.second))(sometimesFailsChannel)
  /**
   * scala> test.retryProcessAttemptTwo.runLog.timed(10.seconds).run
   * 1
   * 2
   * 3
   * 4
   * 5
   * java.lang.Exception: Better luck next time: 0.9738160947217549
   *   at ...
   * scala>
   */
  // That did not do what I wanted; as soon as the channel returned a throwable, it blew up and stopped execution.
  // Let's go back to Task for a minute and explicitly introduce some parallelism. Let's say we have 100 tasks
  // we want to process, some of which could fail. We want to retry failing tasks but also make progress on
  // other ones while we wait:
  val hundredFaultyTasks: Task[List[Int]] =
    Task.gatherUnordered(1.to(100).toList.map { x => println(s"running $x"); finallyCompleted.map { _ => x } })
  /**
   * scala> test.hundredFaultyTasks.timed(10.seconds).run
   * res0: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 14, 20, 23, 24, 25, 26, 29, 30, 32, 33, 34, 35, 37, 38, 41, 42, 43, 49, 50, 51, 53, 55, 56, 57, 58, 59, 60, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 78, 79, 80, 81, 82, 83, 85, 86, 87, 89, 90, 91, 92, 93, 94, 95, 97, 99, 100, 88, 39, 40, 44, 46, 47, 22, 31, 15, 10, 17, 18, 19, 21, 36, 48, 61, 62, 64, 65, 77, 96, 16, 13, 28, 45, 52, 54, 66, 63, 27, 84, 98)
   */
  // OK, so that's good: with 100 tasks, I was able to get some parallelism and retries. Note the above is
  // unordered, but we can get it ordered as well:
  val hundredFaultyTasksOrdered: Task[List[Int]] =
    Nondeterminism[Task].gather(1.to(100).toList.map { x => println(s"running $x"); finallyCompleted.map { _ => x } })
  // Maybe that's the trick with Process? We need to explicitly tell it to let other values through while
  // waiting. Here's one sketch of that idea:
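
  // A possible direction (a sketch, not from the original gist): map each element to a
  // one-element inner process that retries its own task, then join the inner processes
  // nondeterministically with `merge.mergeN`, so a retrying element doesn't block the
  // others. Like gatherUnordered, results arrive out of order; the maxOpen bound of 10
  // is an arbitrary choice.
  val retryProcessAttemptThree: Process[Task, Int] =
    merge.mergeN(10)(oneToHundred.map { x => Process.eval(finallyCompleted.map(_ => x)) })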
}