Skip to content

Instantly share code, notes, and snippets.

@smaldini
Last active August 29, 2015 14:08
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save smaldini/d6858b08773e028f5383 to your computer and use it in GitHub Desktop.
Save smaldini/d6858b08773e028f5383 to your computer and use it in GitHub Desktop.
A sendAndReceive RabbitMQ flow with Reactor AMQP support
try {
def lapin = Lapin.from(someConnectionFactory)
//Unique channel/publisher, auto close on Subscription#cancel().
def publisher = LapinStreams.toLapinAndExchange(lapin, someExchangeName)
publisher.dispatchOn(env)
publisher.start()
def replyToStream = publisher
.replyTo(props.replyTo)
.dispatchOn(env)
.take(1)
.log('lapin.sendAndReply')
.timeout(30)
//listen for 1 message, if replyTo is null will generate a temp queue, and will timeout after 30s without reply.
// Publish the message
publisher.onNext(ExchangeSignal.route(message.bytes, someRoutingKey, someExchangeName, props))
// return the tail of the stream to attach Reactive Stream subscriber or keep on adding actions to react on reply
replyToStream
} catch (Exception ioe) {
// connection factory failed so we wrap into an Error Stream.
Streams.fail(ioe)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment