Skip to content

Instantly share code, notes, and snippets.

@sstone
Created July 29, 2014 10:23
Show Gist options
  • Save sstone/b1a95a7fe8ed65483ed1 to your computer and use it in GitHub Desktop.
Save sstone/b1a95a7fe8ed65483ed1 to your computer and use it in GitHub Desktop.
amqp-client/issue #55/producer
package com.github.sstone.amqp.bug55
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import com.github.sstone.amqp.Amqp._
import com.github.sstone.amqp.{Amqp, ChannelOwner, ConnectionOwner}
import com.rabbitmq.client.{AMQP, ConnectionFactory}
import scala.concurrent.duration._
/**
 * Stand-alone producer used to reproduce amqp-client issue #55.
 *
 * Connects to `amqp://guest:guest@localhost/%2F`, creates a channel actor, blocks until
 * `Amqp.waitForConnection` completes, then starts a [[Producer]] actor and sends it the
 * string "test" to kick off the publish loop.
 *
 * NOTE(review): this object extends `App`, so every `val` below is initialised as part of the
 * program body; `Producer` reads `channel` from the enclosing object, so it must only be
 * instantiated after `channel` has been assigned (as is the case here).
 */
object MyProducer extends App {
  // Explicit type on the implicit: untyped implicit definitions are a lint warning in Scala 2
  // and are rejected by Scala 3.
  implicit val system: ActorSystem = ActorSystem("mysystem")

  val connFactory = new ConnectionFactory()
  connFactory.setUri("amqp://guest:guest@localhost/%2F")

  // `5.seconds` instead of the postfix form `5 second`: postfix operators require
  // `scala.language.postfixOps`, which this file does not import.
  // NOTE(review): the duration is presumably the reconnection delay -- confirm against
  // ConnectionOwner.props.
  val conn = system.actorOf(ConnectionOwner.props(connFactory, 5.seconds))
  val channel = ConnectionOwner.createChildActor(conn, ChannelOwner.props())

  // Block the main thread until the channel reports that it is connected.
  Amqp.waitForConnection(system, channel).await()

  /**
   * Actor that publishes every `String` it receives to the "amq.direct" exchange with routing
   * key "my_key", then asks the channel to wait for confirms. Each `Ok` reply to a
   * `WaitForConfirms` request makes the actor send itself "test" again, so once started it
   * keeps publishing indefinitely.
   */
  class Producer extends Actor with ActorLogging {
    // On construction, send the channel the confirm/listener set-up requests, registering this
    // actor as both return listener and confirm listener.
    // NOTE(review): ConfirmSelect presumably switches the channel to publisher-confirm mode --
    // see the amqp-client documentation.
    channel ! ConfirmSelect
    channel ! AddReturnListener(self)
    channel ! AddConfirmListener(self)

    /**
     * Silently drops any `Amqp.Ok` reply that `receive` does not handle and logs every other
     * unhandled message at warning level. Does not delegate to `super.unhandled`.
     */
    override def unhandled(message: Any): Unit = message match {
      case Amqp.Ok(_, _) => ()
      case _ => log.warning(s"unhandled $message")
    }

    def receive: Receive = {
      case message: String =>
        val props = new AMQP.BasicProperties.Builder()
          .contentType("foo")
          .contentEncoding("text")
          .replyTo("")
          .build()
        // Charset constant instead of a by-name lookup of "UTF-8"; the encoded bytes are the same.
        val body = message.getBytes(java.nio.charset.StandardCharsets.UTF_8)
        channel ! Publish(
          "amq.direct",
          "my_key",
          body,
          properties = Some(props),
          mandatory = true,
          immediate = false
        )
        channel ! WaitForConfirms(None)

      // Confirms request answered: trigger the next publish. The payload is always the literal
      // "test", regardless of what was published before.
      case Amqp.Ok(WaitForConfirms(_), _) => self ! "test"

      // Acks delivered to the confirm listener are ignored.
      case HandleAck(_, _) => ()
    }
  }

  val producer = system.actorOf(Props[Producer], "Producer")
  producer ! "test"
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment