Skip to content

Instantly share code, notes, and snippets.

@pauljm
Last active December 25, 2015 17:49
Show Gist options
  • Save pauljm/7015546 to your computer and use it in GitHub Desktop.
Save pauljm/7015546 to your computer and use it in GitHub Desktop.
Actor system example
/** Reads queue of new users to index */
// NOTE(review): the actor body is elided in this example, so how it reacts to the
// messages below is not visible here.
class NewUserQueue extends Actor with ActorLogging { ... }
/** Messages exchanged with the new-user queue actor. */
object NewUserQueue {
// Carries the reference of an actor that wants to receive users.
// UserPipeline sends this to its `queue` with `self` as the reference.
case class Listen(ref: ActorRef)
// A batch of users to index. UserPipeline handles this by starting one
// UserIndexer per user in `users`.
case class NewUsers(users: Seq[User])
}
/** Indexes documents for a new user */
// NOTE(review): the actor body is elided in this example; the reply behavior
// described below is inferred from how UserPipeline handles these messages.
class UserIndexer extends Actor with ActorLogging { ... }
/** Messages exchanged with a UserIndexer actor. */
object UserIndexer {
// Command to begin indexing `user`. UserPipeline sends this to a freshly created indexer.
case class Start(user: User)
// Reports that indexing finished for `user`. UserPipeline reacts by stopping the
// sending indexer and forwarding the user to the persister.
// Presumably sent by the indexer itself -- confirm against the actor body.
case class Success(user: User)
// Reports that indexing failed for `user`. UserPipeline reacts by stopping the
// sending indexer and logging an error.
case class Failure(user: User)
}
/** Updates indexed timestamp and writes user to persistent storage */
// NOTE(review): the actor body is elided in this example; the reply behavior
// described below is inferred from how UserPipeline handles these messages.
class UserPersister extends Actor with ActorLogging { ... }
/** Messages exchanged with the UserPersister actor. */
object UserPersister {
// Command to update the timestamp of `user` and write it out. UserPipeline sends
// this after an indexer reports success for the same user.
case class UpdateTimestampAndWrite(user: User)
// Reports that `user` was persisted. UserPipeline only logs completion.
case class Success(user: User)
// Reports that persisting `user` failed. UserPipeline only logs an error.
case class Failure(user: User)
}
/**
 * Coordinates indexing of all new users.
 *
 * On construction the pipeline creates a single [[UserPersister]] child and sends
 * `NewUserQueue.Listen(self)` to `queue`. For every user in an incoming
 * `NewUserQueue.NewUsers` batch it creates a dedicated [[UserIndexer]] child, watches
 * it, and sends it `UserIndexer.Start`. When an indexer reports success the user is
 * handed to the persister; indexer and persister failures are only logged.
 *
 * @param queue actor that produces a stream of users to be processed
 */
class UserPipeline(
  val queue: ActorRef // This is an actor that produces a stream of users to be processed
) extends Actor with ActorLogging {

  // Persister to write users to DB
  val persister = context.actorOf(Props[UserPersister])

  // In-flight, single-use indexer actors keyed by the user each one is processing.
  // This must be a `var`: entries are added and removed with `+=` / `-=`, which on an
  // immutable Map rebinds the field to a new Map.
  var indexers = Map.empty[User, ActorRef]

  // Handle potential failures in our child actors: resume on SQL errors, restart on
  // NullPointerException, escalate any other Exception. The strategy is configured
  // with maxNrOfRetries = 5 and withinTimeRange = one minute.
  // `1.minute` (method-call form) avoids needing scala.language.postfixOps.
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1.minute) {
      case _: java.sql.SQLException => Resume
      case _: NullPointerException  => Restart
      case _: Exception             => Escalate
    }

  // Tell the queue to send new users our way
  queue ! NewUserQueue.Listen(self)

  def receive = {
    // We've got users to process!
    // No type ascription on `users`: the extractor already yields Seq[User], and a
    // `Seq[User]` type pattern would be unchecked because of erasure.
    case NewUserQueue.NewUsers(users) =>
      users.foreach { user =>
        val indexer = context.actorOf(Props[UserIndexer]) // Create a worker actor for each user
        context.watch(indexer)                            // Watch the new worker for failure handling
        indexers += user -> indexer                       // Track the worker under its user
        indexer ! UserIndexer.Start(user)                 // Tell the new worker to start working on this user
      }

    // UserIndexer succeeded. Update timestamp and write to DB.
    case UserIndexer.Success(user) =>
      retireIndexer(sender, user)
      persister ! UserPersister.UpdateTimestampAndWrite(user)

    // Persister succeeded. We're done!
    case UserPersister.Success(user) =>
      log.info("Done!")

    // UserIndexer failed
    case UserIndexer.Failure(user) =>
      retireIndexer(sender, user)
      log.error("Indexer failed")

    // Persister failed
    case UserPersister.Failure(user) =>
      log.error("Persist failed")

    // Terminated messages are generated when an actor dies and sent to any actor watching the dead actor
    case Terminated(deadActor) =>
      // Handle an unexpected failure in the pipeline: indexers that reported
      // Success/Failure were unwatched in retireIndexer, so a Terminated here means
      // the worker died without reporting.
      indexers.find(_._2 == deadActor) foreach { case (user, _) =>
        indexers -= user
        log.error("Worker died processing user {}", user)
      }
  }

  /** Stops watching and stops a finished indexer, then drops its entry for `user`. */
  private def retireIndexer(indexer: ActorRef, user: User): Unit = {
    context.unwatch(indexer) // Stop watching first so stopping does not produce a Terminated message for us
    context.stop(indexer)    // Stop the actor
    indexers -= user         // Clear from map
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment