Skip to content

Instantly share code, notes, and snippets.

@ryantanner
Forked from pauljm/actor_example.scala
Last active November 27, 2017 00:02
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save ryantanner/7015577 to your computer and use it in GitHub Desktop.
Save ryantanner/7015577 to your computer and use it in GitHub Desktop.
/** Reads queue of new users to index */
class NewUserQueue extends Actor with ActorLogging { ... }
// Define our messages for the NewUserQueue
object NewUserQueue {
case class Listen(ref: ActorRef)
case class StopListen(ref: ActorRef)
case class NewUsers(users: Seq[User])
}
/** Indexes documents for a new user */
class UserIndexer extends Actor with ActorLogging { ... }
// Define our messages for the UserIndexer
object UserIndexer {
case class Start(user: User)
case class Success(user: User)
case class Failure(user: User)
}
/** Updates indexed timestamp and writes user to persistent storage */
class UserPersister extends Actor with ActorLogging { ... }
// Define our messages for the UserPersister
object UserPersister {
case class UpdateTimestampAndWrite(user: User)
case class Success(user: User)
case class Failure(user: User)
}
/** Coordinates indexing of all new users */
class UserPipeline(
val queue: ActorRef // This is a NewUserQueue actor that produces a stream of users to be processed
) extends Actor with ActorLogging {
// Create a persister actor to write users to DB
val persister = context.actorOf(Props[UserPersister])
// Map of lightweight, single-use indexer actors
val indexers = Map.empty[User, ActorRef]
// Handle potential failures in our child actors
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1 minute) {
case _: java.sql.SQLException => Resume
case _: NullPointerException => Restart
case _: Exception => Escalate
}
override def preStart = {
// Tell the queue to send new users our way
queue ! NewUserQueue.Listen(self)
}
override def postStop = {
// Tell the queue to stop sending users to us
queue ! NewUserQueue.StopListen(self)
}
def receive = {
// We've got users to process!
case NewUserQueue.NewUsers(users: Seq[User]) =>
users.foreach { user =>
val indexer = context.actorOf(Props[UserIndexer]) // Create a worker actor for each user
context.watch(indexer) // Watch the new worker for failure handling
indexers += user -> indexer // Add the worker to our map
indexer ! UserIndexer.Start(user) // Tell the new worker to start working on this user
}
// UserIndexer succeeded. Update timestamp and write to DB.
case UserIndexer.Success(user) =>
context.unwatch(sender) // Stop watching this actor
context.stop(sender) // Stop the actor
indexers -= user // Clear from map
persister ! UserPersister.UpdateTimestampAndWrite(user) // Tell the database to update the user
// Persister succeeded. We're done!
case UserPersister.Success(user) =>
log.info("Done!")
// UserIndexer failed
case UserIndexer.Failure(user) =>
context.unwatch(sender) // stop watching this actor
context.stop(sender) // Stop the actor
indexers -= user
log.info("Indexer failed")
// Persister failed
case UserPersister.Failure(user) =>
log.info("Persist failed")
// Terminated messages are generated when an actor dies and sent to any actor watching the dead actor
case Terminated(deadActor) =>
// handle an unexpected failure in the pipeline
indexers.find(_._2 == deadActor) foreach { case (user, _) =>
indexers -= user
log.error("Worker died processing user {}", user)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment