Skip to content

Instantly share code, notes, and snippets.

@wogan
Created December 19, 2017 17:28
Show Gist options
  • Save wogan/ed420eb60c43b7aefd1adc156ea0325d to your computer and use it in GitHub Desktop.
Save wogan/ed420eb60c43b7aefd1adc156ea0325d to your computer and use it in GitHub Desktop.
Retry helper methods & DSL for Monix Tasks.
package retry
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.util.Random
import scala.concurrent.duration._
/**
 * Factory methods for the built-in [[Backoff]] strategies.
 *
 * A `Backoff` maps an attempt number to the delay to wait before the next attempt.
 * The retry loop in this project starts counting attempts at 1.
 */
object Backoff {

  /** Never waits between attempts. */
  val none: Backoff = _ => Duration.Zero

  /** Waits the same `value` before every retry, regardless of the attempt number. */
  def constant(value: FiniteDuration): Backoff =
    _ => value

  /**
   * Doubles the delay on every attempt: `initial`, `2 * initial`, `4 * initial`, ...
   *
   * NOTE: the factor is computed with a `Long` shift, so very large attempt numbers
   * overflow the factor and/or the resulting duration. Combine with a terminating
   * condition that stops long before that point.
   *
   * @param initial delay returned for attempt 1
   */
  def exponential(initial: FiniteDuration): Backoff =
    attempt => initial * (1L << (attempt - 1))

  /**
   * Grows the delay linearly: `initial`, `2 * initial`, `3 * initial`, ...
   *
   * @param initial delay returned for attempt 1
   */
  def linear(initial: FiniteDuration): Backoff =
    attempt => initial * attempt

  /**
   * Adds random jitter to another backoff strategy.
   *
   * Each computed delay is `policy(attempt) + offset + r * scale`, where `r` is a fresh
   * random number in `[0, 1)`. Because `offset` may be negative, the sum could drop below
   * zero for small base delays; the result is therefore clamped so that the returned
   * delay is never negative.
   *
   * @param policy the strategy providing the base delay
   * @param offset fixed amount added to every delay (may be negative)
   * @param scale  upper bound (exclusive) of the random amount added to every delay
   */
  def randomize(policy: Backoff, offset: FiniteDuration, scale: FiniteDuration): Backoff =
    attempt => {
      val base = policy(attempt)
      val jitter = (scale.toNanos * Random.nextDouble()).nanos
      (base + offset + jitter).max(Duration.Zero)
    }
}
package retry
import scala.concurrent.duration.{Duration, DurationConversions, FiniteDuration, TimeUnit}
import scala.language.implicitConversions
/**
 * Domain specific language for constructing a RetryPolicy.
 *
 * {{{
 * retryFor(3.attempts or 2.seconds) using exponentialBackoff(10.millis).randomized(5.millis)
 * }}}
 */
object dsl {

  /** Creates a policy that stops retrying once `termination` holds; the backoff keeps its default. */
  def retryFor(termination: Termination): RetryPolicy =
    RetryPolicy(terminate = termination)

  /** Shorthand for `Backoff.exponential`. */
  def exponentialBackoff(duration: FiniteDuration): Backoff =
    Backoff.exponential(duration)

  /** Shorthand for `Backoff.linear`. */
  def linearBackoff(duration: FiniteDuration): Backoff =
    Backoff.linear(duration)

  /** Shorthand for `Backoff.constant`. */
  def constantBackoff(duration: FiniteDuration): Backoff =
    Backoff.constant(duration)

  /** Shorthand for `Backoff.none`. */
  def noBackoff: Backoff =
    Backoff.none

  /** Enables `3.attempts` as a termination condition on the number of attempts. */
  implicit final class IntSyntax(private val count: Int) extends AnyVal {
    def attempts: Termination =
      Termination.limitAttempts(count)
  }

  /** Enables duration literals such as `2.seconds` or `10.millis` on `Int` values. */
  implicit final class DurationInt(private val amount: Int) extends AnyVal with DurationConversions {
    override protected def durationIn(unit: TimeUnit): FiniteDuration =
      Duration(amount.toLong, unit)
  }

  /** Enables `policy using backoff` to replace the backoff strategy of a policy. */
  implicit final class RetryPolicySyntax(private val policy: RetryPolicy) extends AnyVal {
    def using(backoff: Backoff): RetryPolicy =
      policy.copy(backoff = backoff)
  }

  /** Enables combining termination conditions with `and` / `or`. */
  implicit final class TerminationSyntax(private val self: Termination) extends AnyVal {

    /** Terminates only when both conditions hold. */
    def and(other: Termination): Termination =
      Termination.all(self, other)

    /** Terminates as soon as either condition holds. */
    def or(other: Termination): Termination =
      Termination.any(self, other)
  }

  /** Enables adding random jitter to a backoff strategy. */
  implicit final class BackoffSyntax(private val base: Backoff) extends AnyVal {

    /** Spreads the delay by a random amount centred on the base delay: offset `-jitter / 2`, scale `jitter`. */
    def randomized(jitter: FiniteDuration): Backoff =
      Backoff.randomize(policy = base, offset = -jitter / 2, scale = jitter)

    /** Adds `offset` plus a random fraction of `scale` to every delay. */
    def randomized(offset: FiniteDuration, scale: FiniteDuration): Backoff =
      Backoff.randomize(policy = base, offset = offset, scale = scale)
  }

  /** Lets a plain duration be used where a Termination is expected, e.g. `3.attempts or 2.seconds`. */
  implicit def durationToTermination(finiteDuration: FiniteDuration): Termination =
    Termination.limitDuration(finiteDuration)
}
import monix.eval.Task
import monix.execution.Scheduler
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
package object retry {

  /**
   * Given the attempt number, determines how long to delay before retrying.
   */
  type Backoff = Int => FiniteDuration

  /**
   * Given the attempt number and the time that will have elapsed since starting
   * when the next attempt will execute, return whether or not to terminate retrying.
   */
  type Termination = (Int, FiniteDuration) => Boolean

  /** Retry-related extension methods for `Task`. */
  implicit class TaskRetrySyntax[A](task: Task[A]) {

    /**
     * Turns selected successful results into failures: when `pf` is defined for the
     * result, the task fails with the throwable `pf` produces; otherwise the result is kept.
     */
    @inline
    def reject(pf: PartialFunction[A, Throwable]): Task[A] =
      task.flatMap { value =>
        pf.lift(value) match {
          case Some(failure) => Task.raiseError[A](failure)
          case None          => Task.now(value)
        }
      }

    /** Retries this task on any error, according to the given `policy`. */
    @inline
    def retryWith(policy: RetryPolicy): Task[A] =
      Task.deferAction { scheduler =>
        execute(task, policy, scheduler.currentTimeMillis)
      }

    /** Same as `retryWith`, using the policy found in implicit scope. */
    @inline
    def retrying(implicit policy: RetryPolicy): Task[A] =
      retryWith(policy)

    /** Retries only while the `error` predicate returns true for the failure. */
    @inline
    def retryWhen(error: Throwable => Boolean)(implicit policy: RetryPolicy): Task[A] =
      Task.deferAction { scheduler =>
        execute(task, policy, scheduler.currentTimeMillis, error)
      }

    /** Retries only for failures on which the partial function `error` is defined. */
    @inline
    def retryOnly(error: PartialFunction[Throwable, Unit])(implicit policy: RetryPolicy): Task[A] =
      retryWhen(failure => error.isDefinedAt(failure))

    /** Retries for every failure except those on which the partial function `error` is defined. */
    @inline
    def retryUnless(error: PartialFunction[Throwable, Unit])(implicit policy: RetryPolicy): Task[A] =
      retryWhen(failure => !error.isDefinedAt(failure))
  }

  /**
   * Runs `task`, re-running it after failures as directed by `policy`.
   *
   * @param task        the task to run
   * @param policy      supplies the backoff delay and the termination condition
   * @param started     scheduler time in milliseconds at which the whole operation began
   * @param shouldRetry decides whether a given failure is worth retrying; defaults to always
   */
  private def execute[A](task: Task[A],
                         policy: RetryPolicy,
                         started: Long,
                         shouldRetry: Throwable => Boolean = _ => true): Task[A] = {

    // Runs one attempt after `initialDelay`; on failure either schedules the next
    // attempt or re-raises the failure.
    def forAttempt(initialDelay: FiniteDuration, attempt: Int): Task[A] =
      task.delayExecution(initialDelay).onErrorHandleWith { failure =>
        Task.deferAction { scheduler =>
          val elapsed = (scheduler.currentTimeMillis - started).millis
          val nextDelay = policy.backoff(attempt)
          // The termination check sees the time at which the next attempt would start.
          val giveUp = policy.terminate(attempt, nextDelay + elapsed)
          if (giveUp || !shouldRetry(failure)) {
            Task.raiseError(failure)
          } else {
            forAttempt(nextDelay, attempt + 1)
          }
        }
      }

    // Attempts are numbered from 1 and the first one runs without delay.
    forAttempt(Duration.Zero, 1)
  }
}
package retry
/**
 * Describes how a failing task is retried.
 *
 * @param terminate decides when to stop retrying; the default, `Termination.always`,
 *                  stops immediately
 * @param backoff   delay to wait before each retry; the default, `Backoff.none`,
 *                  does not wait
 */
case class RetryPolicy(
  terminate: Termination = Termination.always,
  backoff: Backoff = Backoff.none
)
package retry
import scala.concurrent.duration.FiniteDuration
/**
 * Factory methods and combinators for [[Termination]] conditions.
 *
 * A `Termination` receives the attempt number and a duration, and returns `true`
 * when retrying should stop.
 */
object Termination {

  /** Always terminates. */
  val always: Termination = (_, _) => true

  /** Never terminates. */
  val never: Termination = (_, _) => false

  /** Terminates once the attempt number reaches `maxAttempts`. */
  def limitAttempts(maxAttempts: Int): Termination =
    (attempt, _) => attempt >= maxAttempts

  /** Terminates once the supplied duration reaches `maxDuration`. */
  def limitDuration(maxDuration: FiniteDuration): Termination =
    (_, elapsed) => elapsed >= maxDuration

  /** Terminates as soon as at least one of the given conditions holds. */
  def any(first: Termination, rest: Termination*): Termination = {
    val conditions = first +: rest
    (attempt, elapsed) => conditions.exists(condition => condition(attempt, elapsed))
  }

  /** Terminates only when every one of the given conditions holds. */
  def all(first: Termination, rest: Termination*): Termination = {
    val conditions = first +: rest
    (attempt, elapsed) => conditions.forall(condition => condition(attempt, elapsed))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment