A sendAndReceive RabbitMQ flow with Reactor AMQP support
try {
    def lapin = Lapin.from(someConnectionFactory)

    // Dedicated channel/publisher, closed automatically on Subscription#cancel().
    def publisher = LapinStreams.toLapinAndExchange(lapin, someExchangeName)
    publisher.dispatchOn(env)
    publisher.start()

    // Listen for a single reply. If props.replyTo is null, a temporary queue is generated.
    // The stream times out after 30s without a reply.
    def replyToStream = publisher
        .replyTo(props.replyTo)
        .dispatchOn(env)
        .take(1)
        .log('lapin.sendAndReply')
        .timeout(30)

    // Publish the message only after the reply stream has been set up.
    publisher.onNext(ExchangeSignal.route(message.bytes, someRoutingKey, someExchangeName, props))

    // Return the tail of the stream so the caller can attach a Reactive Streams subscriber
    // or keep chaining actions that react to the reply.
    replyToStream
} catch (Exception ioe) {
    // The connection factory failed, so wrap the exception in an error Stream.
    Streams.fail(ioe)
}
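
The ordering used above (register for the reply first, then publish, then wait with a timeout) can be sketched without any broker. This is only an illustration under stated assumptions: `FakeBroker` and `sendAndReceive` are hypothetical names, not part of Reactor or Lapin, and the in-memory map stands in for RabbitMQ reply queues.

```groovy
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

// Hypothetical in-memory stand-in for a broker (assumption, not a Reactor/Lapin API).
// Each reply queue is modelled as a future that completes with the first reply.
class FakeBroker {
    final Map<String, CompletableFuture<String>> replyQueues = new ConcurrentHashMap<>()

    // Register interest in one reply; plays the role of replyTo(...).take(1).
    CompletableFuture<String> replyTo(String queue) {
        replyQueues.computeIfAbsent(queue) { new CompletableFuture<String>() }
    }

    // Hand the request to a responder running on another thread,
    // which answers on the named reply queue if someone is listening.
    void publish(String body, String replyQueue) {
        CompletableFuture.runAsync {
            replyQueues.get(replyQueue)?.complete("echo: ${body}".toString())
        }
    }
}

// Subscribe first, publish second, then block until the reply or the timeout.
def sendAndReceive(FakeBroker broker, String body, String replyQueue, long timeoutSeconds) {
    def reply = broker.replyTo(replyQueue)   // 1. listen before sending
    broker.publish(body, replyQueue)         // 2. send the request
    reply.get(timeoutSeconds, TimeUnit.SECONDS) // 3. throws TimeoutException if no reply arrives
}

println sendAndReceive(new FakeBroker(), 'ping', 'replies', 30)
```

Registering the listener before publishing matters because a fast responder could otherwise answer before anyone is listening, and the reply would be lost. The blocking `get` is used here only to keep the sketch short; the gist itself returns the stream so the caller can react to the reply asynchronously.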