Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save jongyeol/35ae23fee87e0426fef5faf5c417cc44 to your computer and use it in GitHub Desktop.
Save jongyeol/35ae23fee87e0426fef5faf5c417cc44 to your computer and use it in GitHub Desktop.
Lettuce-disconnectedWriteTimeoutExceptionAndReconnectTest.java
@Test
public void disconnectedWriteTimeoutExceptionAndReconnectTest() throws Exception {
AtomicBoolean networkIsStable = new AtomicBoolean(true);
Delay reconnectDelay = Delay.exponential(1, 100, TimeUnit.MILLISECONDS, 2);
NettyCustomizer nettyCustomizer = new NettyCustomizer() {
@Override
public void afterBootstrapInitialized(Bootstrap bootstrap) {}
@Override
public void afterChannelInitialized(Channel channel) {
String targetHandlerName = null;
for (Entry<String, ChannelHandler> entry : channel.pipeline()) {
if (entry.getValue() instanceof CommandEncoder) {
targetHandlerName = entry.getKey();
}
}
channel.pipeline().addAfter(targetHandlerName, "WriteTimeoutHandler",
new ReusableWriteTimeoutHandler(100, TimeUnit.MILLISECONDS));
}
};
ClientResources resources = DefaultClientResources.builder()
.reconnectDelay(reconnectDelay)
.nettyCustomizer(nettyCustomizer)
.build();
RedisURI redisUri = Builder.redis(TestSettings.host(), TestSettings.port()).build();
RedisClient client = RedisClient.create(resources, redisUri);
try {
ClientOptions clientOptions =
ClientOptions.builder()
.autoReconnect(true)
.disconnectedBehavior(DisconnectedBehavior.ACCEPT_COMMANDS)
.build();
client.setOptions(clientOptions);
StatefulRedisConnection<String, String> redisConnection = client.connect();
RedisAsyncCommands<String, String> connection = redisConnection.async();
Channel channel = getChannel(redisConnection);
channel.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
if (networkIsStable.get()) {
ctx.write(msg, promise);
}
}
});
RedisFuture<String> future1 = connection.get("key1");
future1.get(2, TimeUnit.SECONDS);
// To trigger WriteTimeoutException to simulate network is unstable.
networkIsStable.set(false);
RedisFuture<String> future2 = connection.get("key2");
try {
future2.get(1, TimeUnit.SECONDS);
fail("Exception was not occured.");
} catch (ExecutionException e) {
if (!(e.getCause() instanceof WriteTimeoutException)) {
fail("Unexpected exception: " + e.getCause());
}
}
Thread.sleep(1_000); // wait for WatchDog's reconnection.
assertThat(connection.isOpen()).isTrue();
} finally {
client.shutdown();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment