Skip to content

Instantly share code, notes, and snippets.

@mrocabado
Last active July 15, 2020 00:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mrocabado/0cb159c2a9edf5f0e967f77c262ef1ef to your computer and use it in GitHub Desktop.
Save mrocabado/0cb159c2a9edf5f0e967f77c262ef1ef to your computer and use it in GitHub Desktop.
Crash detection with ZooKeeper

Crash detection with ZooKeeper

In this demo we can see how a component can detect if other component is active/working.

package com.mindwaresrl.zookeeper.client;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.time.Instant;
public class BranchOffice implements Watcher {
private ZooKeeper zooKeeper;
public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
BranchOffice branchOffice = new BranchOffice();
branchOffice.startZkSession();
branchOffice.reportReadiness();
System.out.println("Wait for 15 seconds before terminating...");
Thread.sleep(15000);
branchOffice.close();
}
@Override
public void process(WatchedEvent event) {
System.out.println("Default client watcher @ BranchOffice : " + event.toString());
}
private void startZkSession() throws IOException {
int sessionTimeout = 5_000; //millis
zooKeeper = new ZooKeeper("127.0.0.1:2181",sessionTimeout,this);
}
protected void close() throws InterruptedException {
zooKeeper.close();
}
private void reportReadiness() throws KeeperException, InterruptedException {
try {
String branchId = zooKeeper.create("/branch-office/branch-1", Instant.now().toString().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println(branchId + " added as ephemeral data (znode)");
} catch (KeeperException.ConnectionLossException e) {
e.printStackTrace();
//ZK client can reconnect, so keep going.
}
}
}
package com.mindwaresrl.zookeeper.client;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
public class HeadQuarters implements Watcher {
public static final String BRANCH_OFFICE_ZK_PATH = "/branch-office";
private ZooKeeper zooKeeper;
public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
HeadQuarters headQuarters = new HeadQuarters();
headQuarters.startZkSession();
if ( !headQuarters.pathExists(BRANCH_OFFICE_ZK_PATH) ) {
headQuarters.createNode(BRANCH_OFFICE_ZK_PATH, new byte[0]);
}
headQuarters.enablePathMonitoring(BRANCH_OFFICE_ZK_PATH);
System.out.println("Wait branch events for one minute...");
Thread.sleep(60000);
System.out.println("closing connection");
headQuarters.disablePathMonitoring(BRANCH_OFFICE_ZK_PATH);
headQuarters.close();
}
private boolean pathExists(String path) {
try {
Stat stat = new Stat();
byte data[] = zooKeeper.getData(path,this,stat);
System.out.println("/branch-office stats: " + stat.toString());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException.NoNodeException e) {
return false;
} catch (KeeperException e) {
e.printStackTrace();
}
return true;
}
private void createNode(String path, byte[] data) throws KeeperException, InterruptedException {
zooKeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
private void enablePathMonitoring(String path) throws InterruptedException, KeeperException {
try {
zooKeeper.addWatch(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event.toString());
switch (event.getType()) {
case NodeCreated:
System.err.println(event.getPath().replace(BRANCH_OFFICE_ZK_PATH,"") + " Connected");
break;
case NodeDeleted:
System.err.println(event.getPath().replace(BRANCH_OFFICE_ZK_PATH,"") + " Disconnected");
break;
}
}
}, AddWatchMode.PERSISTENT_RECURSIVE);
} catch (KeeperException.ConnectionLossException e) {
//ZK client can reconnect, so keep going.
}
}
private void disablePathMonitoring(String path) throws InterruptedException, KeeperException {
boolean isLocal = false;
try {
zooKeeper.removeWatches(path, this, WatcherType.Any, isLocal);
} catch (KeeperException.ConnectionLossException e) {
//ZK client can reconnect, so keep going.
}
}
@Override
public void process(WatchedEvent event) {
System.out.println("Default client watcher @ HeadQuarters : " + event.toString());
}
private void startZkSession() throws IOException {
int sessionTimeout = 15_000; //millis
zooKeeper = new ZooKeeper("127.0.0.1:2181",sessionTimeout,this);
}
protected void close() throws InterruptedException {
zooKeeper.close();
}
}
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.mindwaresrl</groupId>
<artifactId>zookeeper-client</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<zookeeper.version>3.6.1</zookeeper.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment