public void disconnectedWriteTimeoutExceptionAndReconnectTest() throws Exception {
AtomicBoolean networkIsStable = new AtomicBoolean(true);
Delay reconnectDelay = Delay.exponential(1, 100, TimeUnit.MILLISECONDS, 2);
NettyCustomizer nettyCustomizer = new NettyCustomizer() {
public void afterBootstrapInitialized(Bootstrap bootstrap) {}
public void afterChannelInitialized(Channel channel) {
String targetHandlerName = null;
for (Entry<String, ChannelHandler> entry : channel.pipeline()) {
if (entry.getValue() instanceof CommandEncoder) {
targetHandlerName = entry.getKey();
channel.pipeline().addAfter(targetHandlerName, "WriteTimeoutHandler",
new ReusableWriteTimeoutHandler(100, TimeUnit.MILLISECONDS));
ClientResources resources = DefaultClientResources.builder()
RedisURI redisUri = Builder.redis(, TestSettings.port()).build();
RedisClient client = RedisClient.create(resources, redisUri);
try {
ClientOptions clientOptions =
StatefulRedisConnection<String, String> redisConnection = client.connect();
RedisAsyncCommands<String, String> connection = redisConnection.async();
Channel channel = getChannel(redisConnection);
channel.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
if (networkIsStable.get()) {
ctx.write(msg, promise);
RedisFuture<String> future1 = connection.get("key1");
future1.get(2, TimeUnit.SECONDS);
// Trigger a WriteTimeoutException to simulate an unstable network.
RedisFuture<String> future2 = connection.get("key2");
try {
future2.get(1, TimeUnit.SECONDS);
fail("Exception was not occured.");
} catch (ExecutionException e) {
if (!(e.getCause() instanceof WriteTimeoutException)) {
fail("Unexpected exception: " + e.getCause());
Thread.sleep(1_000); // wait for WatchDog's reconnection.
} finally {
