Skip to content

Instantly share code, notes, and snippets.

@krasserm
Created February 17, 2011 07:23
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save krasserm/831220 to your computer and use it in GitHub Desktop.
Save krasserm/831220 to your computer and use it in GitHub Desktop.
// ###########################################################
//
// Demonstrates how to supervise an Akka consumer actor.
//
// The consumer consumes messages from a file endpoint:
// - successful message processing by the consumer will
// positively acknowledge the message receipt, causing
// the file endpoint to delete the file.
// - an exception during message processing will cause a
// supervisor to restart the consumer. Before restart,
// the consumer negatively acknowledges the message
// receipt which causes the file endpoint to redeliver
// the message.
//
// This example requires Akka 1.1-SNAPSHOT to run.
//
// The usage pattern shown here is not limited to file
// endpoints but can be used for any other Camel endpoints.
//
// ###########################################################
// -----------------------------------------------------------
// Main: start CamelService, FileConsumer and Supervisor
// -----------------------------------------------------------
import akka.actor._
import akka.camel._
import akka.config.Supervision._
CamelServiceManager.startCamelService
// Mock repository (throws exception on first use of save method)
val repository = new Repository
// Supervised file consumer
val consumer = Actor.actorOf(new SupervisedFileConsumer(repository))
// Supervisor
val supervisor = Supervisor(
SupervisorConfig(
AllForOneStrategy(List(classOf[Exception]), 5, 10000),
Supervise(consumer, Permanent) :: Nil))
}
}
// -----------------------------------------------------------
// File consumer actor and mock repository
// -----------------------------------------------------------
class SupervisedFileConsumer(repo: Repository) extends Actor with Consumer {
// let this actor positively or negatively acknowledge the message receipt
override def autoack = false
// read file from data/input/actor directory and delete
// file once processing has been positively acknowledged
def endpointUri = "file:data/input/actor?delete=true"
def receive = manageReplyChannelFor {
case msg: Message => {
// will throw exception on first attempt
repo.save(msg)
// positively acknowledge receipt of message
for (c <- replyChannel) c ! Ack
}
}
override def preRestart(reason: scala.Throwable) {
// replyChannel only set if receive function
// terminated abnormally i.e. with an exception
// will send reply only if reply channel is set
for (c <- replyChannel) c ! Failure(reason)
}
override def postStop() {
// replyChannel only set if receive function
// terminated abnormally i.e. with an exception
// will send reply only if reply channel is set
for (c <- replyChannel) c ! Failure(new Exception("actor stopped by supervisor"))
}
}
class Repository {
var firstRejected = false
// Throws exception on first call
def save(content: Any) {
if (!firstRejected) {
firstRejected = true
throw new Exception("save failed")
}
println("save succeeded")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment