Skip to content

Instantly share code, notes, and snippets.

Last active December 21, 2015 04:39
Show Gist options
  • Save helena/6250995 to your computer and use it in GitHub Desktop.
Save helena/6250995 to your computer and use it in GitHub Desktop.
Rough, initial cut of a trait to mix in when an Actor requires initialization, where the initialization is long and arduous (for example, data initialization related). This strategy allows the implementing actor to delegate the work to another Actor, on a separate, dedicated Dispatcher, and not block any other related Actors in load-time.
# Cloud Extension - Roles Config #
cloud {
  roles {
    some-role {
      # config for the role deployed on n-vms in the cluster
      # dedicated dispatcher for provisioning required by someactor
      someactor-provisioning-dispatcher {
        type = Dispatcher
        executor = "fork-join-executor"
        # etc..
      }
    }
  }
}
import scala.collection.immutable
import scala.collection.immutable.Queue
/**
 * Rough, initial cut of a trait to mix in when an Actor requires
 * initialization, where the initialization is long and arduous (for
 * example, data initialization related). This strategy allows the
 * implementing actor to delegate the work to another Actor, on a
 * separate, dedicated Dispatcher, and not block any other related
 * Actors in load-time.
 *
 * Behavior sequence: `preStart` switches to `provisioning` and asks the
 * `provisioner` child to start; messages received meanwhile are queued;
 * once the child acknowledges, the actor switches to `initialized` and
 * the queued messages are re-sent to `self`.
 *
 * @author Helena Edelson
 */
trait ProvisioningActor extends Actor with ActorLogging {

  /** The configured name of the dedicated dispatcher to use. */
  def provisioningDispatcher: String

  /**
   * The function to execute on `provisioningDispatcher`
   * in the AsyncProvisioner actor.
   */
  protected def action(): Unit // TODO the right way

  /** Child actor that runs on `provisioningDispatcher`; created on first access. */
  lazy val provisioner = context.actorOf(Props(new AsyncProvisioner(action())).withDispatcher(provisioningDispatcher))

  /** Messages (with their original senders) buffered while provisioning. */
  protected var queue: immutable.Queue[QueuedMessage] = Queue.empty

  override def preStart(): Unit = becomeProvisioned()

  /** Initial behavior: only logs whatever arrives. */
  def uninitialized: Actor.Receive = {
    case e => log.debug("Received {} while uninitialized", e)
  }

  def receive = uninitialized

  /** Switches to `provisioning` and tells the provisioner to start. */
  def becomeProvisioned(): Unit = {
    context become provisioning
    provisioner ! UserProvisioningAction.Initialized
  }

  /**
   * Override case e: Any to restrict messages queued by type.
   * Default enqueues all messages while provisioning.
   */
  def provisioning: Receive = {
    case UserProvisioningAction.InitializedAck(a) => becomeInitialized()
    case e: Any => enqueue(QueuedMessage(sender, e))
  }

  /** Switches to `initialized` and replays everything queued while provisioning. */
  def becomeInitialized(): Unit = {
    context become initialized
    // Without this the buffered messages would stay in `queue` forever:
    // nothing else in this trait calls dequeue().
    dequeue()
  }

  /** Implement desired behavior. */
  def initialized: Actor.Receive

  /** Queues applicable messages until store is provisioned. */
  def enqueue(m: QueuedMessage): Unit = queue = queue.enqueue(m)

  /** Re-sends every queued message to `self` with its original sender, then clears the queue. */
  def dequeue(): Unit = {
    queue foreach (m => self.tell(m.message, m.sender))
    queue = Queue.empty // no need to dequeue, send all then set to empty
  }
}
/**
 * Runs the provisioning `action` when it receives
 * `UserProvisioningAction.Initialized`, then acknowledges to its parent
 * with `UserProvisioningAction.InitializedAck`.
 *
 * @param action the provisioning work. Declared by-name so that it is
 *               evaluated inside `provision()` each time it is invoked,
 *               rather than being an already-computed `Unit` value.
 */
class AsyncProvisioner(action: => Unit) extends Actor with ActorLogging {

  def receive: Receive = {
    case UserProvisioningAction.Initialized => provision()
  }

  /**
   * Wraps a blocking function in a non-blocking context, on another dispatcher.
   */
  def provision(): Unit = {
    // Do the work first; the parent must only be acknowledged once it is done.
    action
    context.parent ! UserProvisioningAction.InitializedAck(self)
  }
}
/**
 * Sends a `Ping` to a `MyProvisioningActor` and expects the queue size (1)
 * back, then repeatedly sends `Pong` and expects `Pong` followed by a
 * queue size of 0.
 */
class ProvisioningActorSpec extends AbstractSpec {

  "ProvisioningActor" must {
    "provision the implementing actor" in {
      val actor = system.actorOf(Props[MyProvisioningActor], "test-actor")

      actor ! Ping // provisioning, enqueue 1
      expectMsgPF(1.second, "count") { case count: Int => count must be (1) }

      // initialized
      (0 to 10) foreach { _ =>
        actor ! Pong
        expectMsgPF(1.second, "Pong") { case Pong => println("received pong") }
        expectMsgPF(1.second, "count") { case count: Int => count must be (0) } // queue empty
      }
    }
  }
}
/**
 * Sample `ProvisioningActor` implementation: queues a `Pong` for every
 * `Ping` received while provisioning, reports the queue size to the sender
 * on each enqueue, and once initialized answers `Pong` with `Pong` plus the
 * current queue size.
 */
class MyProvisioningActor extends ProvisioningActor with ActorLogging {

  /** Your configured dispatcher. */
  val provisioningDispatcher = "cloud.roles.some-role.someactor-provisioning-dispatcher"

  def action(): Unit = {
    // simulate some work that would really take several (painful) minutes
    Thread.sleep(5.seconds.toMillis)
    // NOTE(review): the call wrapping this message was garbled in the source;
    // reconstructed as a log statement — confirm against the original.
    log.info("function completed")
  }

  /**
   * While provisioning, override to enqueue any message type(s) of interest,
   * default enqueues all types.
   */
  override def provisioning: Receive = {
    case UserProvisioningAction.InitializedAck(a) => becomeInitialized()
    case Ping => enqueue(QueuedMessage(sender, Pong))
  }

  /** Override to add behavior: store the message, then report the queue size to its sender. */
  override def enqueue(m: QueuedMessage): Unit = {
    // Delegate to the base implementation so the message is actually stored;
    // otherwise the size reported below would never change.
    super.enqueue(m)
    m.sender ! queue.size
  }

  /** Implement the standard behavior of your Actor. */
  def initialized: Actor.Receive = {
    case Pong =>
      sender ! Pong
      sender ! queue.size
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment