public
Last active

Future timeout support

  • Download Gist
FutureTimeoutSupport.scala
Scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
import akka.util.{Duration, NonFatal}
import akka.actor.Scheduler
import akka.dispatch.{Promise, ExecutionContext, Future}
// Copied from Akka 2.1-M1
trait FutureTimeoutSupport {
  /**
   * Returns a [[akka.dispatch.Future]] that will be completed with the success or failure of the provided value
   * after the specified duration.
   *
   * `value` is taken by name so that it is only evaluated once the delay has elapsed (or immediately,
   * when the duration is finite and shorter than one unit). A non-fatal exception thrown while
   * evaluating `value` is turned into a failed future rather than being thrown at the caller or
   * inside the scheduler callback.
   *
   * @param duration how long to wait before evaluating `value`
   * @param using    scheduler on which the delayed evaluation is registered
   * @param value    by-name expression producing the future whose outcome completes the result
   * @param ec       execution context used for the promises created here
   * @tparam T       result type of the future
   * @return a future completed with the outcome of `value`
   */
  def after[T](duration: Duration, using: Scheduler)(value: => Future[T])(implicit ec: ExecutionContext): Future[T] = {
    // Evaluates `value` once per call, converting a non-fatal throw into a failed future.
    def attempt: Future[T] =
      try value catch { case NonFatal(t) => Promise.failed(t).future }

    if (duration.isFinite() && duration.length < 1) {
      // Nothing to wait for: evaluate right away instead of going through the scheduler.
      attempt
    } else {
      val p = Promise[T]()
      using.scheduleOnce(duration) { p completeWith attempt }
      p.future
    }
  }

  /**
   * Races `value` against `timedOutFuture`, where `timedOutFuture` is only evaluated after `duration`
   * has elapsed (see [[after]]). The returned future takes the outcome of whichever finishes first.
   *
   * @param duration       delay before `timedOutFuture` is evaluated
   * @param using          scheduler used to register the delay
   * @param timedOutFuture by-name future supplying the outcome to use on timeout
   * @param value          the future being guarded by the timeout
   * @param ec             execution context used for combining the futures
   * @tparam T             result type of both futures
   * @return a future completed by the first of the two futures to complete
   */
  def timeoutAfter[T](duration: Duration, using: Scheduler)(timedOutFuture: => Future[T], value: Future[T])
                     (implicit ec: ExecutionContext): Future[T] = {
    Future.firstCompletedOf(Seq(after(duration, using)(timedOutFuture), value))
  }
}
 
// Example usage
 
// Kick off the actual work; its result is wrapped in a ProcessedTask message.
val processFuture = Future(ProcessedTask(workSender, request, process(workSender, request)))
// Already-completed value used as the outcome when the timeout wins.
val timeoutFuture = Promise.successful(TaskTimedOut(workSender, request))
val taskTimeout = Duration(taskTimeoutMs, TimeUnit.MILLISECONDS)
// Race the work against the timeout: whichever of TaskTimedOut / ProcessedTask completes first wins.
val firstOutcome = timeoutAfter(taskTimeout, context.system.scheduler)(timeoutFuture, processFuture)
// Deliver the winning message to self.
firstOutcome pipeTo self

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.