Skip to content

Instantly share code, notes, and snippets.

@helena
Last active December 21, 2015 04:39
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save helena/6250995 to your computer and use it in GitHub Desktop.
Save helena/6250995 to your computer and use it in GitHub Desktop.
Rough, initial cut of a trait to mixin when an Actor requires initalization, where the initialization is long and arduous (for example, data initialization related). This strategy allows the implementing actor to delegate the work to another Actor, on a separate, dedicated Dispatcher, and not block any other related Actors in load-time.
######################################
# Cloud Extension - Roles Config #
######################################
cloud {
roles {
some-role {
# config for the role deployed on n-vms in the cluster
...
# dedicated dispatcher for provisioning required by someactor
someactor-provisioning-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
# etc..
}
}
}
}
import scala.collection.immutable
import scala.collection.immutable.Queue
import akka.actor._
/**
* Rough, initial cut of a trait to mixin when an Actor requires
* initalization, where the initialization is long and arduous (for
* example, data initialization related). This strategy allows the
* implementing actor to delegate the work to another Actor, on a
* separate, dedicated Dispatcher, and not block any other related
* Actors in load-time.
*
* @author Helena Edelson
*/
trait ProvisioningActor extends Actor with ActorLogging {
/**
* The configured name of the dedicated dispatcher to use.
*/
def provisioningDispatcher: String
/**
* The function to execute on `provisioningDispatcher`
* in the AsyncProvisioner actor.
*/
protected def action(): Unit // TODO the right way
lazy val provisioner = context.actorOf(Props(new AsyncProvisioner(action())).withDispatcher(provisioningDispatcher))
protected var queue: immutable.Queue[QueuedMessage] = Queue.empty
override def preStart(): Unit = becomeProvisioned()
def uninitialized: Actor.Receive = {
case e => log.debug("Received {} while uninitialized", e)
}
def receive = uninitialized
def becomeProvisioned(): Unit = {
context become provisioning
provisioner ! UserProvisioningAction.Initialized
}
/**
* Override case e: Any to restrict messages queued by type.
* Default enqueues all messages while provisioning.
*/
def provisioning: Receive = {
case UserProvisioningAction.InitializedAck(a) => becomeInitialized()
case e: Any => enqueue(QueuedMessage(sender, e))
}
def becomeInitialized(): Unit = {
context become initialized
dequeue()
}
/**
* Implement desired behavior.
*/
def initialized: Actor.Receive
/**
* Queues applicable messages until store is provisioned.
*/
def enqueue(m: QueuedMessage): Unit = queue = queue.enqueue(m)
def dequeue(): Unit = {
queue foreach (m => self.tell(m.message, m.sender))
queue = Queue.empty // no need to dequeue, send all then set to empty
}
}
// TODO correct declaration of 'action
class AsyncProvisioner(action: Unit) extends Actor with ActorLogging {
def receive: Receive = {
case UserProvisioningAction.Initialized => provision()
}
/**
* Wraps a blocking function in a non-blocking context, on another dispatcher.
*/
def provision(): Unit = {
action
context.parent ! UserProvisioningAction.InitializedAck(self)
}
}
class ProvisioningActorSpec extends AbstractSpec {
"ProvisioningActor" must {
"provision the implementing actor" in {
val actor = system.actorOf(Props[MyProvisioningActor], "test-actor")
actor ! Ping // provisioning, enqueue 1
expectMsgPF(1 second, "count") { case count: Int => count must be (1) }
Thread.sleep(5.seconds.toMillis)
// initialized
(0 to 10) foreach { n =>
actor ! Pong
expectMsgPF(1 second, "Pong") { case Pong => println("received pong")}
expectMsgPF(1 second, "count") { case count: Int => count must be (0) } // queue empty
}
}
}
}
class MyProvisioningActor extends ProvisioningActor with ActorLogging {
/**
* Your configured dispatcher.
*/
val provisioningDispatcher = "cloud.roles.some-role.someactor-provisioning-dispatcher"
def action(): Unit = {
// simulate some work that would really take several (painful) minutes
Thread.sleep(5.seconds.toMillis)
log.info("function completed")
}
/**
* While provisioning, override to enqueue any message type(s) of interest,
* default enqueues all types.
*/
override def provisioning: Receive = {
case UserProvisioningAction.InitializedAck(a) => becomeInitialized()
case Ping => enqueue(QueuedMessage(sender, Pong))
}
/**
* Override to add behavior
*/
override def enqueue(m: QueuedMessage): Unit = {
super.enqueue(m)
m.sender ! queue.size
}
/**
* Implement the standard behavior of your Actor.
*/
def initialized: Actor.Receive = {
case Pong => sender ! Pong; sender ! queue.size
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment