Created
February 28, 2011 06:36
-
-
Save krasserm/847012 to your computer and use it in GitHub Desktop.
Akka producer actors best practices - Part 2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Asynchronous send (`!`) of a Camel message with body "test" whose header map
// stores 123 under the Message.MessageExchangeId key.
// NOTE(review): presumably the header lets the caller correlate the eventual
// response with this request -- confirm against the akka-camel docs.
httpProducer ! Message("test", Map(Message.MessageExchangeId -> 123))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Producer actor that publishes to the `jms:queue:test` endpoint and passes
 * everything it gets in `receiveAfterProduce` on to `target`.
 *
 * @param target actor that the post-produce messages are forwarded to
 */
class JmsForwardingProducer(target: ActorRef) extends Actor with Producer {
  def endpointUri: String = "jms:queue:test"

  // Produce one-way.
  // NOTE(review): presumably this selects an in-only exchange, i.e. no reply
  // is expected from the JMS endpoint itself -- confirm in the Producer docs.
  override def oneway: Boolean = true

  // `forward` is used instead of `!`.
  // NOTE(review): presumably so the original sender stays the reply target
  // for `target` -- confirm against the ActorRef API.
  override def receiveAfterProduce = {
    case msg => target forward msg
  }
}
/**
 * Replies to the sender with a status string: "done" for a `Message`, or an
 * error text containing the cause's message for a `Failure`.
 */
class ForwardReceiver extends Actor {
  def receive = {
    case _: Message =>
      self.reply("done")
    case Failure(cause, _) =>
      self.reply("message could not be added to queue: %s" format cause.getMessage)
  }
}
// Start the receiver first so that it can be handed to the forwarding producer.
val receiver = Actor.actorOf[ForwardReceiver].start
val producer = Actor.actorOf(new JmsForwardingProducer(receiver)).start

// Request-reply send (`!!`); the reply is produced by ForwardReceiver, not by
// the producer itself, because the producer forwards to it.
producer !! "test" // gets a response from ForwardReceiver
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Request-reply send (`!!`) of the string "message" to the producer actor.
someProducer !! "message"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Producer whose post-produce handling turns a `Failure` into an exception on
 * the sender's future and replies with any other message unchanged.
 */
class SomeProducer extends Actor with Producer {
  // ...
  override def receiveAfterProduce = {
    // Complete the sender's future (if there is one) exceptionally with the
    // failure cause; without a sender future nothing is done.
    case Failure(cause, _) =>
      self.senderFuture.foreach(_.completeWithException(cause))
    // Every other message is replied to the sender as is.
    case message =>
      self.reply(message)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * File consumer that acknowledges each message itself (auto-acknowledgement
 * is switched off) and still sends a reply when it is restarted or stopped.
 */
class SupervisedFileConsumer extends Actor with Consumer {
  // The actor replies with Ack (or Failure) on its own.
  override def autoack: Boolean = false

  def endpointUri: String = "file:data/input/actor?delete=true"

  def receive = {
    case msg: Message => {
      // do something that may throw an exception
      // ...

      // positively acknowledge receipt of message
      self.reply(Ack)
    }
  }

  // Reply with the restart reason wrapped in a Failure.
  // NOTE(review): `reply_?` is presumably the variant that does not throw
  // when there is no sender to reply to -- confirm against the ActorRef API.
  override def preRestart(reason: scala.Throwable): Unit = {
    self.reply_?(Failure(reason))
  }

  override def postStop(): Unit = {
    self.reply_?(Ack) // or reply with a Failure to
                      // keep the corresponding file
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Supervised producer that notifies the managed reply channel with a
 * `Failure` when the actor is restarted or stopped.
 *
 * NOTE(review): `ChannelManagement`, `manageReplyChannelFor` and
 * `replyChannel` are not defined in this snippet; presumably they come from
 * an earlier part of this series -- confirm.
 */
class SupervisedProducer extends Actor with Producer with ChannelManagement {
  // Placeholder left elided in the original snippet: supply the endpoint URI.
  def endpointUri = ...

  // manage a replyChannel for the default receive implementation
  override def receive = manageReplyChannelFor(super.receive)

  override def receiveAfterProduce = {
    // exception thrown here on failure, for example
    // ...
  }

  // Send the restart reason, wrapped in a Failure, to the reply channel if
  // there is one.
  override def preRestartProducer(reason: Throwable): Unit = {
    for (c <- replyChannel) c ! Failure(reason)
  }

  // Send a Failure to the reply channel, if there is one, when the actor stops.
  override def postStop(): Unit = {
    for (c <- replyChannel) c ! Failure(new Exception("actor stopped by supervisor"))
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Supervised producer that replies with a `Failure` via `self.reply_?` when
 * the actor is restarted or stopped.
 */
class SupervisedProducer extends Actor with Producer {
  // Placeholder left elided in the original snippet: supply the endpoint URI.
  def endpointUri = ...

  override def receiveAfterProduce = {
    // exception thrown here on failure, for example
    // ...
  }

  // Reply with the restart reason wrapped in a Failure.
  // NOTE(review): `reply_?` is presumably the variant that does not throw
  // when there is no sender to reply to -- confirm against the ActorRef API.
  override def preRestartProducer(reason: Throwable): Unit = {
    self.reply_?(Failure(reason))
  }

  // Reply with a Failure when the actor is stopped.
  override def postStop(): Unit = {
    self.reply_?(Failure(new Exception("actor stopped by supervisor")))
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment