Skip to content

Instantly share code, notes, and snippets.

@krasserm
Created February 28, 2011 06:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save krasserm/847012 to your computer and use it in GitHub Desktop.
Save krasserm/847012 to your computer and use it in GitHub Desktop.
Akka producer actors best practices - Part 2
httpProducer ! Message("test", Map(Message.MessageExchangeId -> 123))
class JmsForwardingProducer(target: ActorRef) extends Actor with Producer {
def endpointUri = "jms:queue:test"
override def oneway = true
override def receiveAfterProduce = {
case msg => target forward msg
}
}
class ForwardReceiver extends Actor {
def receive = {
case message: Message => self.reply("done")
case failure: Failure => self.reply("message could not be added to queue: %s" format failure.cause.getMessage)
}
}
val receiver = Actor.actorOf[ForwardReceiver].start
val producer = Actor.actorOf(new JmsForwardingProducer(receiver)).start
producer !! "test" // gets a response from ForwardReceiver
someProducer !! "message"
class SomeProducer extends Actor with Producer {
// ...
override def receiveAfterProduce = {
case Failure(cause, _) => for (sf <- self.senderFuture) sf.completeWithException(cause)
case message => self.reply(message)
}
}
class SupervisedFileConsumer extends Actor with Consumer {
override def autoack = false
def endpointUri = "file:data/input/actor?delete=true"
def receive = {
case msg: Message => {
// do something that may throw an exception
// ...
// positively acknowledge receipt of message
self.reply(Ack)
}
}
override def preRestart(reason: scala.Throwable) {
self.reply_?(Failure(reason))
}
override def postStop() {
self.reply_?(Ack) // or reply with a Failure to
// keep the corresponding file
}
}
class SupervisedProducer extends Actor with Producer with ChannelManagement {
def endpointUri = ...
// manage a replyChannel for the default receive implementation
override def receive = manageReplyChannelFor(super.receive)
override def receiveAfterProduce = {
// exception thrown here on failure, for example
// ...
}
override def preRestartProducer(reason: Throwable) {
for (c <- replyChannel) c ! Failure(reason)
}
override def postStop() {
for (c <- replyChannel) c ! Failure(new Exception("actor stopped by supervisor"))
}
}
class SupervisedProducer extends Actor with Producer {
def endpointUri = ...
override def receiveAfterProduce = {
// exception thrown here on failure, for example
// ...
}
override def preRestartProducer(reason: Throwable) {
self.reply_?(Failure(reason))
}
override def postStop() {
self.reply_?(Failure(new Exception("actor stopped by supervisor")))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment