Skip to content

Instantly share code, notes, and snippets.

@drexin
Created May 22, 2013 08:26
Show Gist options
  • Save drexin/5626071 to your computer and use it in GitHub Desktop.
Save drexin/5626071 to your computer and use it in GitHub Desktop.
// This actor receives AMQP messages from the connection actor
// and forwards them to the receiver (in my case a channel)
class Forwarder(receiver: ActorRef) extends Actor {
def receive = {
case AMQPMessage(deliveryTag, key, data) =>
val connectionActor = sender
receiver ! Message(
event = data,
ack = true,
// here I expect the Ack(deliveryTag) to be sent
// to the connectionActor, after the channel
// persisted it
posConfirmationTarget = connectionActor,
posConfirmationMessage = Ack(deliveryTag)
)
}
}
// connectionActor manages the connection to the amqp broker and forwards messages
// to the subscriber and waits for acks
val connectionActor = system.actorOf(Props(classOf[ConnectionActor], amqpConfig))
val processor = system.actorOf(Props[Processor])
val channel = esExtension.channelOf(ReliableChannelProps(1, processor))
// forwarder will forward messages to the channel
val forwarder = system.actorOf(Props(classOf[Forwarder], channel))
connectionActor ! Subscribe(forwarder)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment