Skip to content

Instantly share code, notes, and snippets.

@ptrdom
Created October 18, 2018 12:57
Show Gist options
  • Save ptrdom/1e7d34f421c700c69cc3b429543a32de to your computer and use it in GitHub Desktop.
Save ptrdom/1e7d34f421c700c69cc3b429543a32de to your computer and use it in GitHub Desktop.
RabbitMQ Java Client AutorecoveringDirectReplyClient example
package utilities.rabbitmq;
import com.rabbitmq.client.*;
import com.rabbitmq.utility.BlockingCell;
import net.jodah.lyra.config.Config;
import net.jodah.lyra.event.DefaultChannelListener;
import java.io.EOFException;
import java.io.IOException;
import java.util.Map;
public class AutorecoveringDirectReplyClient extends RpcClient {
private boolean channelAvailable;
public AutorecoveringDirectReplyClient(Channel channel, String exchange, String routingKey) throws IOException {
this(channel, exchange, routingKey, NO_TIMEOUT);
}
public AutorecoveringDirectReplyClient(Channel channel, String exchange, String routingKey, int timeout) throws IOException {
super(channel, exchange, routingKey, timeout);
setupAutorecovery();
}
@Override
public void checkConsumer() throws IOException {
if (!channelAvailable) {
throw new EOFException("RpcClient is closed");
}
}
@Override
protected DefaultConsumer setupConsumer() throws IOException {
DefaultConsumer consumer = new DefaultConsumer(getChannel()) {
@Override
public void handleShutdownSignal(
String consumerTag,
ShutdownSignalException signal
) {
synchronized (getContinuationMap()) {
for (Map.Entry<String, BlockingCell<Object>> entry : getContinuationMap().entrySet()) {
entry.getValue().set(signal);
}
getContinuationMap().clear();
channelAvailable = false;
}
}
@Override
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body
) {
synchronized (getContinuationMap()) {
String replyId = properties.getCorrelationId();
BlockingCell<Object> blocker =getContinuationMap().remove(replyId);
if (blocker == null) {
throw new IllegalStateException("No outstanding request for correlation ID " + replyId);
}
blocker.set(new Response(consumerTag, envelope, properties, body));
}
}
};
getChannel().basicConsume("amq.rabbitmq.reply-to", true, consumer);
channelAvailable = true;
return consumer;
}
private void setupAutorecovery(){
Config.of(getChannel()).withChannelListeners(new DefaultChannelListener() {
@Override
public void onRecoveryCompleted(Channel channel) {
channelAvailable = true;
}
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment