Skip to content

Instantly share code, notes, and snippets.

@yoshi0309
Created July 17, 2018 06:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yoshi0309/36a58d6c2130057d649f8197e93309ea to your computer and use it in GitHub Desktop.
Save yoshi0309/36a58d6c2130057d649f8197e93309ea to your computer and use it in GitHub Desktop.
CuratorFramework Experiment handling RuntimeException which throws in a Listener.
import java.util.concurrent.TimeUnit;
public class ExceptionHandler
implements Thread.UncaughtExceptionHandler {
private final long WAIT_TIME = 60L;
@Override public void uncaughtException(Thread thread, Throwable e) {
// -------------------------------------------------
// RuntimeException which was throwed in TestListener should handled here, but not.
// -------------------------------------------------
System.out.println("Exception has occurred.");
e.printStackTrace();
System.out.println("sleep thread in " + WAIT_TIME);
try {
TimeUnit.MILLISECONDS.sleep(WAIT_TIME);
} catch (InterruptedException e1) {
return;
}
}
}
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
public class ExceptionThreadFactory implements ThreadFactory {
private static final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
private final Thread.UncaughtExceptionHandler handler;
public ExceptionThreadFactory(
Thread.UncaughtExceptionHandler handler) {
this.handler = handler;
}
@Override public Thread newThread(Runnable run) {
Thread thread = defaultFactory.newThread(run);
thread.setUncaughtExceptionHandler(handler);
return thread;
}
}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
public class Main1 {
public static void main(String[] args) {
List<String> hosts = new ArrayList<String>();
hosts.add("my.test.zookeeper:2181");
Set<String> newSet = new HashSet<>(hosts);
newSet = Collections.unmodifiableSet(newSet);
final ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(500,3);
CuratorFramework curatorFramework = null;
try {
curatorFramework = createCuratorFramework(newSet, 3000, 3000, 3, 3000, backoffRetry);
} catch (InterruptedException e) {
e.printStackTrace();
}
TestListener listener = new TestListener();
String electionNode = "/somewhere/leader"; // for test
ThreadFactory factory = new ExceptionThreadFactory(new ExceptionHandler());
ExecutorService executorService = Executors.newFixedThreadPool(1, factory);
LeaderSelector selector = new LeaderSelector(curatorFramework, electionNode, executorService, listener);
// start
System.out.println("start selector.");
selector.autoRequeue();
selector.start();
}
private static CuratorFramework createCuratorFramework(
final Set<String> connectionSet,
final int sessionTimeoutMs,
final int connectionTimeoutMs,
final int connectionRetry,
final int maxCloseWaitMs,
final ExponentialBackoffRetry backoffRetry
) throws InterruptedException {
CuratorFramework curatorFramework = CuratorFrameworkFactory
.builder()
.connectString(String.join(",", connectionSet))
.connectionTimeoutMs(connectionTimeoutMs)
.sessionTimeoutMs(sessionTimeoutMs)
.maxCloseWaitMs(maxCloseWaitMs)
.retryPolicy(backoffRetry)
.defaultData(new byte[0])
.build();
curatorFramework.start();
long start = System.nanoTime();
boolean connection = false;
for (int i = 0; i < connectionRetry; i++) {
connection = curatorFramework.getZookeeperClient().blockUntilConnectedOrTimedOut();
if (connection) {
break;
}
}
return curatorFramework;
}
}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
public class TestListener implements LeaderSelectorListener {
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
System.out.println("start takeLeadership.");
// throw excetipn
throw new RuntimeException("some runtime exception has been occurred !");
//System.out.println("end takeLeadership.");
}
@Override
public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
System.out.println("Listener state changed: " + newState.toString());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment