Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Use actor state from Future callbacks
package com.ometer.akka
import akka.actor.Actor
import akka.dispatch.ExecutionContext
import akka.actor.ActorRef
import akka.dispatch.Promise
import akka.dispatch.Future
import akka.dispatch.DefaultPromise
class Example extends SafeActor {
case object Whatever
var myMutableState = 10
override def safeReceive = {
case Whatever =>
// the basic idea is that it's OK to access myMutableState in the Future
// callbacks, normally it would not be. Future callbacks are kept in the
// context of this actor rather than executing on independent threads.
// Needless to say, this is a tradeoff: if your callbacks do some kind of
// blocking or long computation maybe you want an independent thread.
val f = Future({ myMutableState += 1; println("This callback should be serialized on the actor") })
f.map(_ => println("This callback should also be serialized on the actor"))
// futures created outside of this actor would be "imported" using safe()
// to avoid using whatever random ExecutionContext is set on them
// val f = safe(someService.doStuff())
// f.map(...)
}
}
/**
* An implicit ExecutionContext in this actor dispatches
* any Future created in this actor serialized on the
* actor, so callbacks can touch the actor's state.
* "Foreign" futures should be adapted with safe()
* to run their callbacks in this actor.
* It is probably risky to "export" futures that
* execute in this actor to another actor.
*
* To subclass, override safeReceive instead of receive.
* Clunky but not sure what else to do.
*
* Naming this class "safe" is likely unsafe. ;-)
*/
abstract class SafeActor extends Actor {
private[akka] lazy val _safeContext = new SafeActorExecutionContext(this)
/**
* Futures created directly inside the SafeActor would use this, but
* futures from elsewhere need converting with safe()
*/
protected implicit def safeContext: ExecutionContext = _safeContext
private[akka] def reportFailure(t: Throwable) = context.dispatcher.reportFailure(t)
/**
* Convert a future to run its callbacks in this actor.
*/
protected final def safe[T](f: Future[T]): Future[T] = {
f match {
case p: DefaultPromise[_] if p.executor eq _safeContext =>
// optimize the case where a future is already executing
// in this actor
p
case _ =>
Promise[T]()(safeContext).completeWith(f)
}
}
private def internalReceive: Receive = {
case Code(runnable) => runnable.run()
}
/** receive first intercepts SafeActor messages then calls safeReceive */
override final def receive: Receive = internalReceive orElse safeReceive
/** replacement for receive for subclasses to override */
protected def safeReceive: Receive
}
private case class Code(runnable: Runnable)
private class SafeActorExecutionContext(val safeActor: SafeActor) extends ExecutionContext {
override def execute(runnable: Runnable): Unit = {
safeActor.self ! Code(runnable)
}
override def reportFailure(t: Throwable): Unit = {
safeActor.reportFailure(t)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment