Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@wkgcass
Last active April 23, 2019 14:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wkgcass/7df4f61e68ec0e46d47c5f87fccc53ff to your computer and use it in GitHub Desktop.
Save wkgcass/7df4f61e68ec0e46d47c5f87fccc53ff to your computer and use it in GitHub Desktop.
vertx redis 3.5.3 failed with redis-sentinel
package io.vertx.redis.impl;
/*
* Patch for vertx-redis 3.5.3, which cannot connect to Redis Sentinel when a password is configured.
* Every change is enclosed between the markers // --------------------START and // --------------------END
*/
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.redis.RedisOptions;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Shared base of the Redis client implementations. {@code C} is the command type that concrete
 * subclasses hand to {@link #send}.
 * <p>
 * Two connections are created from the same options: one for regular commands and one, wired to the
 * subscription registry, for pub/sub traffic.
 */
abstract class BaseRedisClient<C> {

  final EventBus eb;
  final RedisSubscriptions subscriptions;
  final String encoding;
  final Charset charset;
  final Charset binaryCharset;
  final String baseAddress;

  // we need 2 connections, one for normal commands and a second in case we do pub/sub
  final RedisConnection redis;
  final RedisConnection pubsub;

  /**
   * Creates both connections. Nothing in this constructor opens a socket.
   *
   * @param vertx  the Vert.x instance that provides the event bus and is handed to the connections
   * @param config client options (encoding, address, ...)
   */
  BaseRedisClient(Vertx vertx, RedisOptions config) {
    this.eb = vertx.eventBus();
    this.encoding = config.getEncoding();
    this.charset = Charset.forName(encoding);
    // ISO-8859-1 is guaranteed to exist on every JVM, so use the constant instead of a name lookup
    this.binaryCharset = java.nio.charset.StandardCharsets.ISO_8859_1;
    this.baseAddress = config.getAddress();
    subscriptions = new RedisSubscriptions(vertx);
    // --------------------START
    // sentinel clients must not send AUTH, so the connections need to know who owns them
    boolean isSentinel = this instanceof AbstractRedisSentinelClient;
    redis = new RedisConnection(vertx, config, null, isSentinel);
    pubsub = new RedisConnection(vertx, config, subscriptions, isSentinel);
    // --------------------END
  }

  /**
   * Disconnects both connections and notifies {@code handler} once both have reported back.
   *
   * @param handler completion callback; may be {@code null} when the caller does not care
   */
  public synchronized void close(Handler<AsyncResult<Void>> handler) {
    // this is a special case it should sent the message QUIT and then close the sockets
    final AtomicInteger cnt = new AtomicInteger(0);
    final Handler<AsyncResult<Void>> cb = v -> {
      // fire only after the second disconnect; tolerate callers that pass no handler
      if (cnt.incrementAndGet() == 2 && handler != null) {
        handler.handle(Future.succeededFuture());
      }
    };
    redis.disconnect(cb);
    pubsub.disconnect(cb);
  }

  /** Sends a non-binary command whose reply is delivered as a {@code String}. */
  final void sendString(final C command, final List<?> args, final Handler<AsyncResult<String>> resultHandler) {
    send(command, args, String.class, false, resultHandler);
  }

  /** Sends a non-binary command whose reply is delivered as a {@code Long}. */
  final void sendLong(final C command, final List<?> args, final Handler<AsyncResult<Long>> resultHandler) {
    send(command, args, Long.class, false, resultHandler);
  }

  /** Sends a non-binary command whose reply type is {@code Void}. */
  final void sendVoid(final C command, final List<?> args, final Handler<AsyncResult<Void>> resultHandler) {
    send(command, args, Void.class, false, resultHandler);
  }

  /** Sends a non-binary command whose reply is delivered as a {@code JsonArray}. */
  final void sendJsonArray(final C command, final List<?> args, final Handler<AsyncResult<JsonArray>> resultHandler) {
    send(command, args, JsonArray.class, false, resultHandler);
  }

  /** Sends a non-binary command whose reply is delivered as a {@code JsonObject}. */
  final void sendJsonObject(final C command, final List<?> args, final Handler<AsyncResult<JsonObject>> resultHandler) {
    send(command, args, JsonObject.class, false, resultHandler);
  }

  /**
   * Sends {@code command} with {@code redisArgs} and delivers the reply, typed as {@code returnType},
   * to {@code resultHandler}.
   *
   * @param command       the command to execute
   * @param redisArgs     command arguments
   * @param returnType    type the reply is delivered as
   * @param binary        binary flag; the typed helpers in this class always pass {@code false}
   * @param resultHandler receives the reply or the failure
   */
  abstract <R> void send(final C command, final List<?> redisArgs, final Class<R> returnType,
                         final boolean binary,
                         final Handler<AsyncResult<R>> resultHandler);
}
/**
* Copyright 2015 Red Hat, Inc.
* <p>
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
* <p>
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* <p>
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
* <p>
* You may elect to redistribute this code under either of these licenses.
*/
package io.vertx.redis.impl;
/*
* Patch for vertx-redis 3.5.3, which cannot connect to Redis Sentinel when a password is configured.
* Every change is enclosed between the markers // --------------------START and // --------------------END
*/
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import io.vertx.redis.RedisOptions;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
/**
* Base class for Redis Vert.x client. Generated client would use the facilities
* in this class to implement typed commands.
*/
class RedisConnection {
private static final Logger log = LoggerFactory.getLogger(RedisConnection.class);
// Vert.x instance used to create net clients and timers
private final Vertx vertx;
// event-loop context chosen in the constructor; command processing is funnelled through it
private final Context context;
/**
 * There are 2 queues: one for commands not yet sent over the wire to redis and another for commands already sent
 * to redis. At start up it is expected that the pending queue grows until the connection handshake is complete;
 * once the handshake completes it is drained, while the second queue is in constant movement.
 * <p>
 * Since the client **ALWAYS** works in pipeline mode, the order of adding and removing elements to the queues is
 * crucial. A command counts as sent only once its reply handler(s) have been added to one of the queues and the
 * command has been written to the wire.
 * <p>
 * For this reason access to the queues and writes to the socket must **ALWAYS** be serialized.
 */
// pending: commands that have not yet been sent to the server
private final Queue<Command<?>> pending = new LinkedList<>();
// waiting: commands that have been sent but not answered
private final Queue<Command<?>> waiting = new LinkedList<>();
// parses incoming socket data and feeds complete replies to this connection
private final ReplyParser replyParser;
// pub/sub registry; null when this connection is not used for pub/sub
private final RedisSubscriptions subscriptions;
private final RedisOptions config;
// connection life-cycle state, starts as DISCONNECTED
private final AtomicReference<State> state = new AtomicReference<>(State.DISCONNECTED);
// attempt to reconnect on error, by default true; cleared by disconnect()
private volatile boolean reconnect = true;
// socket of the current connection, assigned once a connect attempt succeeds
private volatile NetSocket netSocket;
// --------------------START
// true when this connection is owned by a sentinel client; such connections never send AUTH
private final boolean isSentinel;
// --------------------END
/**
* Create a RedisConnection.
*/
public RedisConnection(Vertx vertx, RedisOptions config, RedisSubscriptions subscriptions,
// --------------------START
boolean isSentinel
// --------------------END
) {
// --------------------START
this.isSentinel = isSentinel;
// --------------------END
// Make sure we have an event loop context for serializability of the commands
Context ctx = Vertx.currentContext();
if (ctx == null) {
ctx = vertx.getOrCreateContext();
} else if (!ctx.isEventLoopContext()) {
VertxInternal vi = (VertxInternal) vertx;
ctx = vi.createEventLoopContext(null, null, new JsonObject(), Thread.currentThread().getContextClassLoader());
}
this.vertx = vertx;
this.context = ctx;
this.config = config;
this.subscriptions = subscriptions;
if (subscriptions != null) {
this.replyParser = new ReplyParser(reply -> {
// Pub/sub messages are always multi-bulk
if (reply.is('*')) {
Reply[] data = (Reply[]) reply.data();
if (data != null) {
// message
if (data.length == 3) {
if (data[0].is('$') && "message".equals(data[0].asType(String.class))) {
String channel = data[1].asType(String.class);
subscriptions.handleChannel(channel, data);
return;
}
}
// pmessage
else if (data.length == 4) {
if (data[0].is('$') && "pmessage".equals(data[0].asType(String.class))) {
String pattern = data[1].asType(String.class);
subscriptions.handlePattern(pattern, data);
return;
}
}
}
}
// fallback to normal handler
handleReply(reply);
});
} else {
this.replyParser = new ReplyParser(this::handleReply);
}
}
/**
 * Tells whether the master address has to be resolved through sentinels, i.e. at least one sentinel
 * and a master name are configured. As a side effect reconnecting is re-enabled.
 */
private boolean useSentinel() {
  // the user may have disconnected earlier, which switched reconnecting off
  reconnect = true;
  final List<?> sentinels = config.getSentinels();
  if (sentinels == null || sentinels.isEmpty()) {
    return false;
  }
  return config.getMasterName() != null;
}
/**
 * Opens a socket to {@code socketAddress} and, on success, starts the handshake.
 * <p>
 * On failure every queued command is failed with the connect error and, if reconnecting is enabled,
 * a new attempt is scheduled after the configured reconnect interval. The same rescheduling happens
 * when an established socket is closed.
 *
 * @param socketAddress the address to connect to
 * @param checkMaster   when {@code true}, the master-role check is issued right after the
 *                      authentication step has been started
 */
private void connect(SocketAddress socketAddress, boolean checkMaster) {
  replyParser.reset();
  // create a netClient for the connection
  final NetClient client = vertx.createNetClient(config);
  client.connect(socketAddress, asyncResult -> {
    if (asyncResult.failed()) {
      client.close();
      if (state.compareAndSet(State.CONNECTING, State.ERROR)) {
        // clean up any waiting command
        clearQueue(waiting, asyncResult.cause());
        // clean up any pending command
        clearQueue(pending, asyncResult.cause());
        state.set(State.DISCONNECTED);
        // Should we retry?
        if (reconnect) {
          vertx.setTimer(config.getReconnectInterval(), v0 -> connect());
        }
      }
      return;
    }

    // Keep a local reference: the handlers below must act on THIS socket, not on whatever the
    // mutable netSocket field points to by the time they fire.
    final NetSocket socket = asyncResult.result();
    socket.handler(replyParser);
    socket.closeHandler(v2 -> {
      state.set(State.ERROR);
      // clean up any waiting command
      clearQueue(waiting, "Connection closed");
      // clean up any pending command
      clearQueue(pending, "Connection closed");
      state.set(State.DISCONNECTED);
      client.close();
      // was this close intentional?
      if (reconnect) {
        vertx.setTimer(config.getReconnectInterval(), v0 -> connect());
      }
    });
    socket.exceptionHandler(e -> {
      // do not swallow the cause silently; closing triggers the close handler above
      log.warn("Redis connection error, closing socket", e);
      socket.close();
    });
    netSocket = socket;

    // clean up any waiting command
    clearQueue(waiting, "Connection lost");
    // handle the connection handshake
    doAuth();
    // check if the Redis instance is master
    if (checkMaster) {
      doCheckMaster();
    }
  });
}
/**
 * Starts a connection attempt unless one is already in progress (only the DISCONNECTED to
 * CONNECTING transition proceeds). With sentinels configured the master address is resolved first;
 * otherwise the configured domain socket or host/port is used directly.
 */
private void connect() {
  if (!state.compareAndSet(State.DISCONNECTED, State.CONNECTING)) {
    return;
  }
  runOnContext(v -> {
    if (!useSentinel()) {
      // if the domain socket option is enabled, use the domain socket address to connect to redis server
      final SocketAddress target = config.isDomainSocket()
        ? SocketAddress.domainSocketAddress(config.getDomainSocketAddress())
        : SocketAddress.inetSocketAddress(config.getPort(), config.getHost());
      connect(target, false);
      return;
    }

    final RedisMasterResolver resolver = new RedisMasterResolver(context.owner(), config);
    resolver.getMasterAddressByName(lookup -> {
      if (lookup.failed()) {
        // no master could be resolved: fail everything that is queued and go back to idle
        clearQueue(waiting, lookup.cause());
        clearQueue(pending, lookup.cause());
        state.set(State.DISCONNECTED);
      } else {
        final JsonObject master = lookup.result();
        // resolved through sentinel, so verify the master role after connecting
        connect(SocketAddress.inetSocketAddress(master.getInteger("port"), master.getString("host")), true);
      }
      resolver.close();
    });
  });
}
/**
 * Disconnects on behalf of the user and switches automatic reconnecting off.
 * <p>
 * While connecting or connected a QUIT command is sent and {@code closeHandler} is notified when
 * that command completes; in any other state it is notified immediately. The handler always
 * receives a succeeded result.
 *
 * @param closeHandler notified once the disconnect has been processed
 */
void disconnect(Handler<AsyncResult<Void>> closeHandler) {
  // update state to notify that the user wants to disconnect
  reconnect = false;

  final State current = state.get();
  if (current == State.CONNECTING || current == State.CONNECTED) {
    // CONNECTING will eventually become CONNECTED, so both are handled by queueing a QUIT
    final Command<Void> quit = new Command<>(context, RedisCommand.QUIT, null, Charset.defaultCharset(), ResponseTransform.NONE, Void.class);
    quit.handler(ar -> {
      // force the state to ERROR so that no incoming command starts a new connection
      if (state.compareAndSet(State.CONNECTED, State.ERROR)) {
        clearQueue(waiting, "Connection closed");
        clearQueue(pending, "Connection closed");
        netSocket.close();
      }
      closeHandler.handle(Future.succeededFuture());
    });
    send(quit);
  } else if (current == State.ERROR || current == State.DISCONNECTED) {
    // ERROR will eventually become DISCONNECTED; there is nothing to close
    closeHandler.handle(Future.succeededFuture());
  }
}
/**
 * Sends a command to redis. When the connection is idle a connection attempt is started first.
 * <p>
 * Only a CONNECTED connection writes the command straight away; in every other state the command is
 * parked in the pending queue.
 *
 * @param command the redis command to send
 */
void send(final Command<?> command) {
  // start the handshake if not connected
  if (state.get() == State.DISCONNECTED) {
    connect();
  }
  // write to the socket in the netSocket context
  runOnContext(v -> {
    if (state.get() == State.CONNECTED) {
      write(command);
    } else {
      // CONNECTING, ERROR or DISCONNECTED: keep it until the connection is usable
      pending.add(command);
    }
  });
}
/**
 * Registers the command in the waiting queue, once per reply it expects, and then writes it to the
 * socket. Replies are matched to commands by order, so registration has to precede the write.
 */
private void write(Command<?> command) {
  int remaining = command.getExpectedReplies();
  while (remaining-- > 0) {
    waiting.add(command);
  }
  command.writeTo(netSocket);
}
/**
 * First handshake step after the socket is established: authenticates when a password is configured
 * and then continues with {@link #doSelect()}.
 * <p>
 * Connections owned by a sentinel client never send AUTH, even when a password is configured: the
 * password belongs to the redis servers, and a sentinel would reject the command.
 */
private void doAuth() {
  final boolean passwordConfigured = config.getAuth() != null;

  if (!passwordConfigured || isSentinel) {
    // --------------------START
    if (passwordConfigured) {
      // report through the logger, and only when an AUTH was really skipped
      log.debug("Password configured but connection belongs to a sentinel client: skipping AUTH");
    }
    // --------------------END
    // no auth, proceed with select
    doSelect();
    return;
  }

  // we need to authenticate first
  final List<Object> args = new ArrayList<>();
  args.add(config.getAuth());
  Command<String> authCmd = new Command<>(context, RedisCommand.AUTH, args, Charset.forName(config.getEncoding()), ResponseTransform.NONE, String.class).handler(auth -> {
    if (auth.failed()) {
      // authentication was refused: fail everything that is pending and drop the socket
      clearQueue(pending, auth.cause());
      netSocket.close();
    } else {
      // auth success, proceed with select
      doSelect();
    }
  });
  // bypass send(): the connection is still CONNECTING, so write directly to the socket
  write(authCmd);
}
/**
 * Issues an INFO command and verifies that the server reports the replication role {@code master}.
 * If the command fails, or the role is missing or anything other than {@code master}, all pending
 * commands are failed and the socket is closed.
 */
private void doCheckMaster() {
  Command<JsonObject> infoCommand = new Command<>(context, RedisCommand.INFO, Collections.emptyList(), Charset.forName(config.getEncoding()), ResponseTransform.INFO, JsonObject.class).handler(info -> {
    if (info.failed()) {
      // clean up any pending command
      clearQueue(pending, info.cause());
      netSocket.close();
      return;
    }

    // the reply may be null or lack the replication section; treat that as "not a master"
    final JsonObject result = info.result();
    final JsonObject replication = result == null ? null : result.getJsonObject("replication");
    final String role = replication == null ? null : replication.getString("role");

    if ("master".equals(role)) {
      log.info("Verification of master role succeeded");
    } else {
      // the INFO call itself succeeded, so there is no cause to pass on: use an explicit message
      clearQueue(pending, "Forced disconnect of non-master");
      netSocket.close();
      log.error("Forced disconnect of non-master");
    }
  });
  // bypass send(): the connection is still CONNECTING, so write directly to the socket
  write(infoCommand);
}
/**
 * Second handshake step: issues SELECT when a database is configured and then continues with
 * {@link #restoreState()}. A failed SELECT fails all pending commands and closes the socket.
 */
private void doSelect() {
  if (config.getSelect() == null) {
    // no select, proceed with resend of pending messages/resubscribe pub/sub
    restoreState();
    return;
  }

  final List<Object> args = new ArrayList<>();
  args.add(config.getSelect());
  final Command<String> selectCmd = new Command<>(context, RedisCommand.SELECT, args, Charset.forName(config.getEncoding()), ResponseTransform.NONE, String.class).handler(select -> {
    if (select.succeeded()) {
      // select success, proceed with resend of pending messages/resubscribe pub/sub
      restoreState();
    } else {
      // clean up any pending command
      clearQueue(pending, select.cause());
      netSocket.close();
    }
  });
  // written directly to the socket: the connection is not CONNECTED yet
  write(selectCmd);
}
/**
 * Final handshake step: marks the connection CONNECTED, flushes every pending command to the socket
 * and, on a pub/sub connection, re-issues the channel and pattern subscriptions. Nothing happens
 * unless the state is CONNECTING.
 */
private void restoreState() {
  if (!state.compareAndSet(State.CONNECTING, State.CONNECTED)) {
    return;
  }
  // we are connected so clean up the pending queue
  for (Command<?> queued = pending.poll(); queued != null; queued = pending.poll()) {
    write(queued);
  }
  if (subscriptions == null) {
    return;
  }
  // restore the pub/sub subscriptions
  for (String channel : subscriptions.channelNames()) {
    resubscribe(RedisCommand.SUBSCRIBE, channel);
  }
  for (String pattern : subscriptions.patternNames()) {
    resubscribe(RedisCommand.PSUBSCRIBE, pattern);
  }
}

/** Writes a single (P)SUBSCRIBE command for {@code target}. */
private void resubscribe(RedisCommand subscribeCommand, String target) {
  final List<Object> args = new ArrayList<>();
  args.add(target);
  write(new Command<>(context, subscribeCommand, args, Charset.forName(config.getEncoding()), ResponseTransform.NONE, JsonArray.class));
}
/**
 * Delivers a parsed reply to the oldest command in the waiting queue, converting it according to
 * the reply type and the command's response transform. A reply that arrives while no command is
 * waiting is only logged.
 *
 * @param reply the reply parsed from the socket
 */
@SuppressWarnings("unchecked")
private void handleReply(Reply reply) {
  // raw type on purpose: the result type is only known at runtime via cmd.returnType()
  final Command cmd = waiting.poll();
  if (cmd == null) {
    log.error("No handler waiting for message: " + reply.asType(String.class));
    return;
  }

  switch (reply.type()) {
    case '-': // Error
      cmd.handle(Future.failedFuture(reply.asType(String.class)));
      return;
    case '+': // Status
      switch (cmd.responseTransform()) {
        case ARRAY:
          cmd.handle(Future.succeededFuture(new JsonArray().add(reply.asType(String.class))));
          break;
        default:
          cmd.handle(Future.succeededFuture(reply.asType(cmd.returnType())));
          break;
      }
      return;
    case '$': // Bulk
      switch (cmd.responseTransform()) {
        case ARRAY:
          cmd.handle(Future.succeededFuture(new JsonArray().add(reply.asType(String.class, cmd.encoding()))));
          break;
        case INFO:
          String info = reply.asType(String.class, cmd.encoding());
          if (info == null) {
            cmd.handle(Future.succeededFuture(null));
          } else {
            cmd.handle(Future.succeededFuture(parseInfo(info)));
          }
          break;
        default:
          cmd.handle(Future.succeededFuture(reply.asType(cmd.returnType(), cmd.encoding())));
          break;
      }
      return;
    case '*': // Multi
      switch (cmd.responseTransform()) {
        case HASH:
          cmd.handle(Future.succeededFuture(reply.asType(JsonObject.class, cmd.encoding())));
          break;
        default:
          cmd.handle(Future.succeededFuture(reply.asType(JsonArray.class, cmd.encoding())));
          break;
      }
      return;
    case ':': // Integer
      switch (cmd.responseTransform()) {
        case ARRAY:
          cmd.handle(Future.succeededFuture(new JsonArray().add(reply.asType(Long.class))));
          break;
        default:
          cmd.handle(Future.succeededFuture(reply.asType(cmd.returnType())));
          break;
      }
      return;
    default:
      cmd.handle(Future.failedFuture("Unknown message type"));
  }
}

/**
 * Converts the text of an INFO reply into JSON. A line starting with {@code #} opens a section that
 * becomes a nested object keyed by the lower-cased section name; an empty line closes the current
 * section; every {@code key:value} line is stored in the current section, or at the top level when
 * no section is open. Lines without a {@code :} separator are skipped.
 *
 * @param info the raw INFO text, never {@code null}
 * @return the parsed structure
 */
private static JsonObject parseInfo(String info) {
  final JsonObject value = new JsonObject();
  JsonObject section = null;
  for (String line : info.split("\\r?\\n")) {
    if (line.isEmpty()) {
      // end of section
      section = null;
      continue;
    }
    if (line.charAt(0) == '#') {
      // begin section; trimming (instead of a fixed offset) also copes with a bare "#" or "#Name",
      // and Locale.ROOT keeps the key independent of the JVM default locale
      section = new JsonObject();
      value.put(line.substring(1).trim().toLowerCase(java.util.Locale.ROOT), section);
      continue;
    }
    // entry in section
    final int split = line.indexOf(':');
    if (split < 0) {
      // not a key:value line; skipping it avoids substring(0, -1) blowing up
      continue;
    }
    final JsonObject target = section == null ? value : section;
    target.put(line.substring(0, split), line.substring(split + 1));
  }
  return value;
}
/**
 * Runs {@code handler} on this connection's context: inline when the caller is already on that
 * context and on an event-loop thread, otherwise by scheduling it on the context.
 */
private void runOnContext(Handler<Void> handler) {
  final boolean alreadyThere = Vertx.currentContext() == context && Context.isOnEventLoopThread();
  if (!alreadyThere) {
    context.runOnContext(handler);
    return;
  }
  handler.handle(null);
}
/** Empties {@code q}, failing every removed command with {@code message}. */
private void clearQueue(Queue<Command<?>> q, String message) {
  for (Command<?> next = q.poll(); next != null; next = q.poll()) {
    next.handle(Future.failedFuture(message));
  }
}
/** Empties {@code q}, failing every removed command with {@code cause}. */
private void clearQueue(Queue<Command<?>> q, Throwable cause) {
  for (Command<?> next = q.poll(); next != null; next = q.poll()) {
    next.handle(Future.failedFuture(cause));
  }
}
/**
 * Life-cycle states of the connection.
 */
private enum State {
/**
 * The connection is not active. This is a stop state.
 */
DISCONNECTED,
/**
 * The connection is being established; from here it can become connected or an error can occur.
 */
CONNECTING,
/**
 * The connection is active; from here it can become an error or disconnected.
 */
CONNECTED,
/**
 * Connection problem; a transient state on the way to disconnected.
 */
ERROR
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment