Skip to content

Instantly share code, notes, and snippets.

/post.scala

Created Jun 29, 2017
Embed
What would you like to do?
the description for this gist
class ExchangeConsumer(connection: Connection, exchange: String)
extends Consumer[OutboundMessage, Unit] {
override def createSubscriber(
cb: Callback[Unit],
s: Scheduler
): (Subscriber[OutboundMessage], AssignableCancelable) = {
val ch = connection.createChannel() // (1)
val subscriber =
new Subscriber[OutboundMessage] {
implicit val scheduler: Scheduler = s // (2)
private val properties = new AMQP.BasicProperties() // (3)
private def publish(m: OutboundMessage): Unit =
ch.basicPublish(exchange, m.routingKey, properties, m.body) // (4)
override def onNext(m: OutboundMessage): Future[Ack] =
Future {
blocking(publish(m))
Continue // (5)
}
override def onError(ex: Throwable): Unit = {
abort()
cb.onError(ex) // (6)
}
override def onComplete(): Unit = {
abort()
cb.onSuccess(()) // (6)
}
//Checks if is open to mitigate risk of killing connection
private def abort(): Unit = if (ch.isOpen) ch.abort()
}
(subscriber, AssignableCancelable.dummy) // (7)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.