Created
August 11, 2015 06:11
-
-
Save mp911de/407632a6b98e6ee36180 to your computer and use it in GitHub Desktop.
Implementing "cancel commands while disconnected" for https://github.com/mp911de/lettuce/issues/115
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import javax.enterprise.inject.Alternative; | |
import com.lambdaworks.redis.RedisClient; | |
import com.lambdaworks.redis.RedisURI; | |
import com.lambdaworks.redis.StatefulRedisConnectionImpl; | |
import com.lambdaworks.redis.codec.RedisCodec; | |
import com.lambdaworks.redis.protocol.CommandHandler; | |
import com.lambdaworks.redis.pubsub.PubSubCommandHandler; | |
import com.lambdaworks.redis.pubsub.StatefulRedisPubSubConnectionImpl; | |
/** | |
* Demo code for extending a RedisClient. | |
* | |
* @author <a href="mailto:mpaluch@paluch.biz">Mark Paluch</a> | |
*/ | |
public class MyExtendedRedisClient extends RedisClient { | |
public MyExtendedRedisClient() { | |
} | |
public MyExtendedRedisClient(String host) { | |
super(host); | |
} | |
public MyExtendedRedisClient(String host, int port) { | |
super(host, port); | |
} | |
public MyExtendedRedisClient(RedisURI redisURI) { | |
super(redisURI); | |
} | |
@Override | |
protected <K, V> StatefulRedisConnectionImpl<K, V> newStatefulRedisConnection(CommandHandler<K, V> commandHandler, | |
RedisCodec<K, V> codec) { | |
return new MyStatefulRedisConnectionImpl(commandHandler, codec, timeout, unit); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import static org.assertj.core.api.Assertions.assertThat; | |
import static org.assertj.core.api.Assertions.fail; | |
import java.util.concurrent.CancellationException; | |
import org.apache.log4j.Logger; | |
import org.junit.AfterClass; | |
import org.junit.BeforeClass; | |
import org.junit.Test; | |
import org.springframework.test.util.ReflectionTestUtils; | |
import com.lambdaworks.redis.FastShutdown; | |
import com.lambdaworks.redis.RedisChannelHandler; | |
import com.lambdaworks.redis.RedisConnection; | |
import com.lambdaworks.redis.TestSettings; | |
import com.lambdaworks.redis.api.async.RedisAsyncCommands; | |
import com.lambdaworks.redis.protocol.ConnectionWatchdog; | |
import com.lambdaworks.redis.pubsub.RedisPubSubAsyncCommandsImpl; | |
import com.lambdaworks.redis.pubsub.api.async.RedisPubSubAsyncCommands; | |
import io.netty.channel.Channel; | |
/** | |
* Test for override/extensability of RedisClient | |
*/ | |
public class MyExtendedRedisClientTest { | |
public static final String host = "localhost" | |
public static final int port = 6379 | |
protected static MyExtendedRedisClient client; | |
protected RedisConnection<String, String> redis; | |
protected String key = "key"; | |
protected String value = "value"; | |
@BeforeClass | |
public static void setupClient() { | |
client = new MyExtendedRedisClient(host, port); | |
} | |
@AfterClass | |
public static void shutdownClient() { | |
client.shutdown(); | |
} | |
@Test | |
public void cancelCommandsWhileDisconnected() throws Exception { | |
RedisAsyncCommands<String, String> connection = client.connect().async(); | |
connection.set("key", "value").get(); | |
RedisChannelHandler<String, String> channelHandler = (RedisChannelHandler<String, String>) connection | |
.getStatefulConnection(); | |
Channel channel = (Channel) ReflectionTestUtils.getField(channelHandler.getChannelWriter(), "channel"); | |
ConnectionWatchdog connectionWatchdog = channel.pipeline().get(ConnectionWatchdog.class); | |
connectionWatchdog.setListenOnChannelInactive(false); | |
connection.quit(); | |
while (connection.isOpen()) { | |
Thread.sleep(10); | |
} | |
try { | |
connection.get("key").get(); | |
fail("missing exception"); | |
} catch (Exception e) { | |
assertThat(e).isExactlyInstanceOf(CancellationException.class); | |
} | |
connectionWatchdog.setListenOnChannelInactive(true); | |
connectionWatchdog.scheduleReconnect(); | |
while (!connection.isOpen()) { | |
Thread.sleep(10); | |
} | |
assertThat(connection.get("key").get()).isEqualTo("value"); | |
connection.close(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.TimeUnit; | |
import com.lambdaworks.redis.RedisChannelWriter; | |
import com.lambdaworks.redis.StatefulRedisConnectionImpl; | |
import com.lambdaworks.redis.codec.RedisCodec; | |
import com.lambdaworks.redis.protocol.RedisCommand; | |
/** | |
* @author <a href="mailto:mpaluch@paluch.biz">Mark Paluch</a> | |
* @since 11.08.15 07:50 | |
*/ | |
public class MyStatefulRedisConnectionImpl<K, V> extends StatefulRedisConnectionImpl<K, V> { | |
/** | |
* Initialize a new connection. | |
* | |
* @param writer the channel writer | |
* @param codec Codec used to encode/decode keys and values. | |
* @param timeout Maximum time to wait for a response. | |
* @param unit Unit of time for the timeout. | |
*/ | |
public MyStatefulRedisConnectionImpl(RedisChannelWriter<K, V> writer, RedisCodec<K, V> codec, long timeout, TimeUnit unit) { | |
super(writer, codec, timeout, unit); | |
} | |
@Override | |
public void deactivated() { | |
super.deactivated(); | |
getChannelWriter().reset(); | |
} | |
@Override | |
public <T, C extends RedisCommand<K, V, T>> C dispatch(C cmd) { | |
if (!isOpen()) { | |
cmd.cancel(); | |
return cmd; | |
} | |
return super.dispatch(cmd); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment