Skip to content

Instantly share code, notes, and snippets.

@abhandaru
Last active August 31, 2020 19:27
Show Gist options
  • Save abhandaru/00712e71408332091fc52ffac046ed65 to your computer and use it in GitHub Desktop.
Save abhandaru/00712e71408332091fc52ffac046ed65 to your computer and use it in GitHub Desktop.
First draft of a functional-style RetryPolicy implementation.
import scala.concurrent._
import scala.concurrent.duration._
import java.net.SocketTimeoutException
object Postgres {
// Example for postgres connection timeout retry policy
case class PgConnectionPolicy(n: Int) extends RetryPolicy {
override def attempt(rpc: => Future[T]) = (PgConnectionPolicy(n - 1), rpc)
override def retryable[E <: Throwable] = {
case _: SocketTimeoutException => n > 0
case _ => false
}
}
}
object Main extends App {
import ExecutionContext.Implicits.global
import RetryPolicy._
val req = AsyncRetry(UntilNFailures(2)) {
// some RPC call
}
Await.result(req, 15.seconds)
}
trait RetryPolicy {
def attempt[T](rpc: => Future[T]): (RetryPolicy, Future[T])
def retryable[E <: Throwable]: E => Boolean
}
object RetryPolicy {
case class UntilNFailures(n: Int) extends RetryPolicy {
override def attempt[T](rpc: => Future[T]) = (UntilNFailures(n - 1), rpc)
override def retryable[E <: Throwable] = _ => n > 0
}
}
// Helps split up our async utils
object AsyncRetry {
def apply[T](policy: RetryPolicy)(rpc: => Future[T])(implicit ec: ExecutionContext): Future[T] = {
val (nextPolicy, response) = policy.attempt(rpc)
response.recoverWith {
case e if nextPolicy.retryable(e) => apply(nextPolicy)(rpc)
case e => Future.failed(e)
}
}
}
@jphoang
Copy link

jphoang commented Aug 31, 2020

Fantastic code. The type of work only a seasoned professional engineer could do. Nit: prefer retryable

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment