Skip to content

Instantly share code, notes, and snippets.

@helena
Last active December 21, 2015 00:29
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save helena/6220788 to your computer and use it in GitHub Desktop.
Save helena/6220788 to your computer and use it in GitHub Desktop.
Simple (and truncated) example of the CloudExtension's load-time ordered provisioning and ordered graceful shutdown. Unfortunately this had to be written in an older version of scala and akka - for now. MyNodeGuardian.scala is started in CloudExtension.register() and is a sample of using ProvisioningGuardian which extends OrderedGracefulShutdown.
/**
* CloudExtension and factory for creating CloudExtension instances.
* Example:
* {{{
* val application = CloudExtension(system, config)
* }}}
*
* @author Helena Edelson
*/
object CloudExtension extends ExtensionId[CloudExtension] with ExtensionIdProvider {
/**
* Obtains the instance associated with `system`, registers `config`
* settings at that instance and returns it.
*
* @param config configurations and settings to register.
*/
def apply(system: ActorSystem, config: CloudConfig): CloudExtension = {
val extension = super.apply(system)
extension.register(config)
extension
}
override def lookup: ExtensionId[_ <: Extension] = CloudExtension
override def createExtension(system: ExtendedActorSystem): CloudExtension = new CloudExtension(system)
}
/**
* INTERNAL API.
*
* Supervisor managing the different Cluster nodes.
*
* @author Helena Edelson
*/
private[foo] final class MyNodeGuardian(config: CloudConfig, val selfAddress: Address, uuid: UUID) extends ProvisioningGuardian {
import config._
import settings._
var ordered: IndexedSeq[ActorRef] = IndexedSeq.empty
// any number of framework actors which support node instances
var actor1: Option[ActorRef] = None
var actor2: Option[ActorRef] = None
/**
* Blocks the thread in load-time for sequential provisioning.
* Initialized by cloud extension's register function - see CloudExtension line 20.
*/
def provision(sender: ActorRef): Unit = {
implicit val timeout = BootProvisionTimeout
// each framework actor does a specific job that can be enabled/disabled via configuration
// so for each...
if (SpecificFunctionalityEnabled) {
actor1 = Some(context.actorOf(Props(new SomeFrameworkActor(config, selfAddress, uuid)), "some-actor"))
Await.result((actor.get ? Initialized).mapTo[InitializedAck], timeout.duration)
}
ordered ++= IndexedSeq(actor1, actor2, etc...).flatten
self ! InternalCoreAction.Provisioned
}
}
import akka.util.duration._
import akka.actor._
import akka.dispatch.Await
import akka.pattern.gracefulStop
import akka.actor. { SupervisorStrategy, OneForOneStrategy }
/**
* Handles graceful shutdown of all `ordered` actors.
* GracefulShutdownException, Supervision, settings are all custom.
*/
trait OrderedGracefulShutdown extends Actor with Supervision with ActorLogging {
import settings._
import context._
def ordered: IndexedSeq[ActorRef]
override val supervisorStrategy = guardianSupervisorStrategy
override def postStop(): Unit = {
log.info("Ordered graceful shutdown of children starting")
ordered foreach (gracefulShutdown(_))
log.info("Ordered graceful shutdown of children completed")
}
/**
* Executes [[akka.pattern.gracefulStop( )]] on `child`.
* Blocks the thread for orderly shutdown.
*/
def gracefulShutdown(child: ActorRef): Unit = try {
log.debug("Graceful stop starting for {}", child.path)
Await.result(gracefulStop(child, settings.GracefulShutdownDuration), settings.GracefulShutdownDuration + 1.seconds)
log.debug("Graceful stop completed for {}", child.path)
} catch { case e =>
log.error("Error shutting down {}, cause {}", child.path, cause.toString)
throw new GracefulShutdownException(cause)
}
}
/**
* Handles load-time provisioning
*/
trait ProvisioningGuardian extends OrderedGracefulShutdown with Supervision {
import context._
def selfAddress: Address
override def preStart(): Unit = self ! InternalCoreAction.Provision
/**
* Executes graceful shutdown by dependency hierarchy.
*/
override def postStop(): Unit = {
log.info("Node stopping on [{}]", selfAddress)
self ! InternalCoreAction.Deprovision
super.postStop()
}
override def preRestart(cause: Throwable, message: Option[Any]): Unit = {
log.error("Node failure on {} due to {} : {}", selfAddress, cause, message)
super.preRestart(cause, message)
}
def provisioning: Receive = {
case InternalCoreAction.Provisioned => unbecome()
}
def receive: Receive = {
case InternalCoreAction.Provision =>
log.info("Provisioning on {}", selfAddress)
become(provisioning); provision(sender) // work in progress to be just: become(provisioning)
case InternalCoreAction.Deprovision =>
log.info("Deprovisioning on {}", selfAddress)
deprovision(sender)
}
/**
* Blocks the thread in load-time for sequential provisioning.
*/
def provision(sender: ActorRef): Unit
/**
* Executes graceful shutdown by dependency hierarchy.
*/
def deprovision(sender: ActorRef): Unit = {
ordered foreach (gracefulShutdown(_))
log.info("Deprovisioned on {}", selfAddress)
sender ! InternalCoreAction.DeprovisionedAck(self)
self ! PoisonPill
context.system.shutdown()
}
}
@viktorklang
Copy link

Why Await?

@helena
Copy link
Author

helena commented Aug 13, 2013

The Await is during load-time and shutdown for ordered actor provisioning/deprovisioning. For example, where all role actors (non-framework) need to use services such as naming and dynamic locations, where actor1 above is naming, actor2 is locations. Naming and Locations actors have workers, all must be up and running before any non-framework role actors come online and start their work. As each supervisor completes provisioning they send back the ack. Then all role actors can start.

Await only occurs during node startup/shutdown.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment