Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@fireflyc
Created September 17, 2016 07:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save fireflyc/51d5467ef48b7f8c4a7747e5ecbd3fd0 to your computer and use it in GitHub Desktop.
Save fireflyc/51d5467ef48b7f8c4a7747e5ecbd3fd0 to your computer and use it in GitHub Desktop.
Zookeeper Leader Election
public class LeaderElection {
private static Logger LOGGER = LoggerFactory.getLogger(LeaderElection.class);
public static void main(String args[]) throws IOException, KeeperException, InterruptedException {
String connectString = "localhost";
ZooKeeper zooKeeper = new ZooKeeper(connectString, 3000, new Watcher() {
@Override
public void process(WatchedEvent event) {
//无视session超时,连接断开等问题
LOGGER.info("zookeeper watch event {}", event);
}
});
//保证根节点存在
String root = "/ha";
if (zooKeeper.exists(root, null) == null) {
zooKeeper.create(root, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
//申请做 leader
String prefix = "/ticket-";
String myVote = zooKeeper.create(root + prefix, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
LOGGER.debug("我的选票是 {}", myVote);
//查看选举结果(id最小的一个是leader)
checkLeader(zooKeeper, prefix, myVote, root);
Thread.currentThread().join();
}
protected static void checkLeader(final ZooKeeper zooKeeper, final String prefix, final String myVote,
final String root) throws KeeperException, InterruptedException {
List<String> allVote = zooKeeper.getChildren(root, new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
checkLeader(zooKeeper, prefix, myVote, root);
} catch (KeeperException | InterruptedException e) {
LOGGER.error(e.getMessage(), e);
}
}
});
if (isLeader(root, prefix, myVote, allVote)) {
LOGGER.info("成功当选领导 {}", myVote);
} else {
LOGGER.info("当选领导失败,等待下一轮选举 {}", myVote);
}
}
protected static boolean isLeader(String root, String prefix, String myVote, List<String> allVote) {
//寻找最小的id
String minVotePath = allVote.get(0);
String minVote = fixForSorting(prefix, minVotePath);
for (String vote : allVote) {
String thisVoteId = fixForSorting(prefix, vote);
if (thisVoteId.compareTo(minVote) < 0) {
minVotePath = vote;
minVote = thisVoteId;
}
}
LOGGER.debug("当前领导 {}", minVotePath);
return myVote.equals(root + "/" + minVotePath);
}
//剔除前缀比如ticket-0000000001变成0000000001
private static String fixForSorting(String prefix, String str) {
int index = str.lastIndexOf(prefix);
if (index >= 0) {
index += prefix.length();
return index <= str.length() ? str.substring(index) : "";
}
return str;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment