Skip to content

Instantly share code, notes, and snippets.

@diggzhang
Created September 13, 2017 03:44
Show Gist options
  • Save diggzhang/ff00b87d336056025024d60029c6e8d6 to your computer and use it in GitHub Desktop.
Save diggzhang/ff00b87d336056025024d60029c6e8d6 to your computer and use it in GitHub Desktop.
actor mail box test code
package com.packt.chapter1
import java.util.concurrent.ConcurrentLinkedQueue
import akka.actor.{Props, Actor, ActorSystem, ActorRef}
import akka.dispatch.{MailboxType, ProducesMessageQueue, Envelope, MessageQueue}
import com.typesafe.config.Config
/**
* Created by user
*/
object CustomMailbox extends App {
val actorSystem = ActorSystem("HelloAkka")
val actor = actorSystem.actorOf(Props[MySpecialActor].withDispatcher("custom-dispatcher"))
val actor1 = actorSystem.actorOf(Props[MyActor],"xyz")
val actor2 = actorSystem.actorOf(Props[MyActor],"MyActor")
actor1 ! ("hello", actor)
actor2 ! ("hello", actor)
}
class MySpecialActor extends Actor {
override def receive: Receive = {
case msg: String => println(s"msg is $msg" )
}
}
class MyActor extends Actor {
override def receive: Receive = {
case (msg: String, actorRef: ActorRef) => actorRef ! msg
case msg => println(msg)
}
}
trait MyUnboundedMessageQueueSemantics
// This is the MessageQueue implementation
class MyMessageQueue extends MessageQueue {
private final val queue = new ConcurrentLinkedQueue[Envelope]()
// these should be implemented; queue used as example
def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
if(handle.sender.path.name == "MyActor") {
handle.sender ! "Hey dude, How are you?, I Know your name,processing your request"
queue.offer(handle)
}
else handle.sender ! "I don't talk to strangers, I can't process your request"
}
def dequeue(): Envelope = queue.poll
def numberOfMessages: Int = queue.size
def hasMessages: Boolean = !queue.isEmpty
def cleanUp(owner: ActorRef, deadLetters: MessageQueue) {
while (hasMessages) {
deadLetters.enqueue(owner, dequeue())
}
}
}
class MyUnboundedMailbox extends MailboxType
with ProducesMessageQueue[MyMessageQueue] {
// This constructor signature must exist, it will be called by Akka
def this(settings: ActorSystem.Settings, config: Config) = {
// put your initialization code here
this()
}
// The create method is called to create the MessageQueue
final override def create(owner: Option[ActorRef],
system: Option[ActorSystem]): MessageQueue =
new MyMessageQueue()
}
@diggzhang
Copy link
Author

As you know, a mailbox uses a message queue, and we need to provide a custom implementation for the queue.

In step two, we define a class, MyMessageQueue, which extends the trait MessageQueue and the implementing methods.

We want our actor to receive messages from only those actors whose name is MyActor, and not from any other actor.

To achieve the aforementioned functionality, we implement the enqueue method, and specify that the message should be enqueued if sender name is MyActor, otherwise ignore the message.

In this case, we used ConcurrentLinkedQueue as the underlying data structure for the queue.

However, it is up to us which data structure we pick for enqueing and removing messages. Changing the data structure may also change the processing order of messages.

In step three, we define the custom mailbox using MyMessageQueue.

In step four, we configure the preceding mailbox with a custom-dispatcher in application.conf.

In step five and six, we define MySpecialActor, which will use the custom mailbox when we create it with the custom-dispatcher. MyActor is the actor which tries to communicate with MySpecialActor.

In step seven, we have two instances of MyActor, actor1 and actor2, which send messages to MySpecialActor.

Since MySpecialActor talks to only those Actors whose name is MyActor, it does not process messages from MyActor whose name is xyz, as you can see in the output.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment