Skip to content

Instantly share code, notes, and snippets.

@krasserm
Created February 19, 2011 14:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save krasserm/835076 to your computer and use it in GitHub Desktop.
Save krasserm/835076 to your computer and use it in GitHub Desktop.
Akka consumer actors best practices
class BoundedMailboxDirectConsumer extends Actor with Consumer {
import akka.dispatch._
import akka.util.duration._
// Create a bounded mailbox for this consumer actor with a capacity of 10
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self, 5, 100.milliseconds)
// instruct how the route should be customized during route creation (definition)
onRouteDefinition { rd: RouteDefinition =>
// on exception attempt max 3 redeliveries with a delay of 1000 ms
rd.onException(classOf[Exception]).maximumRedeliveries(3).redeliveryDelay(1000).end
}
def endpointUri = "direct:input"
def receive = {
case msg => {
Thread.sleep(1000) // delay processing
println("received %s" format msg)
}
}
}
class BoundedMailboxFileConsumer extends Actor with Consumer {
import akka.dispatch._
import akka.util.duration._
// Create a bounded mailbox for this consumer actor with a capacity of 10
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self, 5, 100.milliseconds)
def endpointUri = "file:data/input?delete=true"
def receive = {
case msg => {
Thread.sleep(1000) // delay processing
println("received %s" format msg) }
}
}
class FileConsumer extends Actor with Consumer {
// let this actor positively or negatively acknowledge the message receipt
override def autoack = false
// let Camel wait (block a thread) until a response is sent or an exception
// is thrown by receive
override def blocking = true
// read file from data/input/actor directory and delete
// file once processing has been positively acknowledged
def endpointUri = "file:data/input/actor?delete=true"
def receive = {
case msg: Message => {
// do something that may throw an exception
// ...
// positively acknowledge receipt of message
self.reply(Ack) // positively acknowledge receipt of message
}
}
}
class InOnlyAckConsumer extends Actor with Consumer {
override def autoack = false
def endpointUri = "file:data/input?delete=true"
def receive = {
case msg: Message => {
println("received %s" format msg.bodyAs[String])
self.reply(Ack)
}
}
}
class InOnlyConsumer extends Actor with Consumer {
def endpointUri = "file:data/input?delete=true"
def receive = {
case msg: Message => {
println("received %s" format msg.bodyAs[String])
}
}
}
import akka.actor._
import akka.camel._
class InOutConsumer extends Actor with Consumer {
def endpointUri = "mina:tcp://localhost:6200?textline=true"
def receive = {
case Message(body, _) => {
self.reply("received %s" format body)
}
}
}
for {
service <- CamelServiceManager.service
template <- CamelContextManager.template
} {
// await activation of 1 endpoint
service.awaitEndpointActivation(1) {
Actor.actorOf[InOutConsumer].start
}
// use a Camel producer template to send a test
// message via tcp and wait for a response
val uri = "mina:tcp://localhost:6200?textline=true"
assert(template.requestBody(uri, "test") == "received test")
}
// initiate 10 in-only message exchanges with the direct:input endpoint
for (i <- 1 to 10) template.sendBody("direct:input", "%s" format i)
CamelServiceManager.startCamelService
class SupervisedFileConsumer extends Actor with Consumer with ChannelManagement {
override def autoack = false
def endpointUri = "file:data/input/actor?delete=true"
// manage a replyChannel for a given receive partial function
def receive = manageReplyChannelFor {
case msg: Message => {
// do something that may throw an exception
// ...
// positively acknowledge receipt of message
for (c <- replyChannel) c ! Ack
}
}
override def preRestart(reason: scala.Throwable) {
// replyChannel only available if receive function
// terminated abnormally i.e. with an exception
for (c <- replyChannel) c ! Failure(reason)
}
override def postStop() {
// replyChannel only available if receive function
// terminated abnormally i.e. with an exception
for (c <- replyChannel) c ! Ack // or reply with a Failure to
// keep the corresponding file
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment