Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
// ###########################################################
//
// Demonstrates how to supervise an Akka consumer actor.
//
// The consumer consumes messages from a file endpoint:
// - successful message processing by the consumer will
// positively acknowledge the message receipt, causing
// the file endpoint to delete the file.
// - an exception during message processing will cause a
// supervisor to restart the consumer. Before restart,
// the consumer negatively acknowledges the message
// receipt which causes the file endpoint to redeliver
// the message.
//
// This example requires Akka 1.1-SNAPSHOT to run.
//
// The usage pattern shown here is not limited to file
// endpoints but can be used for any other Camel endpoints.
//
// ###########################################################
// -----------------------------------------------------------
// Main: start CamelService, FileConsumer and Supervisor
// -----------------------------------------------------------
import akka.actor._
import akka.camel._
import akka.config.Supervision._
CamelServiceManager.startCamelService
// Mock repository (throws exception on first use of save method)
val repository = new Repository
// Supervised file consumer
val consumer = Actor.actorOf(new SupervisedFileConsumer(repository))
// Supervisor
val supervisor = Supervisor(
SupervisorConfig(
AllForOneStrategy(List(classOf[Exception]), 5, 10000),
Supervise(consumer, Permanent) :: Nil))
}
}
// -----------------------------------------------------------
// File consumer actor and mock repository
// -----------------------------------------------------------
class SupervisedFileConsumer(repo: Repository) extends Actor with Consumer {
// let this actor positively or negatively acknowledge the message receipt
override def autoack = false
// read file from data/input/actor directory and delete
// file once processing has been positively acknowledged
def endpointUri = "file:data/input/actor?delete=true"
def receive = {
case msg: Message => {
repo.save(msg) // will throw exception of first attempts
self.reply(Ack) // positively acknowledge receipt of message
// which causes the file endpoint to delete the file
}
}
override def preRestart(reason: scala.Throwable) {
// negatively acknowledge receipt of message
// which causes the file endpoint to redeliver it
self.reply(Failure(reason))
}
}
class Repository {
var firstRejected = false
// Throws exception on first call
def save(content: Any) {
if (!firstRejected) {
firstRejected = true
throw new Exception("save failed")
}
println("save succeeded")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.