Skip to content

Instantly share code, notes, and snippets.

Created June 29, 2017 06:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/1efbe25819b98d4864cd5c5d36fc2a98 to your computer and use it in GitHub Desktop.
Save anonymous/1efbe25819b98d4864cd5c5d36fc2a98 to your computer and use it in GitHub Desktop.
the description for this gist
/**
 * Consumer that publishes every incoming `OutboundMessage` to a fixed AMQP exchange.
 *
 * Each call to `createSubscriber` opens its own channel on `connection`. That channel
 * is aborted when the stream terminates: on completion, on an upstream error, or when
 * a publish fails.
 *
 * @param connection connection used to open one channel per subscriber
 * @param exchange   name of the exchange every message is published to
 */
class ExchangeConsumer(connection: Connection, exchange: String)
    extends Consumer[OutboundMessage, Unit] {

  /**
   * Builds the subscriber that performs the publishing.
   *
   * @param cb receives `onSuccess(())` when the stream completes, or `onError` with
   *           either the upstream error or the exception raised by a failed publish
   * @param s  scheduler exposed as the subscriber's implicit `scheduler`; the
   *           publishing futures are started with it in implicit scope
   * @return the subscriber paired with `AssignableCancelable.dummy`
   */
  override def createSubscriber(
      cb: Callback[Unit],
      s: Scheduler
  ): (Subscriber[OutboundMessage], AssignableCancelable) = {
    import scala.util.control.NonFatal

    // One channel per subscriber, captured by the anonymous class below.
    // NOTE(review): the RabbitMQ client documents that createChannel() may return
    // null when no channel is available -- confirm whether that needs handling here.
    val ch = connection.createChannel()

    val subscriber =
      new Subscriber[OutboundMessage] {
        implicit val scheduler: Scheduler = s

        // Default (empty) properties, created once and reused for every message.
        private val properties = new AMQP.BasicProperties()

        private def publish(m: OutboundMessage): Unit =
          ch.basicPublish(exchange, m.routingKey, properties, m.body)

        /**
         * Publishes `m` asynchronously, marking the call as blocking.
         *
         * A publish failure must not surface as a failed acknowledgement: that would
         * leave the channel open and the callback unnotified. Instead the exception is
         * routed through `onError` and the stream is stopped.
         */
        override def onNext(m: OutboundMessage): Future[Ack] =
          Future[Ack] {
            try {
              blocking(publish(m))
              Continue
            } catch {
              case NonFatal(ex) =>
                onError(ex)
                Ack.Stop
            }
          }

        /** Releases the channel, then reports the failure to the callback. */
        override def onError(ex: Throwable): Unit = {
          abort()
          cb.onError(ex)
        }

        /** Releases the channel, then reports successful completion to the callback. */
        override def onComplete(): Unit = {
          abort()
          cb.onSuccess(())
        }

        // Only abort a channel that is still open, to mitigate the risk of
        // killing the underlying connection.
        private def abort(): Unit = if (ch.isOpen) ch.abort()
      }

    // No cancellation logic is wired in: a dummy cancelable is returned.
    (subscriber, AssignableCancelable.dummy)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment