Last active
August 29, 2015 13:58
-
-
Save sstone/9951220 to your computer and use it in GitHub Desktop.
handling consumer cancellation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.github.sstone.amqp | |
import akka.actor.ActorDSL._ | |
import akka.actor.{Props, Actor, ActorSystem} | |
import com.github.sstone.amqp.Amqp._ | |
import com.github.sstone.amqp.ChannelOwner.Connected | |
import com.rabbitmq.client.ConnectionFactory | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.concurrent.duration._ | |
object Consumer5 extends App { | |
implicit val system = ActorSystem("mySystem") | |
// create an AMQP connection | |
val connFactory = new ConnectionFactory() | |
connFactory.setUri("amqp://guest:guest@localhost/%2F") | |
val conn = system.actorOf(ConnectionOwner.props(connFactory, 1 second)) | |
val producer = ConnectionOwner.createChildActor(conn, ChannelOwner.props()) | |
class Root extends Actor { | |
val exchange = ExchangeParameters(name = "amq.direct", exchangeType = "direct", passive = true) | |
val queue = QueueParameters(name = "my_queue", passive = false, durable = false, autodelete = false) | |
val consumer = ConnectionOwner.createChildActor(conn, Consumer.props(Some(self))) | |
consumer ! AddStatusListener(self) | |
def receive = disconnected | |
def disconnected: Receive = { | |
case Connected => | |
consumer ! AddBinding(Binding(exchange, queue, "my_key")) | |
context.become(connected) | |
} | |
def connected: Receive = { | |
case Delivery(consumerTag, enveloppe, properties, body) => | |
println(s"received message") | |
sender ! Ack(enveloppe.getDeliveryTag) | |
case ConsumerCancelled(consumerTag) => | |
println(s"consumer $consumerTag was cancelled") | |
consumer ! AddBinding(Binding(exchange, queue, "my_key")) | |
} | |
} | |
val root = system.actorOf(Props[Root]) | |
system.scheduler.schedule(500 milliseconds, 1 second, producer, Publish(exchange = "amq.direct", key = "my_key", body = "test".getBytes("UTF-8"))) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment