Last active
May 13, 2016 10:11
-
-
Save pomadchin/33b53086cbf81a6256ddb452090e4e3b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.concurrent.duration._ | |
import scala.concurrent.{ExecutionContext, Future, Promise, blocking} | |
import scala.concurrent.duration.Duration | |
import scala.util.{Failure, Random, Success} | |
object FutureUtils { | |
implicit class FutureBackoff[A](future: Future[A]) { | |
def retry(p: (Throwable => Boolean))(implicit executor: ExecutionContext): Future[A] = { | |
def help(count: Int): Future[A] = { | |
val base: Duration = 52.milliseconds | |
val timeout = base * Random.nextInt(math.pow(2,count).toInt) | |
future.recoverWith { | |
case e if p(e) => { | |
Thread sleep timeout.toMillis | |
help(count + 1) | |
} | |
} | |
} | |
Future { help(0) }.flatMap(identity) | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.concurrent.duration._ | |
import scala.concurrent.{ExecutionContext, Future, Promise, blocking} | |
import scala.concurrent.duration.Duration | |
import scala.util.{Failure, Random, Success} | |
import java.util.{Timer, TimerTask} | |
object FutureUtilsTimer { | |
def delay[T](delay: Long)(block: => Future[T])(implicit timer: Timer): Future[T] = { | |
val promise = Promise[T]() | |
timer.schedule(new TimerTask { | |
override def run(): Unit = { | |
promise.completeWith(block) | |
} | |
}, delay) | |
promise.future | |
} | |
implicit class FutureBackoff[A](future: Future[A]) { | |
implicit lazy val timer = new Timer() | |
/** | |
* Implement non-blocking Exponential Backoff on a Future. | |
* | |
* @param p returns true for exceptions that trigger a backoff and retry | |
*/ | |
def retryEBO(p: (Throwable => Boolean))(implicit ec: ExecutionContext): Future[A] = { | |
def help(count: Int): Future[A] = { | |
val base: Duration = 52.milliseconds | |
val timeout = base * Random.nextInt(math.pow(2,count).toInt) // .extInt is [), implying -1 | |
future.recoverWith { | |
case e if p(e) => delay(timeout.toMillis)(help(count + 1)) | |
} | |
} | |
Future { help(0) }.flatMap(identity) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment