Skip to content

Instantly share code, notes, and snippets.

@viktorklang
Created January 26, 2011 17:16
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save viktorklang/797035 to your computer and use it in GitHub Desktop.
Save viktorklang/797035 to your computer and use it in GitHub Desktop.
Unnesting a Scala Actors nested receive example
/**
* I had a question directed at me, on how to encode the following scenario in Akka Actors,
* as for Scala Actors one would simply nest the receives.
*
* Recuirements are as follows:
* The first thing the actor needs to do, is to subscribe to a channel of events,
* Then it must replay (process) all "old" events
* Then it has to wait for a GoAhead signal to begin processing the new events
* It mustn't "miss" events that happen between catching up with the old events and getting the GoAhead signal
*/
import akka.actor._
import akka.actor.Actor._
import scala.collection.mutable.ListBuffer
class MyActor extends Actor {
//If you need to store sender/senderFuture you can change it to ListBuffer[(Any, Channel)]
val queue = new ListBuffer[Any]()
//This message processes a message/event
def process(msg: Any): Unit = println("processing: " + msg)
//This method subscribes the actor to the event bus
def subscribe() {} //Your external stuff
//This method retrieves all prior messages/events
def allOldMessages() = List()
override def preStart {
//We override preStart to be sure that the first message the actor gets is
//'Replay, that message will start to be processed _after_ the actor is started
self ! 'Replay
//Then we subscribe to the stream of messages/events
subscribe()
}
def receive = {
case 'Replay => //Our first message should be a 'Replay message, all others are invalid
allOldMessages() foreach process //Process all old messages/events
become { //Switch behavior to look for the GoAhead signal
case 'GoAhead => //When we get the GoAhead signal we process all our buffered messages/events
queue foreach process
queue.clear
become { //Then we change behaviour to process incoming messages/events as they arrive
case msg => process(msg)
}
case msg => //While we haven't gotten the GoAhead signal, buffer all incoming messages
queue += msg //Here you have full control, you can handle overflow etc
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment