Skip to content

Instantly share code, notes, and snippets.

@gyim
Last active August 29, 2015 14:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gyim/caea1b73cc8fa8f6997b to your computer and use it in GitHub Desktop.
Save gyim/caea1b73cc8fa8f6997b to your computer and use it in GitHub Desktop.
Connection LOST is sometimes not reported in Apache Curator
/*
* Usage:
* 1. Start this program with `java -jar curator-test.jar <zkHost:zkPort>`.
* Use a non-local ZooKeeper server.
* 2. Run the following command in another window:
* iptables -A OUTPUT -p tcp --dport 2181 -j DROP; sleep 15; iptables -D OUTPUT -p tcp --dport 2181 -j DROP
*
* Expected result: the program exits after receiving LOST connection state
*
* Result: the program usually does not exit (it receives only SUSPENDED and RECONNECTED events),
* and remains the leader forever, even though its ephemeral node disappears from ZooKeeper
*/
package hu.gyim.curatortest;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.logging.Logger;
public class Main {
public static void main(String[] args) {
if (args.length < 1) {
System.out.println("Usage: java -jar curator-test.jar <zkHost:zkPort>");
System.exit(1);
}
String zkHost = args[0];
CuratorFramework zk = CuratorFrameworkFactory.builder()
.connectString(zkHost)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.connectionTimeoutMs(1000)
.sessionTimeoutMs(10000)
.build();
final Object o = new Object();
LeaderSelector selector = new LeaderSelector(zk, "/test-leader", new LeaderSelectorListener() {
@Override
public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
Logger.getLogger("test").info("Taking leadership");
synchronized (o) {
o.wait();
}
}
@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
Logger.getLogger("test").info("Connection state: " + connectionState.toString());
if (connectionState == ConnectionState.LOST) {
synchronized (o) {
o.notifyAll();
}
}
}
});
selector.start();
zk.start();
synchronized (o) {
try {
o.wait();
} catch (InterruptedException e) {}
}
zk.close();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment