Skip to content

Instantly share code, notes, and snippets.

@josdirksen
Created October 31, 2015 08:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save josdirksen/77e59d236c637d46ab32 to your computer and use it in GitHub Desktop.
Save josdirksen/77e59d236c637d46ab32 to your computer and use it in GitHub Desktop.
import akka.typed.patterns.Receiver._
import akka.typed._
import akka.typed.patterns.Receiver
import akka.typed.ScalaDSL._
import scala.concurrent.duration._
object AkkaTypedReceiver extends App {
// Message protocol understood by the consumer actor (and stored by the receiver).
// Sealed, so every variant is declared right here.
sealed trait HelloMsg
// Greeting variants; declared as part of the protocol but not constructed in the visible code.
final case class HelloCountry(country: String) extends HelloMsg
final case class HelloCity(city: String) extends HelloMsg
final case class HelloWorld() extends HelloMsg
// The payload the Producer generates on its schedule.
final case class Hello(msg: String) extends HelloMsg
// Hands the consumer the receiver's command address; triggers the consumer's behavior switch.
final case class registerReceiverCmdIn(cmdIn: ActorRef[Command[HelloMsg]]) extends HelloMsg
// Asks the consumer to request the messages from the receiver.
final case class GetAllMessages() extends HelloMsg
// Carries a batch of messages for the consumer to print.
final case class PrintMessages(msgs: Seq[HelloMsg]) extends HelloMsg
/**
 * Consumer object containing the consumer behavior. The consumer starts out
 * ignoring everything except a `registerReceiverCmdIn` message; once it knows
 * the receiver's command address it switches to a behavior that can fetch the
 * messages from the receiver and print them.
 */
object Consumer {

  val consumer = Total[HelloMsg] {

    // in the case of a registerReceiver message, we change the implementation
    // since we're ready to receive other messages.
    case registerReceiverCmdIn(commandAddress) =>
      println("Consumer: Switching behavior")

      // return a static implementation which closes over the actorRefs.
      // ContextAware gives us the context, which is needed to spawn the adapter.
      ContextAware { ctx =>
        Static[HelloMsg] {

          // PrintMessages just prints out the list of messages we've received
          case PrintMessages(msgs) =>
            println(s"Consumer: Printing messages: $msgs")
            msgs.foreach { hw => println(s" $hw") }

          // if we get the GetAllMessages request, we ask the receiver for
          // everything it collects within the timeout.
          case GetAllMessages() =>
            println("Consumer: requesting all messages")
            // The adapter translates the receiver's reply into our own protocol
            // (a PrintMessages sent to ourselves). A plain function is used
            // rather than a type pattern, since the type argument is erased.
            // NOTE(review): a new adapter child is spawned for every request and
            // never stopped; consider creating it once — confirm against the
            // ContextAware semantics of the akka-typed version in use.
            val wrap = ctx.spawnAdapter[GetAllResult[HelloMsg]] { result =>
              println(s"Consumer: Received ${result.msgs.length} messages")
              PrintMessages(result.msgs)
            }
            commandAddress ! GetAll(2.seconds)(wrap)

          // Static takes a total function: without this fallback any other
          // HelloMsg would throw a MatchError and fail the actor.
          case _ => ()
        }
      }

    // for all the other cases return the existing implementation, in essence
    // we're just ignoring other messages till we change state
    case _ => Same
  }
}
/**
 * Producer object containing the protocol and the behavior. This is a very simple
 * actor that produces messages using a schedule. To start producing messages
 * we need to send an initial `registerReceiverMsgIn` message telling it where
 * the produced messages should go.
 */
object Producer {

  // a simple protocol defining the messages that can be sent
  sealed trait ProducerMsg
  // registers the address the produced messages are sent to
  final case class registerReceiverMsgIn(msgIn: ActorRef[HelloMsg]) extends ProducerMsg
  // self-message: forward `msg` to the registered address and schedule the next one
  final case class addHelloWorldMsg(msg: HelloMsg) extends ProducerMsg

  // the producer, which first waits for a registerReceiver message, after which
  // it changes behavior, to send messages.
  val producer = Full[ProducerMsg] {

    // if we receive a register message, we know where to send messages to
    case Msg(ctx, registerReceiverMsgIn(msgConsumer)) =>
      println("Producer: Switching behavior")

      // simple helper function which sends a timestamped message to self
      // after half a second.
      def scheduleMessage() =
        ctx.schedule(
          500.millis,
          ctx.self,
          addHelloWorldMsg(Hello(s"hello @ ${System.currentTimeMillis()}")))

      // schedule the first one, the rest will be triggered through the behavior.
      scheduleMessage()

      Static[ProducerMsg] {
        // add a message to the receiver and schedule a new one
        case addHelloWorldMsg(msg) =>
          println(s"Producer: Adding new '$msg' to receiver: $msgConsumer")
          msgConsumer ! msg
          scheduleMessage()

        // Static takes a total function: without this fallback a repeated
        // registerReceiverMsgIn would throw a MatchError and fail the actor.
        case _ => ()
      }

    // don't switch behavior on any of the other messages
    case _ => Same
  }
}
// Simple root actor, which we'll use to start the other actors. On PreStart it
// spawns the receiver, consumer and producer, wires them together and then asks
// the consumer for the collected messages at 0s, 3s and 8s.
val scenario1 = {
  Full[Unit] {
    case Sig(ctx, PreStart) =>
      import Producer._
      import Consumer._

      println("Scenario1: Started, now lets start up a number of child actors to do our stuff")

      // first start the actors: one implements the receiver pattern, and
      // the others are the ones we control directly.
      val receiverActor = ctx.spawn(Props(Receiver.behavior[HelloMsg]), "receiver")
      val consumerActor = ctx.spawn(Props(consumer), "adder")
      val producerActor = ctx.spawn(Props(producer), "producer")

      // our producerActor first needs the actorRef it can use to add messages to the receiver.
      // for this we use a wrapper: it creates a child whose address we hand to the receiver,
      // and which forwards the receiver's reply to the producer as a registration message.
      // A plain function is used rather than a type pattern, since the type argument is erased.
      val wrapper = ctx.spawnAdapter[ActorRef[HelloMsg]] { msgIn =>
        producerActor ! registerReceiverMsgIn(msgIn)
      }

      // now send the message to get the external address, the response will be
      // delivered to the producer as a registerReceiver message, through the adapter
      receiverActor ! ExternalAddress(wrapper)

      // our printing actor needs to know the address of the receiver so send it to him
      consumerActor ! registerReceiverCmdIn(receiverActor)

      // by calling getAllMessages we get the messages within a time period.
      println("Scenario1: Get all the messages")
      consumerActor ! GetAllMessages()
      // Follow-up requests go through the scheduler instead of Thread.sleep so the
      // dispatcher thread is not blocked inside the signal handler. The offsets
      // match the previous sleep(3000) followed by sleep(5000).
      ctx.schedule(3.seconds, consumerActor, GetAllMessages())
      ctx.schedule(8.seconds, consumerActor, GetAllMessages())

      Same
  }
}
// Boot the actor system named "Root" with scenario1 as its root behavior.
// NOTE(review): nothing in the visible code terminates this system — confirm that is intended.
val scenario1Actor = ActorSystem("Root", Props(scenario1))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment