Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save alphazero/480022 to your computer and use it in GitHub Desktop.
Save alphazero/480022 to your computer and use it in GitHub Desktop.
/*
* Copyright 2010 Joubin Houshyar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package test.net;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.ConnectException;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.Random;
/**
 * Demo of a fault-tolerant connection to a Redis server over a plain socket.
 * Both dropped connections and server downtime are handled.
 * <p>
 * To run the test, set in your redis.conf:
 * <p>
 * set the appendonly so that when we crash redis,
 * the counter values are correctly reflected in the db file.
 * <pre><code>appendonly yes</code></pre>
 * <p>
 * set the timeout to 1 sec to force frequent idle client
 * timeouts.
 * <pre><code>timeout 1</code></pre>
 * <p>
 * The class is not thread-safe: all connection state is held in plain
 * instance fields and is expected to be used from a single thread.
 *
 * @author joubin (alphazero@sensesay.net)
 * @date Jul 17, 2010
 *
 */
public class FaultTolerantConnector {

    /**
     * Run the test. Any failure is reported on stderr together with its
     * stack trace.
     */
    public static void main (String[] args) {
        try {
            new FaultTolerantConnector().run();
        }
        catch (ConnectException e) {
            System.err.format("Failed to connect to server: %s\n", e);
            e.printStackTrace();
        }
        catch (Throwable e) {
            System.err.format("Fault: %s\n", e);
            e.printStackTrace();
        }
    }

    // ========================================================================

    /** Your server's password. {@code null} will skip AUTH. */
    String password = "jredis";
    /** Socket of the current connection; replaced on every (re)connect. */
    Socket s = null;
    /** Request stream of the current connection. */
    DataOutputStream dout = null;
    /** Response stream of the current connection. */
    BufferedReader din = null;
    /** Time (msecs) of the last successful connection check; 0 forces a check. */
    long lastchecktime;
    /** Whether the last connect attempt succeeded and no drop has been seen since. */
    boolean isConnected = false;

    /**
     * Perpetually runs an INCR sequence and cross-checks a local counter
     * against the value returned by Redis. Delay between batches is chosen to
     * be much bigger than the redis.conf:timeout setting (tested with 1 sec).
     *
     * @throws IllegalStateException if the local counter and the INCR
     *         response disagree
     * @throws Exception if the initial connect or DEL fails, or the server
     *         stays down longer than the tolerated downtime
     */
    public void run() throws Exception {
        long maxDowntime = 1000 * 60;
        long delay = 5000;          // redis.conf::timeout=1 // 1 second
        long redismaxidle = 1000;

        connect();

        long n = 0;
        long reps = 10000;
        long start = System.currentTimeMillis();
        String guidKey = String.format("test:%X", new Random(System.currentTimeMillis()).nextLong());
        System.out.format("INCR key: %s\n", guidKey);
        DEL(guidKey);

        long cntr = 0;
        for (;;) {
            // make sure connection is alive
            // tolerate up to maxDowntime msecs while Redis comes back up
            if (System.currentTimeMillis() - lastchecktime > redismaxidle) {
                assertConnectionEstablished(maxDowntime, false);
            }

            // do something destructive
            try {
                long rcntr = INCR(guidKey);
                cntr++;
                // check our cntr against redis rcntr to see if the faults
                // are introducing errors or not
                if (rcntr != cntr) {
                    throw new IllegalStateException(String.format("Inconsistent state: cntr @ %d and the INCR response of %d", cntr, rcntr));
                }
                n++;
            }
            catch (FaultOnIO e) {
                if (e.op == FaultOnIO.Op.W) {
                    // can happen if Redis shuts down
                    // just when connector is about to send the request
                    System.err.format("WARNING: last Op's request raised a connection fault -- assuming Op NOT processed.\n");
                }
                else {
                    // can happen if Redis shuts down
                    // just when its about to send the response
                    System.err.format("WARNING: last Op's response raised a connection fault -- ");
                    // Heuristic: a clean end-of-stream (no cause) is taken to
                    // mean the request never got processed; an I/O error while
                    // reading (cause present) is taken to mean it was.
                    if (e.getCause() == null) {
                        System.err.format("assuming Op NOT processed.\n");
                    }
                    else {
                        System.err.format("assuming Op processed.\n");
                        // increment counter to test our assumption
                        cntr++;
                    }
                }
                // force a connection check before the next request
                lastchecktime = 0;
                continue;
            }

            // report once exactly reps INCRs have completed in this batch
            if (n >= reps) {
                long delta = System.currentTimeMillis() - start;
                System.out.format("%d INCRs in %d msecs\n", reps, delta);
                // delay to force idle client disconnect
                // (if delay >> redis.conf::timeout)
                takeANap(delay);
                start = System.currentTimeMillis();
                n = 0;
            }
        }
    }

    /**
     * Retries the connection test (reconnecting as needed) until it succeeds,
     * then records the time of the successful check in {@link #lastchecktime}.
     *
     * @param maxDowntime maximum time in msecs to keep retrying
     * @param usePings    if true probe with PING, otherwise with a timed read
     * @throws Exception if the connection could not be re-established within
     *         {@code maxDowntime}; the last failure is attached as the cause
     */
    private final void assertConnectionEstablished (long maxDowntime, boolean usePings) throws Exception {
        long start = System.currentTimeMillis();
        for (;;) {
            try {
                if (usePings) {
                    testConnectionWithPing();
                }
                else {
                    testConnection();
                }
                lastchecktime = System.currentTimeMillis();
                break;
            }
            catch (Exception e) {
                isConnected = false;
                long downtime = System.currentTimeMillis() - start;
                if (downtime > maxDowntime) {
                    String errmsg = String.format("Could not re-establish connection -- server down more than %d msecs.", maxDowntime);
                    // keep the last failure so the root cause is not lost
                    throw new Exception(errmsg, e);
                }
            }
        }
    }

    /**
     * Sleeps for the given number of msecs. If the thread is interrupted the
     * nap ends early and the interrupt flag is restored for the caller.
     *
     * @param delay sleep time in msecs
     */
    private final void takeANap (long delay) {
        try {
            Thread.sleep(delay);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Probes the connection with a 1 msec timed read. A read timeout means the
     * connection is alive; end-of-stream means the peer closed it, in which
     * case a reconnect is attempted. A character that happens to be read is
     * discarded. The socket's read timeout is reset to 0 (blocking) on exit.
     * <p>
     * Works fine if used in conjunction with a timestamp that keeps track of
     * the last assert time. If only invoked when the last assert time is
     * greater than the redis client idle time setting (redis.conf::timeout),
     * the RTT of this op is absorbed by all the requests made during that idle
     * time window. A balanced setting of the redis.conf timeout parameter
     * would extend the number of these requests to the point where the cost
     * is effectively negligible.
     *
     * @throws Exception if the probe fails for a reason other than a read
     *         timeout, or if the reconnect attempt fails
     */
    public void testConnection() throws Exception {
        try {
            s.setSoTimeout(1);
            int r = din.read();
            if (r == -1) {
                if (isConnected) {
                    System.err.format("WARNING: Connection has been dropped\n");
                    isConnected = false;
                }
                else {
                    // progress marker while the server is still down
                    System.err.format(".");
                }
                connect();
                System.err.format("\n");
            }
        }
        catch (SocketTimeoutException e) {
            /* good news - will happen if connection is alive */
        }
        finally {
            s.setSoTimeout(0);
        }
    }

    /**
     * Probes the connection with a PING request and reconnects if the probe
     * fails.
     * <p>
     * Works but not entirely reliably. We're paying PING/PONG round trip cost
     * (~100 microsecs) per request. Much better than the other method but it
     * seems to still miss dropped connections at times. (Not sure why).
     *
     * @throws Exception if the reconnect attempt fails; the reconnect failure
     *         is attached as the cause
     */
    public void testConnectionWithPing() throws Exception {
        try {
            ping();
        }
        catch (Exception e) {
            System.out.format("WARNING: Connection has been dropped\n");
            try {
                connect();
            }
            catch (Exception e2) {
                System.err.format("ERROR: failed to reconnect; %s\n", e2.getMessage());
                throw new Exception("Cannot connect to Redis server", e2);
            }
        }
    }

    /**
     * Opens a new connection to localhost:6379, authenticates if a password is
     * set, and selects db 13. The previous socket, if any, is closed once the
     * new one has been opened, so repeated reconnects do not leak sockets. If
     * opening the new socket fails, the existing connection state is left
     * untouched.
     *
     * @throws UnknownHostException if the host cannot be resolved
     * @throws Exception on any other connect or handshake failure
     */
    public void connect() throws RuntimeException, UnknownHostException, Exception {
        Socket previous = s;
        Socket fresh = new Socket("localhost", 6379);
        // only drop the old socket after the new one exists, so a failed
        // attempt leaves something for testConnection() to keep probing
        closeQuietly(previous);
        s = fresh;
        s.setSoTimeout(0);
        dout = new DataOutputStream(s.getOutputStream());
        din = new BufferedReader(new InputStreamReader(s.getInputStream()));
        if (password != null) {
            AUTH(password);
        }
        SELECT(13);
        isConnected = true;
        System.out.format("INFO: Connection established.\n");
    }

    /** Closes the given socket, if any, ignoring failures of the close itself. */
    private static void closeQuietly (Socket socket) {
        if (socket == null) {
            return;
        }
        try {
            socket.close();
        }
        catch (Exception ignored) {
            // best effort: the socket is being discarded anyway
        }
    }

    // ------ client API
    // a simple redis client impl.
    //

    /**
     * Writes a raw request to the server.
     *
     * @param bytes the request bytes
     * @throws FaultOnIO with {@code op == W} and the failure as cause if the
     *         write fails
     */
    void request(byte[] bytes) throws FaultOnIO {
        try {
            dout.write(bytes);
        }
        catch (Throwable t) {
            throw new FaultOnIO(FaultOnIO.Op.W, t);
        }
    }

    /**
     * Reads one response line from the server.
     *
     * @return the line, without its line terminator; never null
     * @throws FaultOnIO with {@code op == R}: carrying the failure as cause if
     *         the read itself fails, or with no cause on end-of-stream
     */
    String getLineResponse() throws FaultOnIO {
        String resp;
        try {
            resp = din.readLine();
        }
        catch (Throwable t) {
            throw new FaultOnIO(FaultOnIO.Op.R, t);
        }
        // Checked after the try (not in a finally) so that a read failure
        // keeps its cause instead of being replaced by a cause-less fault.
        if (resp == null) {
            throw new FaultOnIO(FaultOnIO.Op.R);
        }
        return resp;
    }

    /** Sends PING and consumes the one-line reply. */
    public void ping () throws Exception {
        request("PING\r\n".getBytes());
        getLineResponse();
    }

    /**
     * Sends INCR for the key.
     *
     * @return the reply line parsed as a long after dropping its first character
     */
    public long INCR (String key) throws Exception {
        request(String.format("INCR %s\r\n", key).getBytes());
        String resp = getLineResponse();
        return Long.parseLong(resp.substring(1));
    }

    /**
     * Sends DEL for the key.
     *
     * @return the reply line parsed as a long after dropping its first character
     */
    public long DEL (String key) throws Exception {
        request(String.format("DEL %s\r\n", key).getBytes());
        String resp = getLineResponse();
        return Long.parseLong(resp.substring(1));
    }

    /** Sends AUTH with the given password and consumes the one-line reply (not inspected). */
    public void AUTH (String password) throws Exception {
        request(String.format("AUTH %s\r\n", password).getBytes());
        getLineResponse();
    }

    /** Sends SELECT for db {@code n} and consumes the one-line reply (not inspected). */
    public void SELECT (int n) throws Exception {
        request(String.format("SELECT %d\r\n", n).getBytes());
        getLineResponse();
    }

    /** Sends FLUSHDB and consumes the one-line reply (not inspected). */
    public void FLUSH () throws Exception {
        request("FLUSHDB\r\n".getBytes());
        getLineResponse();
    }

    /**
     * Raised when a request write or a response read fails. {@link #op} tells
     * which side failed; the cause, when present, is the underlying failure.
     */
    @SuppressWarnings("serial")
    static class FaultOnIO extends Exception {
        /** R: fault while reading a response; W: fault while writing a request. */
        public enum Op { R, W};
        final public Op op;
        public FaultOnIO (Op op) {super(); this.op = op;}
        public FaultOnIO (Op op, Throwable cause) {super(cause); this.op = op;}
    }
}
@alphazero
Copy link
Author

Added a simple integrity check for the destructive INCR op. It's far more likely to induce a broken pipe on write if the "delay" in the above program is set to 0. You should see a few FaultOnIO thrown if you keep pulling the plug on Redis. On Redis restart the counter should be at the expected value, confirming the integrity of the fault handling.

@izaquetao
Copy link

NOT FOUND

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment