Created
February 19, 2011 14:14
-
-
Save krasserm/835076 to your computer and use it in GitHub Desktop.
Akka consumer actors best practices
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Consumer actor for the `direct:input` endpoint that runs on its own
 * thread-based dispatcher with a bounded mailbox and customizes its Camel
 * route with a redelivery policy.
 */
class BoundedMailboxDirectConsumer extends Actor with Consumer {
  import akka.dispatch._
  import akka.util.duration._

  // Create a bounded mailbox for this consumer actor with a capacity of 5
  // (the 100 ms argument is presumably the timeout for enqueueing into a
  // full mailbox -- confirm against the Akka Dispatchers API)
  self.dispatcher = Dispatchers.newThreadBasedDispatcher(self, 5, 100.milliseconds)

  // instruct how the route should be customized during route creation (definition)
  onRouteDefinition { rd: RouteDefinition =>
    // on exception attempt max 3 redeliveries with a delay of 1000 ms
    rd.onException(classOf[Exception]).maximumRedeliveries(3).redeliveryDelay(1000).end
  }

  def endpointUri = "direct:input"

  def receive = {
    case msg => {
      Thread.sleep(1000) // delay processing
      println("received %s" format msg)
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Consumer actor for the `file:data/input` endpoint that runs on its own
 * thread-based dispatcher with a bounded mailbox.
 */
class BoundedMailboxFileConsumer extends Actor with Consumer {
  import akka.dispatch._
  import akka.util.duration._

  // Create a bounded mailbox for this consumer actor with a capacity of 5
  // (the 100 ms argument is presumably the timeout for enqueueing into a
  // full mailbox -- confirm against the Akka Dispatchers API)
  self.dispatcher = Dispatchers.newThreadBasedDispatcher(self, 5, 100.milliseconds)

  def endpointUri = "file:data/input?delete=true"

  def receive = {
    case msg => {
      Thread.sleep(1000) // delay processing
      println("received %s" format msg)
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * File consumer that acknowledges message receipt explicitly instead of
 * relying on automatic acknowledgement.
 */
class FileConsumer extends Actor with Consumer {
  // this actor itself sends the positive or negative acknowledgement
  override def autoack = false

  // Camel waits (blocking a thread) until a response is sent
  // or receive throws an exception
  override def blocking = true

  // files are read from the data/input/actor directory and deleted
  // once processing has been positively acknowledged
  def endpointUri = "file:data/input/actor?delete=true"

  def receive = {
    case message: Message =>
      // do something that may throw an exception
      // ...
      self.reply(Ack) // positively acknowledge receipt of message
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * In-only file consumer with automatic acknowledgement switched off: it
 * prints the message body and then replies with `Ack` itself.
 */
class InOnlyAckConsumer extends Actor with Consumer {
  override def autoack = false

  def endpointUri = "file:data/input?delete=true"

  def receive = {
    case message: Message =>
      val text = message.bodyAs[String]
      println("received %s".format(text))
      // explicit acknowledgement, required because autoack is off
      self.reply(Ack)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * In-only file consumer that prints the body of every received message
 * and sends no reply of its own.
 */
class InOnlyConsumer extends Actor with Consumer {
  def endpointUri = "file:data/input?delete=true"

  def receive = {
    case message: Message =>
      val text = message.bodyAs[String]
      println("received %s".format(text))
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor._ | |
import akka.camel._ | |
/**
 * In-out consumer on a MINA TCP text-line endpoint: replies to each
 * message with its body prefixed by "received ".
 */
class InOutConsumer extends Actor with Consumer {
  def endpointUri = "mina:tcp://localhost:6200?textline=true"

  def receive = {
    // only the body is used; the second extracted field is ignored
    case Message(payload, _) =>
      val response = "received %s".format(payload)
      self.reply(response)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// The body runs only when both the Camel service and the producer
// template yield a value.
CamelServiceManager.service.foreach { service =>
  CamelContextManager.template.foreach { template =>
    // await activation of 1 endpoint
    service.awaitEndpointActivation(1) {
      Actor.actorOf[InOutConsumer].start
    }

    // use a Camel producer template to send a test
    // message via tcp and wait for a response
    val uri = "mina:tcp://localhost:6200?textline=true"
    val response = template.requestBody(uri, "test")
    assert(response == "received test")
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// initiate 10 in-only message exchanges with the direct:input endpoint,
// using the numbers 1 to 10 (formatted as strings) as message bodies
(1 to 10).foreach { i =>
  template.sendBody("direct:input", "%s".format(i))
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// start the Camel service through the CamelServiceManager
CamelServiceManager.startCamelService
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * File consumer with automatic acknowledgement switched off that sends its
 * acknowledgements through a managed reply channel, including from the
 * `preRestart` and `postStop` lifecycle hooks.
 */
class SupervisedFileConsumer extends Actor with Consumer with ChannelManagement {
  override def autoack = false

  def endpointUri = "file:data/input/actor?delete=true"

  // manage a replyChannel for the given receive partial function
  def receive = manageReplyChannelFor {
    case message: Message =>
      // do something that may throw an exception
      // ...
      // positively acknowledge receipt of message
      replyChannel.foreach { channel => channel ! Ack }
  }

  override def preRestart(reason: scala.Throwable): Unit = {
    // replyChannel is only available if the receive function
    // terminated abnormally, i.e. with an exception
    replyChannel.foreach { channel => channel ! Failure(reason) }
  }

  override def postStop(): Unit = {
    // replyChannel is only available if the receive function
    // terminated abnormally, i.e. with an exception.
    // Reply with a Failure instead of Ack to keep the corresponding file.
    replyChannel.foreach { channel => channel ! Ack }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment