Skip to content

Instantly share code, notes, and snippets.

@justinhj
Last active July 24, 2017 03:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save justinhj/80d477a329f3d352a7a2af32a6fc532d to your computer and use it in GitHub Desktop.
Save justinhj/80d477a329f3d352a7a2af32a6fc532d to your computer and use it in GitHub Desktop.
import java.util.concurrent.TimeoutException
import java.util.{Timer, TimerTask}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.language.postfixOps
object FutureUtil {
// All Future's that use futureWithTimeout will use the same Timer object
// it is thread safe and scales to thousands of active timers
// The true parameter makes the Timer a daemon thread so that it won't stop the program
// from completing
val timer: Timer = new Timer(true)
/**
* Returns the result of the provided future within the given time or a timeout exception, whichever is first
* This uses Java Timer which runs a single thread to handle all futureWithTimeouts and does not block like a
* Thread.sleep would
* @param future Caller passes a future to execute
* @param timeout Time before we return a Timeout exception instead of future's outcome
* @return Future[T]
*/
implicit def fetchFutureFetchMonadError(implicit ec: ExecutionContext): FetchMonadError[Future] =
new FetchMonadError.FromMonadError[Future] {
override def runQuery[A](j: Query[A]): Future[A] = j match {
case Sync(e) => {
Future.successful({e.value})
}
case Async(ac, timeout) => {
val p = Promise[A]()
val timerTask = new TimerTask() {
def run() : Unit = {
p.tryFailure(new TimeoutException())
}
}
// Start the timeout Timer which will run our Timeout callback above
HNErrorMonads.timer.schedule(timerTask, timeout.toMillis)
// In the case that the future succeeds or fails we call these handlers
// which also will cancel the active timer if the timeout did not already
// occur
val handleSuccess : Query.Callback[A] = {
a =>
val alreadyCompleted = !p.trySuccess(a)
if(!alreadyCompleted) timerTask.cancel()
}
val handleFailure : Query.Errback = {
e =>
val alreadyCompleted = !p.tryFailure(e)
if(!alreadyCompleted) timerTask.cancel()
}
//
ec.execute(() => {
ac(handleSuccess, handleFailure)
})
p.future
}
case Ap(qf, qx) =>
runQuery(qf).zip(runQuery(qx)).map { case (f, x) => f(x) }
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment