-
-
Save HaloFour/2a0e18246b1ab6bb82b199f1d6eb4aab to your computer and use it in GitHub Desktop.
RedisClusterClient with Node Rediscovery
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private static class DiscoverableClusterClient extends RedisClusterClient { | |
private static final Method REMOTE_METHOD; | |
private static final Timer timer = new Timer("lettuce-cluster-rediscovery", true); | |
private static final Delay delay = Delay.exponential(1, 60, TimeUnit.SECONDS, 2); | |
static { | |
try { | |
REMOTE_METHOD = CommandHandler.class.getDeclaredMethod("remote"); | |
} | |
catch (NoSuchMethodException exception) { | |
LOG.error("Failed to look up method CommandHandler#remote()", exception); | |
throw new RuntimeException(exception); | |
} | |
REMOTE_METHOD.setAccessible(true); | |
} | |
private static SocketAddress getConnectionAddress(RedisChannelHandler<?, ?> connection) { | |
RedisChannelWriter<?, ?> channelWriter = connection.getChannelWriter(); | |
if (channelWriter instanceof CommandHandler) { | |
try { | |
return (SocketAddress)REMOTE_METHOD.invoke(channelWriter); | |
} | |
catch (IllegalAccessException | InvocationTargetException exception) { | |
LOG.warn("Failed to read remote socket address of Redis cluster node.", exception); | |
} | |
} | |
return null; | |
} | |
private final Iterable<RedisURI> nodes; | |
private volatile boolean connected = false; | |
private volatile boolean closed = false; | |
private final AtomicInteger nodeCount = new AtomicInteger(0); | |
private final AtomicBoolean triggerRefresh = new AtomicBoolean(false); | |
public DiscoverableClusterClient(ClientResources clientResources, NodeDiscoveryStrategy nodeDiscoveryStrategy, boolean ssl, int timeout) { | |
this(clientResources, convertHostsToRedisURIs(nodeDiscoveryStrategy, ssl, timeout)); | |
} | |
private DiscoverableClusterClient(ClientResources clientResources, Iterable<RedisURI> nodes) { | |
super(clientResources, nodes); | |
this.nodes = nodes; | |
this.addListener(new RedisConnectionStateListener() { | |
@Override | |
public void onRedisDisconnected(RedisChannelHandler<?, ?> connection) { | |
if (connection instanceof StatefulRedisClusterConnection) { | |
LOG.warn("Connection to Redis cluster lost."); | |
connected = false; | |
triggerRefresh.set(true); | |
} | |
else if (connection instanceof StatefulRedisConnection) { | |
SocketAddress remote = getConnectionAddress(connection); | |
if (remote != null) { | |
LOG.warn("Disconnected from Redis cluster node {}", remote); | |
} | |
if (nodeCount.decrementAndGet() == 0) { | |
scheduleTopologyRefresh(1); | |
} | |
} | |
} | |
@Override | |
public void onRedisConnected(RedisChannelHandler<?, ?> connection) { | |
if (connection instanceof StatefulRedisClusterConnection) { | |
connected = true; | |
String nodes = getPartitions() | |
.getPartitions() | |
.stream() | |
.map(Object::toString) | |
.collect(Collectors.joining(", ")); | |
LOG.info("Connected to Redis cluster: [{}]", nodes); | |
triggerRefresh.set(false); | |
} | |
else if (connection instanceof StatefulRedisConnection) { | |
SocketAddress remote = getConnectionAddress(connection); | |
if (remote != null) { | |
LOG.info("Connected to Redis cluster node {}", remote); | |
} | |
nodeCount.incrementAndGet(); | |
} | |
} | |
@Override | |
public void onRedisExceptionCaught(RedisChannelHandler<?, ?> connection, Throwable cause) { | |
LOG.error("Caught Redis exception: ", cause); | |
} | |
}); | |
} | |
private void scheduleTopologyRefresh(int attempt) { | |
if (closed) { | |
return; | |
} | |
timer.schedule(new TimerTask() { | |
@Override | |
public void run() { | |
if (connected) { | |
return; | |
} | |
try { | |
reloadPartitions(); | |
} | |
catch (Throwable exception) { | |
LOG.error("Failed to rediscover Redis cluster nodes.", exception); | |
} | |
scheduleTopologyRefresh(attempt + 1); | |
} | |
}, delay.createDelay(attempt)); | |
} | |
@Override | |
protected Iterable<RedisURI> getTopologyRefreshSource() { | |
return this.nodes; | |
} | |
@Override | |
public void shutdown() { | |
closed = true; | |
super.shutdown(); | |
} | |
@Override | |
public void shutdown(long quietPeriod, long timeout, TimeUnit timeUnit) { | |
closed = true; | |
super.shutdown(quietPeriod, timeout, timeUnit); | |
} | |
private static Iterable<RedisURI> convertHostsToRedisURIs(NodeDiscoveryStrategy discovery, boolean ssl, int timeout) { | |
return () -> { | |
List<RedisURI> uris = discovery.getNodes() | |
.stream() | |
.map(host -> RedisURI.builder() | |
.withHost(host.getHost()) | |
.withPort(host.getPort()) | |
.withTimeout(timeout, TimeUnit.MILLISECONDS) | |
.withSsl(ssl) | |
.withVerifyPeer(false) | |
.build() | |
) | |
.map(uri -> { | |
LOG.info("Discovered {}", uri); | |
return uri; | |
}) | |
.collect(Collectors.toList()); | |
return uris.iterator(); | |
}; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment