Skip to content

Instantly share code, notes, and snippets.

@tzachz
Created December 6, 2019 23:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tzachz/da27f44929dd2103de6eb9c669fbab2e to your computer and use it in GitHub Desktop.
Save tzachz/da27f44929dd2103de6eb9c669fbab2e to your computer and use it in GitHub Desktop.
ExponentialBackoffRetry.scala
package com.tzachz.retry
import akka.actor.ActorSystem
import akka.pattern.Patterns.after
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.reflect.ClassTag
/**
* Created by tzachz on 12/6/19
*/
trait ExponentialBackoffRetry {
implicit val ec: ExecutionContext
implicit val actorSystem: ActorSystem
/****
* Attempts to run "request" as long as it fails with an exception of type "RE" for which "isRecoverable" is true
* @param isRecoverable - checks whether the failure should be retried
* @param request - action to run
* @param config - initial delay and factor configuration for exponential retry
* @tparam T - type return from request
* @tparam RE - expected type of recoverable exceptions
* @return Future[T] with either a successful result or a non-recoverable failure
*/
def withRetries[T, RE <: Throwable : ClassTag](isRecoverable: RE => Boolean)
(request: () => Future[T])
(implicit config: ExponentialBackoffConfig): Future[T] = {
def iteration(currentDelay: FiniteDuration, attempt: Int): Future[T] = request.apply().recoverWith {
case e: RE if isRecoverable(e) =>
val next: FiniteDuration = if (attempt == 0) currentDelay else currentDelay * config.factor
logger.warn(s"Recoverable failure detected; Attempt = $attempt; Retrying in $next...", e)
after(next, actorSystem.scheduler, ec, () => iteration(next, attempt + 1))
}
iteration(config.initialDelay, 0)
}
}
case class ExponentialBackoffConfig(initialDelay: FiniteDuration = 1.second, factor: Long = 2)
package com.tzachz.retry
import java.util.concurrent.Executors.newCachedThreadPool
import akka.actor.ActorSystem
import scala.concurrent.ExecutionContext.fromExecutor
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Random
case class NetworkException(code: Int) extends Throwable
class Service extends ExponentialBackoffRetry {
override implicit val ec: ExecutionContext = fromExecutor(newCachedThreadPool())
override implicit val actorSystem: ActorSystem = ActorSystem(this.getClass.getSimpleName)
implicit val config: ExponentialBackoffConfig = ExponentialBackoffConfig(initialDelay = 10.millis)
val isRecoverable: NetworkException => Boolean = _.code == 500 // let's say only 500 errors are recoverable
def makeRequest(): Future[String] = withRetries(isRecoverable) {
() => networkRequest()
}
def networkRequest(): Future[String] = {
val random = Random.nextFloat * 100
if (random < 10) // low prob (10%): unrecoverable error
Future.failed(NetworkException(code = 400))
else if (random < 80) // high prob (70%): recoverable error
Future.failed(NetworkException(code = 500))
else // low prob (20%): success
Future.successful("<data>")
}
}
// try running this and see what happens :)
println(Await.ready(new Service().makeRequest(), 10.seconds))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment