Created
April 13, 2017 13:07
-
-
Save jongyeol/35ae23fee87e0426fef5faf5c417cc44 to your computer and use it in GitHub Desktop.
Lettuce-disconnectedWriteTimeoutExceptionAndReconnectTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
public void disconnectedWriteTimeoutExceptionAndReconnectTest() throws Exception { | |
AtomicBoolean networkIsStable = new AtomicBoolean(true); | |
Delay reconnectDelay = Delay.exponential(1, 100, TimeUnit.MILLISECONDS, 2); | |
NettyCustomizer nettyCustomizer = new NettyCustomizer() { | |
@Override | |
public void afterBootstrapInitialized(Bootstrap bootstrap) {} | |
@Override | |
public void afterChannelInitialized(Channel channel) { | |
String targetHandlerName = null; | |
for (Entry<String, ChannelHandler> entry : channel.pipeline()) { | |
if (entry.getValue() instanceof CommandEncoder) { | |
targetHandlerName = entry.getKey(); | |
} | |
} | |
channel.pipeline().addAfter(targetHandlerName, "WriteTimeoutHandler", | |
new ReusableWriteTimeoutHandler(100, TimeUnit.MILLISECONDS)); | |
} | |
}; | |
ClientResources resources = DefaultClientResources.builder() | |
.reconnectDelay(reconnectDelay) | |
.nettyCustomizer(nettyCustomizer) | |
.build(); | |
RedisURI redisUri = Builder.redis(TestSettings.host(), TestSettings.port()).build(); | |
RedisClient client = RedisClient.create(resources, redisUri); | |
try { | |
ClientOptions clientOptions = | |
ClientOptions.builder() | |
.autoReconnect(true) | |
.disconnectedBehavior(DisconnectedBehavior.ACCEPT_COMMANDS) | |
.build(); | |
client.setOptions(clientOptions); | |
StatefulRedisConnection<String, String> redisConnection = client.connect(); | |
RedisAsyncCommands<String, String> connection = redisConnection.async(); | |
Channel channel = getChannel(redisConnection); | |
channel.pipeline().addFirst(new ChannelOutboundHandlerAdapter() { | |
@Override | |
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) | |
throws Exception { | |
if (networkIsStable.get()) { | |
ctx.write(msg, promise); | |
} | |
} | |
}); | |
RedisFuture<String> future1 = connection.get("key1"); | |
future1.get(2, TimeUnit.SECONDS); | |
// To trigger WriteTimeoutException to simulate network is unstable. | |
networkIsStable.set(false); | |
RedisFuture<String> future2 = connection.get("key2"); | |
try { | |
future2.get(1, TimeUnit.SECONDS); | |
fail("Exception was not occured."); | |
} catch (ExecutionException e) { | |
if (!(e.getCause() instanceof WriteTimeoutException)) { | |
fail("Unexpected exception: " + e.getCause()); | |
} | |
} | |
Thread.sleep(1_000); // wait for WatchDog's reconnection. | |
assertThat(connection.isOpen()).isTrue(); | |
} finally { | |
client.shutdown(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment