Skip to content

Instantly share code, notes, and snippets.

@sstone
Last active August 29, 2015 14:10
Show Gist options
  • Save sstone/fe77b0e838060a55fe68 to your computer and use it in GitHub Desktop.
Save sstone/fe77b0e838060a55fe68 to your computer and use it in GitHub Desktop.
qos test
import akka.actor.{ActorLogging, Props, Actor, ActorSystem}
import com.github.sstone.amqp.Amqp._
import com.github.sstone.amqp.{ChannelOwner, Consumer, Amqp, ConnectionOwner}
import com.rabbitmq.client.ConnectionFactory
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
/**
 * Manual QoS test harness: one consumer with `qos = 2` and manual acks reads from
 * `my_queue` while a producer publishes "hello!" to it every 500 ms.
 *
 * The listener acks most deliveries immediately but holds back the ack of every
 * delivery whose running count satisfies `count % 10 == 1` for 11 seconds, so the
 * effect of outstanding un-acked messages on delivery can be observed in the log.
 *
 * Requires a broker reachable at `amqp://guest:guest@localhost/%2F`.
 */
object Bug60 extends App {
  implicit val system: ActorSystem = ActorSystem("mySystem")

  val connFactory = new ConnectionFactory()
  connFactory.setUri("amqp://guest:guest@localhost/%2F")
  // Dot notation for durations avoids needing scala.language.postfixOps.
  val conn = system.actorOf(ConnectionOwner.props(connFactory, 1.second))

  /** Logs each delivery and acks it, delaying the ack for one delivery out of ten. */
  class Listener extends Actor with ActorLogging {
    // Number of deliveries handled so far; only touched from within receive.
    var count = 0

    def receive = {
      case Delivery(_, envelope, _, body) =>
        val tag = envelope.getDeliveryTag
        // Decode explicitly as UTF-8 rather than with the platform default charset.
        val text = new String(body, java.nio.charset.StandardCharsets.UTF_8)
        log.info(s"got a message, tag = $tag body = $text")
        if (count % 10 == 1) {
          // Capture the sender now: the scheduled send happens after this message
          // has been processed.
          val dest = sender
          context.system.scheduler.scheduleOnce(11.seconds, dest, Ack(tag))
        } else {
          sender ! Ack(tag)
        }
        count += 1
    }
  }

  val listener = system.actorOf(Props[Listener], "listener")
  val consumer = ConnectionOwner.createChildActor(
    conn,
    Consumer.props(listener, channelParams = Some(ChannelParameters(qos = 2)), autoack = false),
    name = Some("consumer")
  )
  val producer = ConnectionOwner.createChildActor(conn, ChannelOwner.props(), Some("producer"))

  // wait till everyone is actually connected to the broker
  Amqp.waitForConnection(system, consumer, producer).await()

  val queueParams = QueueParameters(
    "my_queue",
    passive = false,
    durable = false,
    exclusive = false,
    autodelete = true
  )
  consumer ! AddQueue(queueParams)

  // Publish a message to my_queue (empty exchange name) every 500 ms, starting after 500 ms.
  system.scheduler.schedule(
    500.milliseconds,
    500.milliseconds,
    producer,
    Publish("", "my_queue", "hello!".getBytes(java.nio.charset.StandardCharsets.UTF_8))
  )
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment