Skip to content

Instantly share code, notes, and snippets.

@sstone
Created February 7, 2016 18:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sstone/cc540f319d11490331c7 to your computer and use it in GitHub Desktop.
Save sstone/cc540f319d11490331c7 to your computer and use it in GitHub Desktop.
use publisher confirms to wait until all messages have been published
package com.github.sstone.amqp.samples
import akka.actor.{ActorLogging, Actor, ActorSystem, Props}
import com.github.sstone.amqp.Amqp._
import com.github.sstone.amqp.{Amqp, ChannelOwner, ConnectionOwner}
import com.rabbitmq.client.{MessageProperties, ConnectionFactory}
import scala.concurrent.duration._
object WaitTillMessagesArePublished extends App {
implicit val system = ActorSystem("mySystem")
// create an AMQP connection
val connFactory = new ConnectionFactory()
connFactory.setUri("amqp://guest:guest@localhost/%2F")
val conn = system.actorOf(ConnectionOwner.props(connFactory, 1 second))
val producer = ConnectionOwner.createChildActor(conn, ChannelOwner.props())
Amqp.waitForConnection(system, producer).await()
class MyActor extends Actor with ActorLogging {
producer ! ConfirmSelect
producer ! AddReturnListener(self)
producer ! DeclareQueue(QueueParameters(name = "my_queue", passive = false, durable = true, autodelete = false))
def receive = {
case ('publish, bodies: Seq[String]) => {
bodies.map(body => producer ! Publish("", "my_queue", body.getBytes, properties = Some(MessageProperties.PERSISTENT_TEXT_PLAIN)))
}
case 'wait => producer ! WaitForConfirms(None)
case Ok(WaitForConfirms(None), Some(true)) =>
log.info("all messages have been published")
system.terminate()
}
}
val myActor = system.actorOf(Props[MyActor])
val bodies = for (i <- 0 until 1000) yield s"body #$i"
myActor ! ('publish, bodies)
myActor ! 'wait
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment