Created
October 18, 2018 12:57
-
-
Save ptrdom/1e7d34f421c700c69cc3b429543a32de to your computer and use it in GitHub Desktop.
RabbitMQ Java Client AutorecoveringDirectReplyClient example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package utilities.rabbitmq; | |
import com.rabbitmq.client.*; | |
import com.rabbitmq.utility.BlockingCell; | |
import net.jodah.lyra.config.Config; | |
import net.jodah.lyra.event.DefaultChannelListener; | |
import java.io.EOFException; | |
import java.io.IOException; | |
import java.util.Map; | |
public class AutorecoveringDirectReplyClient extends RpcClient { | |
private boolean channelAvailable; | |
public AutorecoveringDirectReplyClient(Channel channel, String exchange, String routingKey) throws IOException { | |
this(channel, exchange, routingKey, NO_TIMEOUT); | |
} | |
public AutorecoveringDirectReplyClient(Channel channel, String exchange, String routingKey, int timeout) throws IOException { | |
super(channel, exchange, routingKey, timeout); | |
setupAutorecovery(); | |
} | |
@Override | |
public void checkConsumer() throws IOException { | |
if (!channelAvailable) { | |
throw new EOFException("RpcClient is closed"); | |
} | |
} | |
@Override | |
protected DefaultConsumer setupConsumer() throws IOException { | |
DefaultConsumer consumer = new DefaultConsumer(getChannel()) { | |
@Override | |
public void handleShutdownSignal( | |
String consumerTag, | |
ShutdownSignalException signal | |
) { | |
synchronized (getContinuationMap()) { | |
for (Map.Entry<String, BlockingCell<Object>> entry : getContinuationMap().entrySet()) { | |
entry.getValue().set(signal); | |
} | |
getContinuationMap().clear(); | |
channelAvailable = false; | |
} | |
} | |
@Override | |
public void handleDelivery( | |
String consumerTag, | |
Envelope envelope, | |
AMQP.BasicProperties properties, | |
byte[] body | |
) { | |
synchronized (getContinuationMap()) { | |
String replyId = properties.getCorrelationId(); | |
BlockingCell<Object> blocker =getContinuationMap().remove(replyId); | |
if (blocker == null) { | |
throw new IllegalStateException("No outstanding request for correlation ID " + replyId); | |
} | |
blocker.set(new Response(consumerTag, envelope, properties, body)); | |
} | |
} | |
}; | |
getChannel().basicConsume("amq.rabbitmq.reply-to", true, consumer); | |
channelAvailable = true; | |
return consumer; | |
} | |
private void setupAutorecovery(){ | |
Config.of(getChannel()).withChannelListeners(new DefaultChannelListener() { | |
@Override | |
public void onRecoveryCompleted(Channel channel) { | |
channelAvailable = true; | |
} | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment