Created
March 29, 2012 22:16
-
-
Save anonymous/2244298 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.curator.tests; | |
import static org.junit.Assert.assertTrue; | |
import java.io.File; | |
import java.io.FileWriter; | |
import java.io.IOException; | |
import java.util.Properties; | |
import org.apache.zookeeper.server.quorum.QuorumPeerConfig; | |
import org.junit.Test; | |
import com.google.common.io.Files; | |
import com.netflix.curator.framework.CuratorFramework; | |
import com.netflix.curator.framework.CuratorFrameworkFactory; | |
import com.netflix.curator.framework.recipes.leader.LeaderSelector; | |
import com.netflix.curator.framework.recipes.leader.LeaderSelectorListener; | |
import com.netflix.curator.framework.state.ConnectionState; | |
import com.netflix.curator.retry.RetryOneTime; | |
import com.netflix.curator.test.TestingCluster; | |
import com.netflix.curator.test.TestingCluster.InstanceSpec; | |
public class TestLeaderSelectorTimeout { | |
public QuorumPeerConfig buildConfig(int id) throws Exception { | |
File tmpDir = Files.createTempDir(); | |
FileWriter fstream = new FileWriter(tmpDir.getAbsolutePath() + "/myid"); | |
fstream.write(String.valueOf(id)); | |
fstream.close(); | |
Properties cfg = new Properties(); | |
cfg.setProperty("initLimit", "10"); | |
cfg.setProperty("syncLimit", "5"); | |
cfg.setProperty("dataDir", tmpDir.getAbsolutePath()); | |
cfg.setProperty("clientPort", "" + (2600 + id)); | |
cfg.setProperty("server.1", String.format("localhost:%s:%s", 2700, 2800)); | |
cfg.setProperty("server.2", String.format("localhost:%s:%s", 2701, 2801)); | |
QuorumPeerConfig zooConfig = new QuorumPeerConfig(); | |
zooConfig.parseProperties(cfg); | |
return zooConfig; | |
} | |
@Test | |
public void testLeaderSelectorTimeout() throws Exception { | |
ZooKeeperServer sv1 = new ZooKeeperServer(buildConfig(1)); | |
ZooKeeperServer sv2 = new ZooKeeperServer(buildConfig(2)); | |
sv1.start(); | |
sv2.start(); | |
CuratorFramework zk1 = waitForInit("localhost:2601"); | |
CuratorFramework zk2 = waitForInit("localhost:2602"); | |
LeaderSelectorListener listener1 = new TestListener("1"); | |
LeaderSelectorListener listener2 = new TestListener("2"); | |
LeaderSelector sel1 = new LeaderSelector(zk1, "/leader", listener1); | |
LeaderSelector sel2 = new LeaderSelector(zk2, "/leader", listener2); | |
sel1.autoRequeue(); | |
sel2.autoRequeue(); | |
sel1.start(); | |
sel2.start(); | |
waitMaster(sel1, sel2, 2000); | |
Thread.sleep(500); | |
sv1.shutdown(); | |
Thread.sleep(30000); | |
sv1.start(); | |
Thread.sleep(10000); | |
assertTrue(zk2.getZookeeperClient().blockUntilConnectedOrTimedOut()); | |
assertTrue(zk1.getZookeeperClient().blockUntilConnectedOrTimedOut()); | |
waitMaster(sel1, sel2, 1000); | |
Thread.sleep(5000); // Wait to make sure both participants have retried leader selection | |
System.out.println("SEL1 PARTICIPANTS: " + sel1.getParticipants().size()); | |
System.out.println("SEL2 PARTICIPANTS: " + sel2.getParticipants().size()); | |
sel1.close(); | |
sel2.close(); | |
zk1.close(); | |
zk2.close(); | |
sv1.shutdown(); | |
sv2.shutdown(); | |
} | |
private CuratorFramework waitForInit(String host) throws InterruptedException, IOException { | |
CuratorFramework cf = CuratorFrameworkFactory.builder().retryPolicy(new RetryOneTime(1)).connectionTimeoutMs(5000).connectString(host).sessionTimeoutMs(2000).build(); | |
cf.start(); | |
cf.getZookeeperClient().blockUntilConnectedOrTimedOut(); | |
return cf; | |
} | |
private void waitMaster(LeaderSelector sel1, LeaderSelector sel2, int timeout) throws Exception { | |
int slept = 0; | |
while (sel1.hasLeadership() == false && sel2.hasLeadership() == false) { | |
Thread.sleep(100); | |
slept += 100; | |
if (slept > timeout) { | |
assertTrue(false); | |
} | |
} | |
} | |
public static class TestListener implements LeaderSelectorListener { | |
private String id; | |
private Object notifier = new Object(); | |
public TestListener(String id) { | |
this.id = id; | |
} | |
public void takeLeadership(CuratorFramework arg0) throws Exception { | |
System.out.println(id + ": TAKE LEADERSHIP !"); | |
synchronized (notifier) { | |
notifier.wait(); | |
} | |
System.out.println(id + ": LOSE LEADERSHIP"); | |
} | |
public void stateChanged(CuratorFramework client, ConnectionState newState) { | |
System.out.println(id + ": STATE CHANGED: " + newState); | |
if (newState == ConnectionState.LOST) { | |
synchronized (notifier) { | |
notifier.notify(); | |
} | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.curator.tests; | |
import java.io.IOException; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.apache.zookeeper.server.quorum.QuorumPeer; | |
import org.apache.zookeeper.server.quorum.QuorumPeerConfig; | |
import org.apache.zookeeper.server.quorum.QuorumPeerMain; | |
public class ZooKeeperServer extends QuorumPeerMain { | |
private static final Log logger = LogFactory.getLog(ZooKeeperServer.class); | |
final private QuorumPeerConfig config; | |
private EnhancedQuorumPeerMain main; | |
public ZooKeeperServer(QuorumPeerConfig config) { | |
this.config = config; | |
} | |
public void shutdown() | |
{ | |
main.getQuorumPeer().shutdown(); | |
} | |
public void start() { | |
main = new EnhancedQuorumPeerMain(); | |
new Thread(new Runnable() { | |
public void run() { | |
try { | |
main.runFromConfig(config); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
} | |
}).start(); | |
while (main.getQuorumPeer() == null) { | |
try { | |
Thread.sleep(100); | |
} catch (InterruptedException e) { | |
} | |
} | |
} | |
private static class EnhancedQuorumPeerMain extends QuorumPeerMain { | |
public QuorumPeer getQuorumPeer() { | |
return quorumPeer; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment