Last active
July 24, 2017 03:44
-
-
Save justinhj/80d477a329f3d352a7a2af32a6fc532d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.TimeoutException | |
import java.util.{Timer, TimerTask} | |
import scala.concurrent.duration.FiniteDuration | |
import scala.concurrent.{ExecutionContext, Future, Promise} | |
import scala.language.postfixOps | |
object FutureUtil { | |
// All Future's that use futureWithTimeout will use the same Timer object | |
// it is thread safe and scales to thousands of active timers | |
// The true parameter makes the Timer a daemon thread so that it won't stop the program | |
// from completing | |
val timer: Timer = new Timer(true) | |
/** | |
* Returns the result of the provided future within the given time or a timeout exception, whichever is first | |
* This uses Java Timer which runs a single thread to handle all futureWithTimeouts and does not block like a | |
* Thread.sleep would | |
* @param future Caller passes a future to execute | |
* @param timeout Time before we return a Timeout exception instead of future's outcome | |
* @return Future[T] | |
*/ | |
implicit def fetchFutureFetchMonadError(implicit ec: ExecutionContext): FetchMonadError[Future] = | |
new FetchMonadError.FromMonadError[Future] { | |
override def runQuery[A](j: Query[A]): Future[A] = j match { | |
case Sync(e) => { | |
Future.successful({e.value}) | |
} | |
case Async(ac, timeout) => { | |
val p = Promise[A]() | |
val timerTask = new TimerTask() { | |
def run() : Unit = { | |
p.tryFailure(new TimeoutException()) | |
} | |
} | |
// Start the timeout Timer which will run our Timeout callback above | |
HNErrorMonads.timer.schedule(timerTask, timeout.toMillis) | |
// In the case that the future succeeds or fails we call these handlers | |
// which also will cancel the active timer if the timeout did not already | |
// occur | |
val handleSuccess : Query.Callback[A] = { | |
a => | |
val alreadyCompleted = !p.trySuccess(a) | |
if(!alreadyCompleted) timerTask.cancel() | |
} | |
val handleFailure : Query.Errback = { | |
e => | |
val alreadyCompleted = !p.tryFailure(e) | |
if(!alreadyCompleted) timerTask.cancel() | |
} | |
// | |
ec.execute(() => { | |
ac(handleSuccess, handleFailure) | |
}) | |
p.future | |
} | |
case Ap(qf, qx) => | |
runQuery(qf).zip(runQuery(qx)).map { case (f, x) => f(x) } | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment