Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Squashed patch with mapBehavior
diff --git a/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java b/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java
index 5125611..25268dd 100644
--- a/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java
+++ b/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java
@@ -8,6 +8,10 @@ import akka.routing.CurrentRoutees;
import akka.routing.FromConfig;
import akka.routing.NoRouter;
import akka.testkit.AkkaSpec;
+import static akka.pattern.Patterns.ask;
+import akka.dispatch.*;
+import akka.util.Duration;
+import akka.util.Timeout;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -68,4 +72,17 @@ public class JavaAPI {
ref.tell("hallo");
ref.tell("hallo", ref);
}
+
+ @Test
+ public void mustSupportPreAndPostReceive() throws Exception {
+ ActorRef ref = system.actorOf(new Props(JavaAPIPrePostActor.class));
+ assertNotNull(ref);
+ Timeout timeout = new Timeout(Duration.parse("1 second"));
+ String pre = (String) Await.result(ask(ref, "onPreReceive", timeout), timeout.duration());
+ String middle = (String) Await.result(ask(ref, "onReceivePartial", timeout), timeout.duration());
+ String post = (String) Await.result(ask(ref, "onPostReceive", timeout), timeout.duration());
+ assertEquals(pre, "onPreReceive");
+ assertEquals(middle, "onReceivePartial");
+ assertEquals(post, "onPostReceive");
+ }
}
diff --git a/akka-actor-tests/src/test/java/akka/actor/JavaAPIPrePostActor.java b/akka-actor-tests/src/test/java/akka/actor/JavaAPIPrePostActor.java
new file mode 100644
index 0000000..05dfc02
--- /dev/null
+++ b/akka-actor-tests/src/test/java/akka/actor/JavaAPIPrePostActor.java
@@ -0,0 +1,50 @@
+package akka.actor;
+
+import akka.japi.PartialProcedure;
+import akka.japi.Option;
+
+public class JavaAPIPrePostActor extends UntypedActor {
+ @Override
+ public void onReceive(Object message) {
+ // this is not called since we override onReceivePartial instead
+ }
+
+ @Override
+ public PartialProcedure<Object> onReceivePartial() {
+ PartialProcedure<Object> handler = new PartialProcedure<Object>() {
+ public void apply(Object o) {
+ getSender().tell("onReceivePartial");
+ }
+ public boolean isDefinedAt(Object o) {
+ return (o instanceof String && ((String) o).equals("onReceivePartial"));
+ }
+ };
+ return handler;
+ }
+
+ @Override
+ public Option<PartialProcedure<Object>> onPreReceive() {
+ PartialProcedure<Object> handler = new PartialProcedure<Object>() {
+ public void apply(Object o) {
+ getSender().tell("onPreReceive");
+ }
+ public boolean isDefinedAt(Object o) {
+ return (o instanceof String && ((String) o).equals("onPreReceive"));
+ }
+ };
+ return Option.some(handler);
+ }
+
+ @Override
+ public Option<PartialProcedure<Object>> onPostReceive() {
+ PartialProcedure<Object> handler = new PartialProcedure<Object>() {
+ public void apply(Object o) {
+ getSender().tell("onPostReceive");
+ }
+ public boolean isDefinedAt(Object o) {
+ return (o instanceof String && ((String) o).equals("onPostReceive"));
+ }
+ };
+ return Option.some(handler);
+ }
+}
diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala
index e8c667b..913e5cc 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala
@@ -401,4 +401,81 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
}
}
}
+
+ "support mixin message handlers and execute in proper order" in {
+ // "pre" mixin that runs other handlers second
+ trait HandlesA1 extends Actor {
+ override def mapBehavior(behavior: Receive) = {
+ val handler: Receive = {
+ case "A1" ⇒ sender ! "HandlesA1"
+ }
+ super.mapBehavior(handler orElse behavior)
+ }
+ }
+
+ // another "pre" mixin
+ trait HandlesA2 extends Actor {
+ override def mapBehavior(behavior: Receive) = {
+ val handler: Receive = {
+ case "A2" ⇒ sender ! "HandlesA2"
+ case "A1" ⇒ sender ! "HandlesA2" // not reached, HandlesA1 filters
+ }
+ super.mapBehavior(handler orElse behavior)
+ }
+ }
+
+ // "post" mixin that runs other handlers first
+ trait HandlesB1 extends Actor {
+ override def mapBehavior(behavior: Receive) = {
+ val handler: Receive = {
+ case "B1" ⇒ sender ! "HandlesB1"
+ case "C" ⇒ sender ! "HandlesB1" // not reached, HandlesC filters
+ }
+ super.mapBehavior(behavior orElse handler)
+ }
+ }
+
+ // another "post" mixin
+ trait HandlesB2 extends Actor {
+ override def mapBehavior(behavior: Receive) = {
+ val handler: Receive = {
+ case "B2" ⇒ sender ! "HandlesB2"
+ case "B1" ⇒ sender ! "HandlesB2" // not reached, HandlesB1 filters
+ case "C" ⇒ sender ! "HandlesB2" // not reached, HandlesC filters
+ }
+ super.mapBehavior(behavior orElse handler)
+ }
+ }
+
+ // this is a completely unmodified actor other
+ // than having "with HandlesA with HandlesB",
+ // it doesn't have to worry about chaining up
+ // or anything like that.
+ class HandlesC extends Actor with HandlesA1 with HandlesA2 with HandlesB1 with HandlesB2 {
+ def receive = {
+ case "C" ⇒ sender ! "HandlesC"
+ case "A1" ⇒ sender ! "HandlesC" // not reached, HandlesA1 filters
+ case "A2" ⇒ sender ! "HandlesC" // not reached, HandlesA2 filters
+ }
+ }
+
+ val timeout = Timeout(20000)
+ val ref = system.actorOf(Props(new HandlesC))
+
+ val a1 = (ref.ask("A1")(timeout)).mapTo[String]
+ val a2 = (ref.ask("A2")(timeout)).mapTo[String]
+ val c = (ref.ask("C")(timeout)).mapTo[String]
+ val b1 = (ref.ask("B1")(timeout)).mapTo[String]
+ val b2 = (ref.ask("B2")(timeout)).mapTo[String]
+
+ ref ! PoisonPill
+
+ Await.result(a1, timeout.duration) must be("HandlesA1")
+ Await.result(a2, timeout.duration) must be("HandlesA2")
+ Await.result(c, timeout.duration) must be("HandlesC")
+ Await.result(b1, timeout.duration) must be("HandlesB1")
+ Await.result(b2, timeout.duration) must be("HandlesB2")
+
+ awaitCond(ref.isTerminated, 2000 millis)
+ }
}
diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala
index 2499d42..b043aea 100644
--- a/akka-actor/src/main/scala/akka/actor/Actor.scala
+++ b/akka-actor/src/main/scala/akka/actor/Actor.scala
@@ -240,6 +240,30 @@ trait Actor {
final def sender: ActorRef = context.sender
/**
+ * This method allows traits and subclasses to mix in actor behavior.
+ * Whenever an actor pushes a new behavior, it will be mapped
+ * using `mapBehavior`. (The default behavior is
+ * from the `receive` method but it can be replaced using the
+ * `become` method.)
+ * <p/>
+ * To allow multiple mixin traits, implementations of this
+ * method should chain up to `super.mapBehavior` in order
+ * to apply the customizations from supertypes.
+ * <p/>
+ * The simplest usage is to run some handler before the
+ * actor's normal behavior:
+ * {{{
+ * override def mapBehavior(behavior: Receive) = {
+ * val handler: Receive = {
+ * case "MyMessage" ⇒
+ * }
+ * super.mapBehavior(handler orElse behavior)
+ * }
+ * }}}
+ */
+ protected def mapBehavior(behavior: Receive): Receive = behavior
+
+ /**
* This defines the initial actor behavior, it must return a partial function
* with the actor logic.
*/
@@ -321,7 +345,7 @@ trait Actor {
* For Akka internal use only.
*/
private[akka] def pushBehavior(behavior: Receive): Unit = {
- behaviorStack = behaviorStack.push(behavior)
+ behaviorStack = behaviorStack.push(mapBehavior(behavior))
}
/**
@@ -339,6 +363,6 @@ trait Actor {
private[akka] def clearBehaviorStack(): Unit =
behaviorStack = Stack.empty[Receive].push(behaviorStack.last)
- private var behaviorStack: Stack[Receive] = Stack.empty[Receive].push(receive)
+ private var behaviorStack: Stack[Receive] = Stack.empty[Receive].push(mapBehavior(receive))
}
diff --git a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala
index a5ebeb8..b66637d 100644
--- a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala
+++ b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala
@@ -5,6 +5,7 @@
package akka.actor
import akka.japi.{ Creator }
+import akka.japi
/**
* Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model':
@@ -98,6 +99,19 @@ abstract class UntypedActor extends Actor {
@throws(classOf[Exception])
def onReceive(message: Any): Unit
+ /**
+ * By default, `onReceivePartial` forwards to `onReceive`; if you need to
+ * avoid handling some messages (for example to allow a `postReceive` handler
+ * to run) then you could override `onReceivePartial` rather than `onReceive`.
+ * If you override `onReceivePartial` then `onReceive` will not be called
+ * unless you call it yourself.
+ */
+ @throws(classOf[Exception])
+ def onReceivePartial: japi.PartialProcedure[Any] = new japi.PartialProcedure[Any]() {
+ override def apply(x: Any) = onReceive(x)
+ override def isDefinedAt(x: Any) = true
+ }
+
def getContext(): UntypedActorContext = context.asInstanceOf[UntypedActorContext]
/**
@@ -150,9 +164,37 @@ abstract class UntypedActor extends Actor {
*/
override def postRestart(reason: Throwable): Unit = super.postRestart(reason)
- final protected def receive = {
- case msg ⇒ onReceive(msg)
+ final protected def receive = onReceivePartial.asScala
+
+ // this isn't final so mixins can work, but
+ // overriding it in Java is not expected.
+ override protected def mapBehavior(behavior: Receive): Receive = {
+ val chain = Seq(onPreReceive.asScala.map(_.asScala),
+ Some(behavior),
+ onPostReceive.asScala.map(_.asScala)).flatMap(_.toSeq)
+ super.mapBehavior(chain.reduce(_ orElse _))
}
+
+ /**
+ * User overridable callback: by default it returns None.
+ * <p/>
+ * If you provide a handler, it will filter messages before the
+ * regular `onReceive` handler.
+ */
+ protected def onPreReceive: japi.Option[japi.PartialProcedure[Any]] = japi.Option.none
+
+ /**
+ * User overridable callback: by default it returns None.
+ * <p/>
+ * If you provide a handler, it will handle messages not matched by
+ * the regular `onReceivePartial` handler. Note that by default,
+ * `onReceivePartial` matches ALL messages by forwarding them to
+ * `onReceive`. Therefore, by default no `onPostReceive` handler
+ * will ever be used; only actors which override `onReceivePartial`
+ * to leave some messages unhandled can benefit from an
+ * `onPostReceive`.
+ */
+ protected def onPostReceive: japi.Option[japi.PartialProcedure[Any]] = japi.Option.none
}
/**
diff --git a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala
index 47ce667..e7df118 100644
--- a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala
+++ b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala
@@ -35,6 +35,22 @@ trait Procedure2[T1, T2] {
}
/**
+ * A PartialProcedure abstract class. Used to create partial functions
+ * that return void in Java.
+ */
+abstract class PartialProcedure[T] {
+ def apply(param: T): Unit
+ def isDefinedAt(param: T): Boolean
+
+ private class DelegatingPartialFunction[T](val delegate: PartialProcedure[T]) extends scala.PartialFunction[T, Unit] {
+ override def apply(param: T) = delegate.apply(param)
+ override def isDefinedAt(param: T) = delegate.isDefinedAt(param)
+ }
+
+ def asScala: scala.PartialFunction[T, Unit] = new DelegatingPartialFunction(this)
+}
+
+/**
* An executable piece of code that takes no parameters and doesn't return any value.
*/
trait SideEffect {
diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst
index 5374c8a..d4413ef 100644
--- a/akka-docs/scala/actors.rst
+++ b/akka-docs/scala/actors.rst
@@ -668,15 +668,24 @@ state of the failing actor instance is lost if you don't store and restore it in
``preRestart`` and ``postRestart`` callbacks.
-Extending Actors using PartialFunction chaining
-===============================================
+Extending Actors using mapBehavior and PartialFunction chaining
+===============================================================
-A bit advanced but very useful way of defining a base message handler and then
-extend that, either through inheritance or delegation, is to use
-``PartialFunction.orElse`` chaining.
+You can create "mixin" traits or abstract classes using the
+``mapBehavior`` method on ``Actor``. This method modifies the
+standard actor behavior as defined by ``receive`` or ``become``.
+To allow multiple traits to be mixed in to one actor, when you
+override ``mapBehavior`` you should always chain
+up and allow supertypes to run their ``mapBehavior`` as well.
-.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#receive-orElse
+.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#receive-mapBehavior
+
+Multiple traits that implement ``mapBehavior``
+in this way can be mixed in to the same concrete class. The
+concrete class need not do anything special, it implements
+``receive`` as usual.
-Or:
+``PartialFunction.orElse`` chaining can also be used for more
+complex scenarios, like dynamic runtime registration of handlers:
-.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#receive-orElse2
+.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#receive-orElse
diff --git a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala
index 0bc540f..048bae4 100644
--- a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala
+++ b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala
@@ -114,41 +114,58 @@ object SwapperApp extends App {
}
//#swapper
-//#receive-orElse
-
-abstract class GenericActor extends Actor {
- // to be defined in subclassing actor
- def specificMessageHandler: Receive
+//#receive-mapBehavior
+// trait providing a generic fallback message handler
+trait GenericActor extends Actor {
// generic message handler
- def genericMessageHandler: Receive = {
+ private def genericMessageHandler: Receive = {
case event ⇒ printf("generic: %s\n", event)
}
- def receive = specificMessageHandler orElse genericMessageHandler
+ // because we chain up to super.mapBehavior,
+ // multiple traits like this can be mixed in.
+ override def mapBehavior(behavior: Receive): Receive =
+ super.mapBehavior(behavior orElse genericMessageHandler)
}
class SpecificActor extends GenericActor {
- def specificMessageHandler = {
+ def receive = {
case event: MyMsg ⇒ printf("specific: %s\n", event.subject)
}
}
case class MyMsg(subject: String)
-//#receive-orElse
+//#receive-mapBehavior
-//#receive-orElse2
+//#receive-orElse
trait ComposableActor extends Actor {
private var receives: List[Receive] = List()
+ private var composedReceives: Receive = Map.empty // in Scala 2.10, PartialFunction.empty
+
protected def registerReceive(receive: Receive) {
+ // keep a list (allows unregistration)
receives = receive :: receives
+ // cache the composition of all receives
+ composedReceives = receives reduce { _ orElse _ }
}
- def receive = receives reduce { _ orElse _ }
+ // this indirection is because preReceive is only called
+ // once, but we want to allow registration post-construct,
+ // so we need a constant Receive that forwards to our
+ // dynamic Receive
+ private def handleRegisteredReceives: Receive = new PartialFunction[Any, Unit]() {
+ override def apply(x: Any) = composedReceives.apply(x)
+ override def isDefinedAt(x: Any) = composedReceives.isDefinedAt(x)
+ }
+
+ override def mapBehavior(behavior: Receive) =
+ super.mapBehavior(behavior orElse handleRegisteredReceives)
}
class MyComposableActor extends ComposableActor {
override def preStart() {
+ // register some handlers dynamically
registerReceive({
case "foo" ⇒ /* Do something */
})
@@ -157,9 +174,15 @@ class MyComposableActor extends ComposableActor {
case "bar" ⇒ /* Do something */
})
}
+
+ // Runs after the dynamically-registered handlers,
+ // which are added with preReceive
+ def receive = {
+ case "baz" ⇒
+ }
}
-//#receive-orElse2
+//#receive-orElse
class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
"import context" in {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment