Skip to content

Instantly share code, notes, and snippets.

Created March 29, 2012 22:16
Show Gist options
  • Save anonymous/2244298 to your computer and use it in GitHub Desktop.
Save anonymous/2244298 to your computer and use it in GitHub Desktop.
package org.curator.tests;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Properties;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.junit.Test;
import com.google.common.io.Files;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.framework.recipes.leader.LeaderSelector;
import com.netflix.curator.framework.recipes.leader.LeaderSelectorListener;
import com.netflix.curator.framework.state.ConnectionState;
import com.netflix.curator.retry.RetryOneTime;
import com.netflix.curator.test.TestingCluster;
import com.netflix.curator.test.TestingCluster.InstanceSpec;
public class TestLeaderSelectorTimeout {
public QuorumPeerConfig buildConfig(int id) throws Exception {
File tmpDir = Files.createTempDir();
FileWriter fstream = new FileWriter(tmpDir.getAbsolutePath() + "/myid");
fstream.write(String.valueOf(id));
fstream.close();
Properties cfg = new Properties();
cfg.setProperty("initLimit", "10");
cfg.setProperty("syncLimit", "5");
cfg.setProperty("dataDir", tmpDir.getAbsolutePath());
cfg.setProperty("clientPort", "" + (2600 + id));
cfg.setProperty("server.1", String.format("localhost:%s:%s", 2700, 2800));
cfg.setProperty("server.2", String.format("localhost:%s:%s", 2701, 2801));
QuorumPeerConfig zooConfig = new QuorumPeerConfig();
zooConfig.parseProperties(cfg);
return zooConfig;
}
@Test
public void testLeaderSelectorTimeout() throws Exception {
ZooKeeperServer sv1 = new ZooKeeperServer(buildConfig(1));
ZooKeeperServer sv2 = new ZooKeeperServer(buildConfig(2));
sv1.start();
sv2.start();
CuratorFramework zk1 = waitForInit("localhost:2601");
CuratorFramework zk2 = waitForInit("localhost:2602");
LeaderSelectorListener listener1 = new TestListener("1");
LeaderSelectorListener listener2 = new TestListener("2");
LeaderSelector sel1 = new LeaderSelector(zk1, "/leader", listener1);
LeaderSelector sel2 = new LeaderSelector(zk2, "/leader", listener2);
sel1.autoRequeue();
sel2.autoRequeue();
sel1.start();
sel2.start();
waitMaster(sel1, sel2, 2000);
Thread.sleep(500);
sv1.shutdown();
Thread.sleep(30000);
sv1.start();
Thread.sleep(10000);
assertTrue(zk2.getZookeeperClient().blockUntilConnectedOrTimedOut());
assertTrue(zk1.getZookeeperClient().blockUntilConnectedOrTimedOut());
waitMaster(sel1, sel2, 1000);
Thread.sleep(5000); // Wait to make sure both participants have retried leader selection
System.out.println("SEL1 PARTICIPANTS: " + sel1.getParticipants().size());
System.out.println("SEL2 PARTICIPANTS: " + sel2.getParticipants().size());
sel1.close();
sel2.close();
zk1.close();
zk2.close();
sv1.shutdown();
sv2.shutdown();
}
private CuratorFramework waitForInit(String host) throws InterruptedException, IOException {
CuratorFramework cf = CuratorFrameworkFactory.builder().retryPolicy(new RetryOneTime(1)).connectionTimeoutMs(5000).connectString(host).sessionTimeoutMs(2000).build();
cf.start();
cf.getZookeeperClient().blockUntilConnectedOrTimedOut();
return cf;
}
private void waitMaster(LeaderSelector sel1, LeaderSelector sel2, int timeout) throws Exception {
int slept = 0;
while (sel1.hasLeadership() == false && sel2.hasLeadership() == false) {
Thread.sleep(100);
slept += 100;
if (slept > timeout) {
assertTrue(false);
}
}
}
public static class TestListener implements LeaderSelectorListener {
private String id;
private Object notifier = new Object();
public TestListener(String id) {
this.id = id;
}
public void takeLeadership(CuratorFramework arg0) throws Exception {
System.out.println(id + ": TAKE LEADERSHIP !");
synchronized (notifier) {
notifier.wait();
}
System.out.println(id + ": LOSE LEADERSHIP");
}
public void stateChanged(CuratorFramework client, ConnectionState newState) {
System.out.println(id + ": STATE CHANGED: " + newState);
if (newState == ConnectionState.LOST) {
synchronized (notifier) {
notifier.notify();
}
}
}
}
}
package org.curator.tests;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerMain;
public class ZooKeeperServer extends QuorumPeerMain {
private static final Log logger = LogFactory.getLog(ZooKeeperServer.class);
final private QuorumPeerConfig config;
private EnhancedQuorumPeerMain main;
public ZooKeeperServer(QuorumPeerConfig config) {
this.config = config;
}
public void shutdown()
{
main.getQuorumPeer().shutdown();
}
public void start() {
main = new EnhancedQuorumPeerMain();
new Thread(new Runnable() {
public void run() {
try {
main.runFromConfig(config);
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
while (main.getQuorumPeer() == null) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
}
private static class EnhancedQuorumPeerMain extends QuorumPeerMain {
public QuorumPeer getQuorumPeer() {
return quorumPeer;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment