Created
December 19, 2017 17:28
-
-
Save wogan/ed420eb60c43b7aefd1adc156ea0325d to your computer and use it in GitHub Desktop.
Retry helper methods & DSL for Monix Tasks.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package retry | |
import scala.concurrent.duration.{Duration, FiniteDuration} | |
import scala.util.Random | |
import scala.concurrent.duration._ | |
object Backoff { | |
val none: Backoff = _ => Duration.Zero | |
def constant(value: FiniteDuration): Backoff = | |
_ => value | |
def exponential(initial: FiniteDuration): Backoff = | |
attempt => initial * (1L << attempt - 1) | |
def linear(initial: FiniteDuration): Backoff = | |
initial * _ | |
def randomize(policy: Backoff, offset: FiniteDuration, scale: FiniteDuration): Backoff = | |
policy.andThen(_ + offset + (scale.toNanos * Random.nextDouble).nanos) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package retry | |
import scala.concurrent.duration.{Duration, DurationConversions, FiniteDuration, TimeUnit} | |
import scala.language.implicitConversions | |
/** | |
* Domain specific language for constructing a RetryPolicy. | |
* {{{ | |
* retryFor(3.attempts or 2.seconds) using exponentialBackoff(10.millis).randomized(5.millis) | |
* }}}*/ | |
object dsl { | |
def retryFor(termination: Termination): RetryPolicy = | |
RetryPolicy(terminate = termination) | |
def exponentialBackoff(duration: FiniteDuration): Backoff = | |
Backoff exponential duration | |
def linearBackoff(duration: FiniteDuration): Backoff = | |
Backoff linear duration | |
def constantBackoff(duration: FiniteDuration): Backoff = | |
Backoff constant duration | |
def noBackoff: Backoff = | |
Backoff.none | |
implicit final class IntSyntax(private val int: Int) extends AnyVal { | |
def attempts: Termination = | |
Termination limitAttempts int | |
} | |
implicit final class DurationInt(private val n: Int) extends AnyVal with DurationConversions { | |
override protected def durationIn(unit: TimeUnit): FiniteDuration = Duration(n.toLong, unit) | |
} | |
implicit final class RetryPolicySyntax(private val rp: RetryPolicy) extends AnyVal { | |
def using(backoff: Backoff): RetryPolicy = | |
rp.copy(backoff = backoff) | |
} | |
implicit final class TerminationSyntax(private val t: Termination) extends AnyVal { | |
def and(other: Termination): Termination = | |
Termination.all(t, other) | |
def or(other: Termination): Termination = | |
Termination.any(t, other) | |
} | |
implicit final class BackoffSyntax(private val b: Backoff) extends AnyVal { | |
def randomized(jitter: FiniteDuration): Backoff = | |
Backoff.randomize(b, -jitter / 2, jitter) | |
def randomized(offset: FiniteDuration, scale: FiniteDuration): Backoff = | |
Backoff.randomize(b, offset, scale) | |
} | |
implicit def durationToTermination(finiteDuration: FiniteDuration): Termination = | |
Termination limitDuration finiteDuration | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import monix.eval.Task | |
import monix.execution.Scheduler | |
import scala.concurrent.duration._ | |
import scala.concurrent.{ExecutionContext, Future} | |
package object retry { | |
/** | |
* Given the attempt number, determines how long to delay before retrying. | |
*/ | |
type Backoff = Int => FiniteDuration | |
/** | |
* Given the attempt number and the time that will have elapsed since starting | |
* when the next attempt will execute, return whether or not to terminate retrying. | |
*/ | |
type Termination = (Int, FiniteDuration) => Boolean | |
implicit class TaskRetrySyntax[A](task: Task[A]) { | |
@inline | |
def reject(pf: PartialFunction[A, Throwable]): Task[A] = | |
task flatMap { a => | |
pf.lift(a).fold(Task.now(a))(Task.raiseError) | |
} | |
@inline | |
def retryWith(policy: RetryPolicy): Task[A] = | |
Task.deferAction(s => execute(task, policy, s.currentTimeMillis)) | |
@inline | |
def retrying(implicit policy: RetryPolicy): Task[A] = | |
retryWith(policy) | |
@inline | |
def retryWhen(error: Throwable => Boolean)(implicit policy: RetryPolicy): Task[A] = | |
Task.deferAction(s => execute(task, policy, s.currentTimeMillis, error)) | |
@inline | |
def retryOnly(error: PartialFunction[Throwable, Unit])(implicit policy: RetryPolicy): Task[A] = | |
retryWhen(error.isDefinedAt) | |
@inline | |
def retryUnless(error: PartialFunction[Throwable, Unit])(implicit policy: RetryPolicy): Task[A] = | |
retryWhen((error.isDefinedAt _).andThen(!_)) | |
} | |
private def execute[A](task: Task[A], | |
policy: RetryPolicy, | |
started: Long, | |
shouldRetry: Throwable => Boolean = _ => true): Task[A] = { | |
def forAttempt(initialDelay: FiniteDuration, attempt: Int): Task[A] = | |
task delayExecution initialDelay onErrorHandleWith { error => | |
Task deferAction { s => | |
val timeSoFar = (s.currentTimeMillis - started).millis | |
val delay = policy.backoff(attempt) | |
val terminate = policy.terminate(attempt, delay + timeSoFar) | |
if (!terminate && shouldRetry(error)) { | |
forAttempt(delay, attempt + 1) | |
} else { | |
Task raiseError error | |
} | |
} | |
} | |
forAttempt(Duration.Zero, 1) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package retry | |
case class RetryPolicy(terminate: Termination = Termination.always, | |
backoff: Backoff = Backoff.none) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package retry | |
import scala.concurrent.duration.FiniteDuration | |
object Termination { | |
val always: Termination = (_, _) => true | |
val never: Termination = (_, _) => false | |
def limitAttempts(maxAttempts: Int): Termination = | |
(attempts, _) => attempts >= maxAttempts | |
def limitDuration(maxDuration: FiniteDuration): Termination = | |
(_, duration) => duration >= maxDuration | |
def any(first: Termination, rest: Termination*): Termination = { | |
val all = first +: rest | |
(attempt, duration) => all.exists(_.apply(attempt, duration)) | |
} | |
def all(first: Termination, rest: Termination*): Termination = { | |
val all = first +: rest | |
(attempt, duration) => all.forall(_.apply(attempt, duration)) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment