Skip to content

Instantly share code, notes, and snippets.

@Randgalt
Created September 11, 2012 20:30
Show Gist options
  • Save Randgalt/3701773 to your computer and use it in GitHub Desktop.
Save Randgalt/3701773 to your computer and use it in GitHub Desktop.
FailoverTests
import com.google.common.io.Closeables;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.framework.api.BackgroundCallback;
import com.netflix.curator.framework.api.CuratorEvent;
import com.netflix.curator.retry.ExponentialBackoffRetry;
import com.netflix.curator.test.InstanceSpec;
import com.netflix.curator.test.TestingCluster;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.Date;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
// contributed by narayanan-arunachalam: https://github.com/Netflix/curator/issues/163
/**
 * Manual stress/reproduction harness for client behaviour when the ZooKeeper
 * leader of a three-node {@link TestingCluster} is killed while asynchronous
 * {@code setData} calls are in flight.
 *
 * <p>Both tests start five worker threads, wait a minute, start five more, and
 * so on without end; they are meant to be watched and stopped by hand rather
 * than to pass or fail on their own. Each worker issues batches of asynchronous
 * updates and waits (up to a timeout) for all callbacks of a batch to arrive.
 * The first worker to finish batch number 40 kills the current leader.</p>
 */
public class FailoverTests
{
    /** Node that every asynchronous update is addressed to. */
    private static final String TEST_PATH = "/testNode";

    /** Number of worker threads started in every round. */
    private static final int WORKERS_PER_ROUND = 5;

    /** Pause between two rounds of worker start-up. */
    private static final long ROUND_INTERVAL_MS = 60 * 1000;

    /** Number of batches each worker runs before it ends. */
    private static final int BATCHES_PER_WORKER = 1000;

    /** Number of asynchronous updates submitted per batch. */
    private static final int OPS_PER_BATCH = 100;

    /** Longest time a worker waits for the callbacks of one batch, measured from batch start. */
    private static final long BATCH_TIMEOUT_MS = 30 * 1000;

    /** Index of the batch after which the leader is killed (once per test). */
    private static final int KILL_LEADER_BATCH = 40;

    private TestingCluster cluster;

    /** Curator client shared by all workers of {@link #testFailoverUsingCF()}. */
    CuratorFramework cf;

    /**
     * Left over from an experiment in which the Curator client was closed and
     * re-created the first time a batch timed out; nothing reads it any more.
     */
    boolean connectionRestarted = false;

    /**
     * One asynchronous update against the cluster. Implementations must call
     * {@code completed.incrementAndGet()} from the operation's completion callback.
     */
    private interface AsyncUpdate
    {
        void submit(AtomicInteger completed) throws Exception;
    }

    /** Starts a fresh three-instance test cluster before every test method. */
    @BeforeMethod
    public void setup() throws Exception
    {
        cluster = new TestingCluster(3);
        cluster.start();
    }

    /** Shuts the cluster down after every test method, ignoring close failures. */
    @AfterMethod
    public void tearDown()
    {
        Closeables.closeQuietly(cluster);
        cluster = null;
    }

    /**
     * Asks every instance for its {@code srvr} status and kills the first one
     * that reports mode "leader". Does nothing if no instance reports that mode.
     *
     * @throws Exception if querying an instance or killing the server fails
     */
    private void killLeader() throws Exception
    {
        for ( InstanceSpec spec : cluster.getInstances() )
        {
            FourLetterWord fourLetterWord = new FourLetterWord(FourLetterWord.Word.SRVR, "localhost", spec.getPort(), 5000);
            String mode = fourLetterWord.getResponseMap().get("mode");
            if ( "leader".equalsIgnoreCase(mode) )
            {
                System.out.println("killing leader");
                cluster.killServer(spec);
                break;
            }
        }
    }

    /**
     * Runs the failover scenario through a raw {@link ZooKeeper} handle.
     * Never returns normally; the handle is closed if the driving loop is
     * interrupted or fails.
     */
    @Test
    public void testFailoverWithZK() throws Exception
    {
        final ZooKeeper zk = new ZooKeeper(cluster.getConnectString(), 15000, new Watcher()
        {
            @Override
            public void process(WatchedEvent event)
            {
                info("Event: " + event.getType());
            }
        });
        try
        {
            runWorkersForever(new AsyncUpdate()
            {
                @Override
                public void submit(final AtomicInteger completed)
                {
                    zk.setData(
                        TEST_PATH,
                        new Date().toString().getBytes(),
                        -1,
                        new AsyncCallback.StatCallback()
                        {
                            @Override
                            public void processResult(int rc, String path, Object ctx, Stat stat)
                            {
                                // Counted whatever the result code is: the batch only waits for callbacks to arrive.
                                completed.incrementAndGet();
                            }
                        }, null);
                }
            }, 500);
        }
        finally
        {
            zk.close();
        }
    }

    /**
     * Runs the failover scenario through a {@link CuratorFramework} client.
     * Never returns normally; the client is closed if the driving loop is
     * interrupted or fails.
     */
    @Test
    public void testFailoverUsingCF() throws Exception
    {
        cf = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new ExponentialBackoffRetry(5000, 5));
        cf.start();
        try
        {
            runWorkersForever(new AsyncUpdate()
            {
                @Override
                public void submit(final AtomicInteger completed) throws Exception
                {
                    cf.setData()
                        .inBackground(new BackgroundCallback()
                        {
                            @Override
                            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
                            {
                                completed.incrementAndGet();
                            }
                        })
                        .forPath(TEST_PATH);
                }
            }, 100);
        }
        finally
        {
            cf.close();
        }
    }

    /**
     * Starts {@link #WORKERS_PER_ROUND} workers, sleeps {@link #ROUND_INTERVAL_MS}
     * and repeats without end. Workers from earlier rounds are not awaited, so
     * the number of live workers grows until the earliest ones run out of batches.
     * If the loop is left through an exception, workers are told to stop after
     * their current batch.
     *
     * @param update the asynchronous operation each worker submits
     * @param pollMs how long a worker sleeps between checks of its batch counter
     */
    private void runWorkersForever(final AsyncUpdate update, final long pollMs) throws Exception
    {
        final AtomicBoolean leaderKilled = new AtomicBoolean(false);
        final AtomicBoolean running = new AtomicBoolean(true);
        try
        {
            //noinspection InfiniteLoopStatement
            while ( true )
            {
                for ( int t = 0; t < WORKERS_PER_ROUND; t++ )
                {
                    Thread thread = new Thread(new Runnable()
                    {
                        @Override
                        public void run()
                        {
                            runBatches(update, pollMs, leaderKilled, running);
                        }
                    });
                    thread.start();
                }
                Thread.sleep(ROUND_INTERVAL_MS);
                info("Starting again.....");
            }
        }
        finally
        {
            running.set(false);
        }
    }

    /**
     * Body of one worker: runs up to {@link #BATCHES_PER_WORKER} batches. A
     * failure inside a batch is printed and the worker moves on to the next
     * batch; an interrupt ends the worker.
     */
    private void runBatches(AsyncUpdate update, long pollMs, AtomicBoolean leaderKilled, AtomicBoolean running)
    {
        for ( int batchIndex = 0; running.get() && (batchIndex < BATCHES_PER_WORKER); batchIndex++ )
        {
            try
            {
                long startTime = System.currentTimeMillis();
                // Atomic because callbacks increment it while this thread polls it.
                AtomicInteger completed = new AtomicInteger(0);
                for ( int i = 0; i < OPS_PER_BATCH; i++ )
                {
                    update.submit(completed);
                }

                // Wait until every callback has arrived or the batch has timed out.
                while ( ((System.currentTimeMillis() - startTime) <= BATCH_TIMEOUT_MS)
                    && (completed.get() < OPS_PER_BATCH) )
                {
                    Thread.sleep(pollMs);
                }
                info("Done batch " + batchIndex);

                // compareAndSet makes sure only one worker ever kills the leader.
                if ( (batchIndex == KILL_LEADER_BATCH) && leaderKilled.compareAndSet(false, true) )
                {
                    killLeader();
                }
            }
            catch ( InterruptedException e )
            {
                // Keep the interrupt visible to the thread's owner and stop working.
                Thread.currentThread().interrupt();
                return;
            }
            catch ( Exception e )
            {
                e.printStackTrace();
            }
        }
    }

    /** Prints a progress message to standard output. */
    private void info(String message)
    {
        System.out.println(message);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment