Skip to content

Instantly share code, notes, and snippets.

@sstone
Last active August 29, 2015 13:58
Show Gist options
  • Save sstone/9951220 to your computer and use it in GitHub Desktop.
Save sstone/9951220 to your computer and use it in GitHub Desktop.
handling consumer cancellation
package com.github.sstone.amqp
import akka.actor.ActorDSL._
import akka.actor.{Props, Actor, ActorSystem}
import com.github.sstone.amqp.Amqp._
import com.github.sstone.amqp.ChannelOwner.Connected
import com.rabbitmq.client.ConnectionFactory
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
object Consumer5 extends App {
implicit val system = ActorSystem("mySystem")
// create an AMQP connection
val connFactory = new ConnectionFactory()
connFactory.setUri("amqp://guest:guest@localhost/%2F")
val conn = system.actorOf(ConnectionOwner.props(connFactory, 1 second))
val producer = ConnectionOwner.createChildActor(conn, ChannelOwner.props())
class Root extends Actor {
val exchange = ExchangeParameters(name = "amq.direct", exchangeType = "direct", passive = true)
val queue = QueueParameters(name = "my_queue", passive = false, durable = false, autodelete = false)
val consumer = ConnectionOwner.createChildActor(conn, Consumer.props(Some(self)))
consumer ! AddStatusListener(self)
def receive = disconnected
def disconnected: Receive = {
case Connected =>
consumer ! AddBinding(Binding(exchange, queue, "my_key"))
context.become(connected)
}
def connected: Receive = {
case Delivery(consumerTag, enveloppe, properties, body) =>
println(s"received message")
sender ! Ack(enveloppe.getDeliveryTag)
case ConsumerCancelled(consumerTag) =>
println(s"consumer $consumerTag was cancelled")
consumer ! AddBinding(Binding(exchange, queue, "my_key"))
}
}
val root = system.actorOf(Props[Root])
system.scheduler.schedule(500 milliseconds, 1 second, producer, Publish(exchange = "amq.direct", key = "my_key", body = "test".getBytes("UTF-8")))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment