Skip to content

Instantly share code, notes, and snippets.

@mp911de
Created August 11, 2015 06:11
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mp911de/407632a6b98e6ee36180 to your computer and use it in GitHub Desktop.
Save mp911de/407632a6b98e6ee36180 to your computer and use it in GitHub Desktop.
Implementing "cancel commands while disconnected" for https://github.com/mp911de/lettuce/issues/115
import javax.enterprise.inject.Alternative;
import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.StatefulRedisConnectionImpl;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandHandler;
import com.lambdaworks.redis.pubsub.PubSubCommandHandler;
import com.lambdaworks.redis.pubsub.StatefulRedisPubSubConnectionImpl;
/**
 * Demo code for extending a RedisClient.
 *
 * <p>
 * The only customization is {@link #newStatefulRedisConnection(CommandHandler, RedisCodec)}, which hands out
 * {@link MyStatefulRedisConnectionImpl} instances instead of the stock connection implementation.
 *
 * @author <a href="mailto:mpaluch@paluch.biz">Mark Paluch</a>
 */
public class MyExtendedRedisClient extends RedisClient {

    /**
     * Creates a client through the no-argument constructor of {@link RedisClient}.
     */
    public MyExtendedRedisClient() {
    }

    /**
     * Creates a client for the given host.
     *
     * @param host the Redis server host, passed unchanged to {@link RedisClient}
     */
    public MyExtendedRedisClient(String host) {
        super(host);
    }

    /**
     * Creates a client for the given host and port.
     *
     * @param host the Redis server host, passed unchanged to {@link RedisClient}
     * @param port the Redis server port, passed unchanged to {@link RedisClient}
     */
    public MyExtendedRedisClient(String host, int port) {
        super(host, port);
    }

    /**
     * Creates a client for the given URI.
     *
     * @param redisURI the connection URI, passed unchanged to {@link RedisClient}
     */
    public MyExtendedRedisClient(RedisURI redisURI) {
        super(redisURI);
    }

    /**
     * Builds the connection object as a {@link MyStatefulRedisConnectionImpl}.
     *
     * @param commandHandler the command handler, used as the channel writer of the new connection
     * @param codec codec used to encode/decode keys and values
     * @return a new {@link MyStatefulRedisConnectionImpl} configured with this client's {@code timeout} and
     *         {@code unit}
     */
    @Override
    protected <K, V> StatefulRedisConnectionImpl<K, V> newStatefulRedisConnection(CommandHandler<K, V> commandHandler,
            RedisCodec<K, V> codec) {
        // Explicit type arguments: the original instantiated the raw type, which discards K/V
        // and forces an unchecked conversion to the declared return type.
        return new MyStatefulRedisConnectionImpl<K, V>(commandHandler, codec, timeout, unit);
    }
}
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import java.util.concurrent.CancellationException;
import org.apache.log4j.Logger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.test.util.ReflectionTestUtils;
import com.lambdaworks.redis.FastShutdown;
import com.lambdaworks.redis.RedisChannelHandler;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.TestSettings;
import com.lambdaworks.redis.api.async.RedisAsyncCommands;
import com.lambdaworks.redis.protocol.ConnectionWatchdog;
import com.lambdaworks.redis.pubsub.RedisPubSubAsyncCommandsImpl;
import com.lambdaworks.redis.pubsub.api.async.RedisPubSubAsyncCommands;
import io.netty.channel.Channel;
/**
* Test for override/extensability of RedisClient
*/
public class MyExtendedRedisClientTest {
public static final String host = "localhost"
public static final int port = 6379
protected static MyExtendedRedisClient client;
protected RedisConnection<String, String> redis;
protected String key = "key";
protected String value = "value";
@BeforeClass
public static void setupClient() {
client = new MyExtendedRedisClient(host, port);
}
@AfterClass
public static void shutdownClient() {
client.shutdown();
}
@Test
public void cancelCommandsWhileDisconnected() throws Exception {
RedisAsyncCommands<String, String> connection = client.connect().async();
connection.set("key", "value").get();
RedisChannelHandler<String, String> channelHandler = (RedisChannelHandler<String, String>) connection
.getStatefulConnection();
Channel channel = (Channel) ReflectionTestUtils.getField(channelHandler.getChannelWriter(), "channel");
ConnectionWatchdog connectionWatchdog = channel.pipeline().get(ConnectionWatchdog.class);
connectionWatchdog.setListenOnChannelInactive(false);
connection.quit();
while (connection.isOpen()) {
Thread.sleep(10);
}
try {
connection.get("key").get();
fail("missing exception");
} catch (Exception e) {
assertThat(e).isExactlyInstanceOf(CancellationException.class);
}
connectionWatchdog.setListenOnChannelInactive(true);
connectionWatchdog.scheduleReconnect();
while (!connection.isOpen()) {
Thread.sleep(10);
}
assertThat(connection.get("key").get()).isEqualTo("value");
connection.close();
}
}
import java.util.concurrent.TimeUnit;
import com.lambdaworks.redis.RedisChannelWriter;
import com.lambdaworks.redis.StatefulRedisConnectionImpl;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.RedisCommand;
/**
 * Connection variant that cancels commands dispatched while the connection is not open and resets its channel
 * writer whenever the connection is deactivated.
 *
 * @author <a href="mailto:mpaluch@paluch.biz">Mark Paluch</a>
 * @since 11.08.15 07:50
 */
public class MyStatefulRedisConnectionImpl<K, V> extends StatefulRedisConnectionImpl<K, V> {

    /**
     * Initialize a new connection.
     *
     * @param writer the channel writer
     * @param codec Codec used to encode/decode keys and values.
     * @param timeout Maximum time to wait for a response.
     * @param unit Unit of time for the timeout.
     */
    public MyStatefulRedisConnectionImpl(RedisChannelWriter<K, V> writer, RedisCodec<K, V> codec, long timeout,
            TimeUnit unit) {
        super(writer, codec, timeout, unit);
    }

    /**
     * Runs the inherited deactivation logic first, then resets the channel writer.
     */
    @Override
    public void deactivated() {
        super.deactivated();
        // NOTE(review): presumably reset() drops commands still held by the writer - confirm against
        // RedisChannelWriter.reset().
        getChannelWriter().reset();
    }

    /**
     * Dispatches the command through the superclass while the connection is open; otherwise cancels it without
     * dispatching.
     *
     * @param cmd the command to dispatch
     * @return the same command instance when the connection is not open (already cancelled), otherwise the result
     *         of the superclass dispatch
     */
    @Override
    public <T, C extends RedisCommand<K, V, T>> C dispatch(C cmd) {
        if (isOpen()) {
            return super.dispatch(cmd);
        }

        // Not open: cancel the command instead of handing it to the superclass.
        cmd.cancel();
        return cmd;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment