Skip to content

Instantly share code, notes, and snippets.

@HaloFour
Created December 6, 2016 18:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save HaloFour/2a0e18246b1ab6bb82b199f1d6eb4aab to your computer and use it in GitHub Desktop.
Save HaloFour/2a0e18246b1ab6bb82b199f1d6eb4aab to your computer and use it in GitHub Desktop.
RedisClusterClient with Node Rediscovery
/**
 * A {@link RedisClusterClient} whose topology refresh source is re-read from a
 * {@code NodeDiscoveryStrategy} and which schedules topology reloads, with back-off,
 * once the last tracked node connection has been lost.
 *
 * <p>Connection state is tracked through a {@link RedisConnectionStateListener} registered in the
 * constructor. Rediscovery attempts run on a single shared daemon {@link Timer} thread.
 */
private static class DiscoverableClusterClient extends RedisClusterClient {

    /** Reflective handle on {@code CommandHandler#remote()}, forced accessible in the static initializer. */
    private static final Method REMOTE_METHOD;

    /** Shared daemon timer on which every rediscovery attempt is executed. */
    private static final Timer timer = new Timer("lettuce-cluster-rediscovery", true);

    /** Back-off between rediscovery attempts; its values are expressed in seconds. */
    private static final Delay delay = Delay.exponential(1, 60, TimeUnit.SECONDS, 2);

    static {
        try {
            REMOTE_METHOD = CommandHandler.class.getDeclaredMethod("remote");
        }
        catch (NoSuchMethodException exception) {
            LOG.error("Failed to look up method CommandHandler#remote()", exception);
            throw new RuntimeException(exception);
        }
        REMOTE_METHOD.setAccessible(true);
    }

    /**
     * Reads the remote socket address of a connection by reflectively invoking
     * {@code CommandHandler#remote()} on its channel writer.
     *
     * @param connection the connection to inspect
     * @return the remote address, or {@code null} when the channel writer is not a
     *         {@code CommandHandler} or the reflective call fails (the failure is logged)
     */
    private static SocketAddress getConnectionAddress(RedisChannelHandler<?, ?> connection) {
        RedisChannelWriter<?, ?> channelWriter = connection.getChannelWriter();
        if (channelWriter instanceof CommandHandler) {
            try {
                return (SocketAddress) REMOTE_METHOD.invoke(channelWriter);
            }
            catch (IllegalAccessException | InvocationTargetException exception) {
                LOG.warn("Failed to read remote socket address of Redis cluster node.", exception);
            }
        }
        return null;
    }

    /** Seed nodes; also returned as the topology refresh source. */
    private final Iterable<RedisURI> nodes;

    /** {@code true} between a cluster-connection "connected" event and the next "disconnected" event. */
    private volatile boolean connected = false;

    /** Set once either {@code shutdown} overload has been called; stops further rediscovery. */
    private volatile boolean closed = false;

    /** Number of node connections currently reported as connected by the listener. */
    private final AtomicInteger nodeCount = new AtomicInteger(0);

    /**
     * Raised when the cluster connection is lost and cleared when it is re-established.
     * NOTE(review): never read inside this class - confirm whether the enclosing class uses it.
     */
    private final AtomicBoolean triggerRefresh = new AtomicBoolean(false);

    /**
     * Creates a client whose seed nodes are obtained from the given discovery strategy.
     *
     * @param clientResources       client resources handed to {@link RedisClusterClient}
     * @param nodeDiscoveryStrategy source of the cluster node hosts
     * @param ssl                   whether the generated URIs use SSL
     * @param timeout               connection timeout in milliseconds
     */
    public DiscoverableClusterClient(ClientResources clientResources, NodeDiscoveryStrategy nodeDiscoveryStrategy, boolean ssl, int timeout) {
        this(clientResources, convertHostsToRedisURIs(nodeDiscoveryStrategy, ssl, timeout));
    }

    /**
     * Creates the client and registers the listener that tracks connection state.
     *
     * @param clientResources client resources handed to {@link RedisClusterClient}
     * @param nodes           seed nodes, kept as the topology refresh source
     */
    private DiscoverableClusterClient(ClientResources clientResources, Iterable<RedisURI> nodes) {
        super(clientResources, nodes);
        this.nodes = nodes;
        this.addListener(new RedisConnectionStateListener() {
            @Override
            public void onRedisDisconnected(RedisChannelHandler<?, ?> connection) {
                if (connection instanceof StatefulRedisClusterConnection) {
                    LOG.warn("Connection to Redis cluster lost.");
                    connected = false;
                    triggerRefresh.set(true);
                }
                else if (connection instanceof StatefulRedisConnection) {
                    SocketAddress remote = getConnectionAddress(connection);
                    if (remote != null) {
                        LOG.warn("Disconnected from Redis cluster node {}", remote);
                    }
                    // Losing the last tracked node connection starts the rediscovery cycle.
                    if (nodeCount.decrementAndGet() == 0) {
                        scheduleTopologyRefresh(1);
                    }
                }
            }

            @Override
            public void onRedisConnected(RedisChannelHandler<?, ?> connection) {
                if (connection instanceof StatefulRedisClusterConnection) {
                    connected = true;
                    String partitionSummary = getPartitions()
                        .getPartitions()
                        .stream()
                        .map(Object::toString)
                        .collect(Collectors.joining(", "));
                    LOG.info("Connected to Redis cluster: [{}]", partitionSummary);
                    triggerRefresh.set(false);
                }
                else if (connection instanceof StatefulRedisConnection) {
                    SocketAddress remote = getConnectionAddress(connection);
                    if (remote != null) {
                        LOG.info("Connected to Redis cluster node {}", remote);
                    }
                    nodeCount.incrementAndGet();
                }
            }

            @Override
            public void onRedisExceptionCaught(RedisChannelHandler<?, ?> connection, Throwable cause) {
                LOG.error("Caught Redis exception: ", cause);
            }
        });
    }

    /**
     * Schedules one rediscovery attempt on the shared timer. Each attempt that runs while the
     * client is still disconnected and not closed reloads the partitions and then schedules the
     * next attempt with a larger back-off.
     *
     * @param attempt 1-based attempt number used to compute the back-off delay
     */
    private void scheduleTopologyRefresh(int attempt) {
        if (closed) {
            return;
        }
        // createDelay() reports its value in the Delay's own time unit (seconds here), whereas
        // Timer#schedule expects milliseconds - convert, otherwise the back-off is 1000x too short.
        long delayMillis = delay.getTimeUnit().toMillis(delay.createDelay(attempt));
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                // Re-check at execution time: the client may have been shut down or have
                // reconnected while this task was waiting in the timer queue.
                if (closed || connected) {
                    return;
                }
                try {
                    reloadPartitions();
                }
                catch (Throwable exception) {
                    // Deliberately broad: an exception escaping a TimerTask terminates the shared timer thread.
                    LOG.error("Failed to rediscover Redis cluster nodes.", exception);
                }
                scheduleTopologyRefresh(attempt + 1);
            }
        }, delayMillis);
    }

    /**
     * @return the seed node iterable supplied at construction time
     */
    @Override
    protected Iterable<RedisURI> getTopologyRefreshSource() {
        return this.nodes;
    }

    /** Marks the client as closed so that no further rediscovery runs, then shuts down. */
    @Override
    public void shutdown() {
        closed = true;
        super.shutdown();
    }

    /** Marks the client as closed so that no further rediscovery runs, then shuts down. */
    @Override
    public void shutdown(long quietPeriod, long timeout, TimeUnit timeUnit) {
        closed = true;
        super.shutdown(quietPeriod, timeout, timeUnit);
    }

    /**
     * Builds a lazy {@link Iterable} of Redis URIs. The discovery strategy is queried again each
     * time an iterator is requested, and every resulting URI is logged.
     *
     * @param discovery source of the node hosts
     * @param ssl       whether the URIs use SSL (peer verification is always disabled)
     * @param timeout   connection timeout in milliseconds
     * @return an iterable that yields one URI per node returned by {@code discovery}
     */
    private static Iterable<RedisURI> convertHostsToRedisURIs(NodeDiscoveryStrategy discovery, boolean ssl, int timeout) {
        return () -> {
            List<RedisURI> uris = discovery.getNodes()
                .stream()
                .map(host -> RedisURI.builder()
                    .withHost(host.getHost())
                    .withPort(host.getPort())
                    .withTimeout(timeout, TimeUnit.MILLISECONDS)
                    .withSsl(ssl)
                    .withVerifyPeer(false)
                    .build()
                )
                .peek(uri -> LOG.info("Discovered {}", uri))
                .collect(Collectors.toList());
            return uris.iterator();
        };
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment