Skip to content

Instantly share code, notes, and snippets.

Created May 31, 2012 23:01
Show Gist options
  • Save bbeck/2847041 to your computer and use it in GitHub Desktop.
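Curator ephemeral-node test that fails intermittently: in the log below, iterations #1 and #2 pass and iteration #3 times out waiting for the node to be recreated.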
package curator;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * Stand-alone reproduction of an ephemeral-node scenario against a {@code TestingServer} created
 * in this JVM: create {@link #PATH}, kill the owning client's session, and check whether a
 * connection-state listener recreates the node within {@link #RECREATE_TIMEOUT_IN_SECONDS}.
 *
 * <p>NOTE(review): this text is a scrape of a gist. Closing braces, the Curator/Guava imports and
 * the {@code LOGGER.xxx(} prefixes of most log calls appear to have been stripped, so it does not
 * compile as shown.
 */
public class EphemeralNodeTest {
private static final Logger LOGGER = LoggerFactory.getLogger(EphemeralNodeTest.class);
// The znode that is created, deleted by the session kill, and expected to be recreated.
private static final String PATH = "/foo";
// Attempts made by createEphemeralNode() and deleteNode() before giving up.
private static final int NUM_ATTEMPTS = 3;
// Seconds to wait for the verification client to see the node after the first create.
private static final int CREATE_TIMEOUT_IN_SECONDS = 10;
// Seconds to wait for the node to disappear after the main session is killed.
private static final int DELETE_TIMEOUT_IN_SECONDS = 10;
// Seconds to wait for the reconnect listener to recreate the node; the failing iteration #3 in
// the captured log ran for exactly this long (10.0s) before giving up.
private static final int RECREATE_TIMEOUT_IN_SECONDS = 10; // If this is made to be 20, the test seems to always pass.
// ZooKeeper server that both clients talk to; created and closed per iteration.
private final TestingServer _testingServer;
// Client that owns the ephemeral node and whose session gets killed.
private final CuratorFramework _curator;
// Second client, used to observe PATH independently of _curator.
private final CuratorFramework _verificationCurator;
/**
 * Runs the scenario repeatedly, building a fresh {@code EphemeralNodeTest} (server plus two
 * clients) for every iteration, until an iteration throws.
 *
 * <p>{@code ++i > 0} only becomes false once {@code i} overflows, so in practice the loop ends
 * when the catch block rethrows.
 *
 * <p>NOTE(review): the logging statements here look stripped down to their argument lists
 * (e.g. {@code {"");"###");...}); the captured output shows they were {@code LOGGER.info} calls.
 */
public static void main(String[] args) throws Exception {
int i = 0;
while (++i > 0) {
long startTime = System.currentTimeMillis();
// Stays null if construction fails, letting the finally block skip cleanup.
EphemeralNodeTest test = null;
try {"");"###");"### STARTING ITERATION #{}", i);"###");"");
// NOTE(review): a statement seems to be missing between the two semicolons. The log shows
// run()'s "STARTING..." / "FINISHED." lines after construction, so it was presumably
// test.run() -- confirm against the original gist.
test = new EphemeralNodeTest();;
} catch (Exception e) {"Caught:", e);
throw e;
} finally {
// Tear everything down whether or not the iteration passed (iteration #3 in the log).
if (test != null) {"Calling test.cleanup()...");
test.cleanup();"Finished test.cleanup().");
// Wall-clock seconds for the whole iteration, measured after cleanup has run.
double duration = (System.currentTimeMillis() - startTime) / 1000.;"### FINISHED ITERATION #{} (duration: {}s)", i, duration);"###");"");"=", 100));
/**
 * Creates a {@code TestingServer} and two independent clients via {@code newCurator()}:
 * {@code _curator}, which owns the ephemeral node, and {@code _verificationCurator}, which
 * observes it. Each step is timed and logged through {@code log(String, Callable)}.
 */
public EphemeralNodeTest() throws Exception {
_testingServer = log("Creating TestingServer", new Callable<TestingServer>() {
public TestingServer call() throws Exception {
return new TestingServer();
// Both clients come from newCurator(), so they share identical builder settings.
_curator = log("Creating curator", new Callable<CuratorFramework>() {
public CuratorFramework call() throws Exception {
return newCurator();
_verificationCurator = log("Creating verification curator", new Callable<CuratorFramework>() {
public CuratorFramework call() throws Exception {
return newCurator();
/**
 * Shuts the fixture down in the reverse order of creation: verification client, main client, then
 * the {@code TestingServer}. Each step is timed through {@code log(String, Runnable)}.
 *
 * <p>NOTE(review): the bodies of the three {@code run()} methods are missing from this copy --
 * presumably a {@code close()} on each resource; confirm against the original gist.
 */
private void cleanup() throws Exception {
log("Closing verification curator", new Runnable() {
public void run() {
log("Closing curator", new Runnable() {
public void run() {
log("Closing TestingServer", new Runnable() {
public void run() {
/**
 * Executes one iteration of the scenario:
 * <ol>
 *   <li>register a connection-state listener on {@code _curator} that, on {@code RECONNECTED},
 *       calls {@code createEphemeralNode(PATH)} on a new thread;</li>
 *   <li>call {@code createEphemeralNode(PATH)} and wait up to CREATE_TIMEOUT_IN_SECONDS for the
 *       verification client to see the node;</li>
 *   <li>create a deletion {@code Trigger} for the verification client;</li>
 *   <li>kill the main client's session and wait up to DELETE_TIMEOUT_IN_SECONDS for that trigger;</li>
 *   <li>wait up to RECREATE_TIMEOUT_IN_SECONDS for the node to reappear.</li>
 * </ol>
 *
 * <p>NOTE(review): {@code success}, {@code visible}, {@code deleted} and {@code recreated} are
 * never checked in this copy. The captured stack trace, however, shows {@code assertTrue} reached
 * from {@code main} through one intermediate frame, so the assertTrue calls were presumably lost
 * in extraction.
 */
private void run() throws Exception {"STARTING...");
log("Registering connection listener", new Runnable() {
public void run() {
_curator.getConnectionStateListenable().addListener(new ConnectionStateListener() {
public void stateChanged(CuratorFramework client, ConnectionState newState) {
// Only RECONNECTED triggers recreation; other state changes are ignored here.
if (newState == ConnectionState.RECONNECTED) {
// Presumably done on a separate thread so the listener callback is not blocked while
// createEphemeralNode() retries.
new Thread() {
public void run() {
boolean success = createEphemeralNode(PATH);"Recreated node: {}", success);
try {
boolean success = log("Registering " + PATH, new Callable<Boolean>() {
public Boolean call() throws Exception {
return createEphemeralNode(PATH);
// Wait until the node is visible to the verification curator...
boolean visible = log("Waiting until " + PATH + " is visible to verification curator", new Callable<Boolean>() {
public Boolean call() throws Exception {
Trigger trigger = new Trigger();
// NOTE(review): this statement is cut off; presumably an exists check on PATH that registers
// `trigger` as its watcher -- confirm.
Stat stat = _verificationCurator
// True immediately if the node already exists; otherwise wait for the trigger to fire.
return (stat != null) || trigger.waitUntilFired(CREATE_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
final Trigger deletionTrigger = log("Registering deletion trigger with verification curator", new Callable<Trigger>() {
public Trigger call() throws Exception {
Trigger trigger = new Trigger();
// NOTE(review): the call that registers `trigger` as a watch on PATH appears to be missing.
return trigger;
// Kill the main curator session, thus cleaning up the node...
log("Killing main curator session", new Runnable() {
public void run() {
// NOTE(review): body appears stripped; presumably killSession(_curator).
boolean deleted = log("Waiting for " + PATH + " to be deleted", new Callable<Boolean>() {
public Boolean call() throws Exception {
return deletionTrigger.waitUntilFired(DELETE_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
// Now put a watch in the background looking to see if it gets created...
boolean recreated = log("Waiting for " + PATH + " to be recreated", new Callable<Boolean>() {
public Boolean call() throws Exception {
Trigger trigger = new Trigger();
// NOTE(review): cut off as above; presumably an exists check with `trigger` as the watcher.
Stat stat = _verificationCurator
// This wait ran its full 10.0s in the failing iteration #3 of the captured log.
return (stat != null) || trigger.waitUntilFired(RECREATE_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
} finally {"FINISHED.");
/**
 * Tries up to {@code NUM_ATTEMPTS} times to create {@code path}. Returns {@code true} on the
 * first success and {@code false} if every attempt fails.
 *
 * <p>If the node already exists, it is removed via {@code deleteNode(path)} and the loop moves on
 * to the next attempt. If that delete fails, the original {@code NodeExistsException} is
 * propagated as a {@code RuntimeException}. Any other exception is logged and the loop retries.
 *
 * <p>NOTE(review): the create call itself is missing between the {@code try} and
 * {@code return true;} in this copy -- presumably an ephemeral create of {@code path} on
 * {@code _curator}. Also, as the loop reads, a successful delete on the final attempt still falls
 * through to {@code return false}.
 */
private boolean createEphemeralNode(String path) {
for (int i = 0; i < NUM_ATTEMPTS; i++) {
try {
// NOTE(review): the create call appears to have been lost here.
return true;
} catch (KeeperException.NodeExistsException e) {"i:{}, KeeperException.NodeExistsException in register: {}", i, e.getMessage());
// Sometimes a server can restart faster than ZooKeeper can notice and clean up the ephemeral node. So
// when this happens we won't be able to create a new ephemeral node because one is already there. This
// is problematic because the existing ephemeral node isn't tied to our session with ZooKeeper and thus
// not tied to our lifetime. So in order to make sure that we end up creating a node tied to our
// lifetime we will delete the existing node and create a new one from our session.
if (!deleteNode(path)) {
// We weren't able to delete the node after trying multiple times. Propagate the original
// exception to our caller as a RuntimeException.
throw Throwables.propagate(e);
} catch (Exception e) {"i:{}, Ignored exception in register", i);"Exception", e);
return false;
/**
 * Tries up to {@code NUM_ATTEMPTS} times to delete {@code path}. Returns {@code true} on the
 * first success and {@code false} if every attempt throws; each failure is logged.
 *
 * <p>NOTE(review): the delete call is missing between the {@code try} and {@code return true;} in
 * this copy -- confirm which client it used.
 */
private boolean deleteNode(String path) {
for (int i = 0; i < NUM_ATTEMPTS; i++) {
try {
return true;
} catch (Exception e) {"i:{}, Ignored exception in deleteNode", i);"Exception:", e);
return false;
/**
 * Kills the given client's ZooKeeper session with Curator's {@code KillSession.kill}, pointed at
 * this test's {@code TestingServer}.
 *
 * @param curator client whose underlying ZooKeeper handle is passed to {@code KillSession.kill}
 * @throws RuntimeException any failure, rethrown unchecked via {@code Throwables.propagate}
 */
public void killSession(CuratorFramework curator) {
try {
KillSession.kill(curator.getZookeeperClient().getZooKeeper(), _testingServer.getConnectString());
} catch (Exception e) {
throw Throwables.propagate(e);
/**
 * Builds a Curator client for this test, presumably connected to {@code _testingServer}.
 *
 * <p>NOTE(review): the builder chain is cut off after {@code threadFactory(...)}. The connect
 * string, {@code build()} and any {@code start()} call are missing from this copy.
 */
private CuratorFramework newCurator() throws Exception {
CuratorFramework curator = CuratorFrameworkFactory.builder()
// Presumably zero retries with zero sleep between them -- confirm for the Curator version used.
.retryPolicy(new RetryNTimes(0, 0))
// Daemon threads, so the client's threads do not keep the JVM alive.
.threadFactory(new ThreadFactoryBuilder().setDaemon(true).build())
return curator;
/**
 * Runs {@code callable} with timing and logging. It logs "NAME: STARTING" first and
 * "NAME: FINISHED (duration: Ns)" in a finally block, so the FINISHED line appears even on
 * failure. Any exception is logged and then rethrown.
 *
 * @param name label used in the log lines
 * @param callable the work to run
 * @return presumably the callable's result -- the line that invokes {@code callable} is missing
 *     from this copy
 * @throws Exception whatever the work throws, after it has been logged
 */
private static <T> T log(String name, Callable<T> callable) throws Exception {
long startTime = System.currentTimeMillis();"{}: STARTING", name);
try {
} catch (Exception e) { + " Exception", e);
throw e;
} finally {
double duration = (System.currentTimeMillis() - startTime) / 1000.;"{}: FINISHED (duration: {}s)", name, duration);
/**
 * Adapts {@code runnable} to {@code log(String, Callable)} so it gets the same timing and
 * logging; the wrapping callable returns {@code null}.
 *
 * <p>NOTE(review): {@code call()} shows only a stray {@code ;}, so presumably
 * {@code runnable.run();} was lost in extraction.
 */
private static void log(String name, final Runnable runnable) throws Exception {
log(name, new Callable<Void>() {
public Void call() {;
return null;
/**
 * Fails the iteration by throwing {@code RuntimeException("FAILURE")} when {@code expression} is
 * false. This is the exception reported in the captured log for iteration #3.
 */
private void assertTrue(boolean expression) {
if (!expression) {
throw new RuntimeException("FAILURE");
private static final class Trigger implements Watcher {
private final CountDownLatch _latch = new CountDownLatch(1);
public void process(WatchedEvent event) {
public boolean waitUntilFired(long duration, TimeUnit unit) {
try {
return _latch.await(duration, unit);
} catch (InterruptedException e) {
throw Throwables.propagate(e);
[main] INFO curator.EphemeralNodeTest -
[main] INFO curator.EphemeralNodeTest - ###
[main] INFO curator.EphemeralNodeTest - ### STARTING ITERATION #1
[main] INFO curator.EphemeralNodeTest - ###
[main] INFO curator.EphemeralNodeTest -
[main] INFO curator.EphemeralNodeTest - Creating TestingServer: STARTING
[main] INFO curator.EphemeralNodeTest - Creating TestingServer: FINISHED (duration: 0.117s)
[main] INFO curator.EphemeralNodeTest - Creating curator: STARTING
[main] INFO curator.EphemeralNodeTest - Creating curator: FINISHED (duration: 0.038s)
[main] INFO curator.EphemeralNodeTest - Creating verification curator: STARTING
[main] INFO curator.EphemeralNodeTest - Creating verification curator: FINISHED (duration: 0.0040s)
[main] INFO curator.EphemeralNodeTest - STARTING...
[main] INFO curator.EphemeralNodeTest - Registering connection listener: STARTING
[main] INFO curator.EphemeralNodeTest - Registering connection listener: FINISHED (duration: 0.0040s)
[main] INFO curator.EphemeralNodeTest - Registering /foo: STARTING
[main] INFO curator.EphemeralNodeTest - Registering /foo: FINISHED (duration: 5.002s)
[main] INFO curator.EphemeralNodeTest - Waiting until /foo is visible to verification curator: STARTING
[main] INFO curator.EphemeralNodeTest - Waiting until /foo is visible to verification curator: FINISHED (duration: 0.0050s)
[main] INFO curator.EphemeralNodeTest - Registering deletion trigger with verification curator: STARTING
[main] INFO curator.EphemeralNodeTest - Registering deletion trigger with verification curator: FINISHED (duration: 0.0010s)
[main] INFO curator.EphemeralNodeTest - Killing main curator session: STARTING
[main] INFO curator.EphemeralNodeTest - Killing main curator session: FINISHED (duration: 5.111s)
[main] INFO curator.EphemeralNodeTest - Waiting for /foo to be deleted: STARTING
[main] INFO curator.EphemeralNodeTest - Waiting for /foo to be deleted: FINISHED (duration: 0.0s)
[main] INFO curator.EphemeralNodeTest - Waiting for /foo to be recreated: STARTING
[Thread-4] INFO curator.EphemeralNodeTest - Recreated node: true
[main] INFO curator.EphemeralNodeTest - Waiting for /foo to be recreated: FINISHED (duration: 0.815s)
[main] INFO curator.EphemeralNodeTest - FINISHED.
[main] INFO curator.EphemeralNodeTest - Calling test.cleanup()...
[main] INFO curator.EphemeralNodeTest - Closing verification curator: STARTING
[main] INFO curator.EphemeralNodeTest - Closing verification curator: FINISHED (duration: 0.0010s)
[main] INFO curator.EphemeralNodeTest - Closing curator: STARTING
[main] INFO curator.EphemeralNodeTest - Closing curator: FINISHED (duration: 0.0s)
[main] INFO curator.EphemeralNodeTest - Closing TestingServer: STARTING
[main] INFO curator.EphemeralNodeTest - Closing TestingServer: FINISHED (duration: 0.263s)
[main] INFO curator.EphemeralNodeTest - Finished test.cleanup().
[main] INFO curator.EphemeralNodeTest -
[main] INFO curator.EphemeralNodeTest - ###
[main] INFO curator.EphemeralNodeTest - ### FINISHED ITERATION #1 (duration: 11.375s)
[main] INFO curator.EphemeralNodeTest - ###
[main] INFO curator.EphemeralNodeTest -
[main] INFO curator.EphemeralNodeTest - ====================================================================================================
[main] INFO curator.EphemeralNodeTest -
[main] INFO curator.EphemeralNodeTest - ###
[main] INFO curator.EphemeralNodeTest - ### STARTING ITERATION #2
[main] INFO curator.EphemeralNodeTest - ###
[main] INFO curator.EphemeralNodeTest -
[main] INFO curator.EphemeralNodeTest - Creating TestingServer: STARTING
[main] INFO curator.EphemeralNodeTest - Creating TestingServer: FINISHED (duration: 0.0060s)
[main] INFO curator.EphemeralNodeTest - Creating curator: STARTING
[main] INFO curator.EphemeralNodeTest - Creating curator: FINISHED (duration: 0.0010s)
[main] INFO curator.EphemeralNodeTest - Creating verification curator: STARTING
[main] INFO curator.EphemeralNodeTest - Creating verification curator: FINISHED (duration: 0.0010s)
[main] INFO curator.EphemeralNodeTest - STARTING...
[main] INFO curator.EphemeralNodeTest - Registering connection listener: STARTING
[main] INFO curator.EphemeralNodeTest - Registering connection listener: FINISHED (duration: 0.0s)
[main] INFO curator.EphemeralNodeTest - Registering /foo: STARTING
[main] INFO curator.EphemeralNodeTest - Registering /foo: FINISHED (duration: 0.021s)
[main] INFO curator.EphemeralNodeTest - Waiting until /foo is visible to verification curator: STARTING
[main] INFO curator.EphemeralNodeTest - Waiting until /foo is visible to verification curator: FINISHED (duration: 0.0010s)
[main] INFO curator.EphemeralNodeTest - Registering deletion trigger with verification curator: STARTING
[main] INFO curator.EphemeralNodeTest - Registering deletion trigger with verification curator: FINISHED (duration: 0.0s)
[main] INFO curator.EphemeralNodeTest - Killing main curator session: STARTING
[main] INFO curator.EphemeralNodeTest - Killing main curator session: FINISHED (duration: 0.107s)
[main] INFO curator.EphemeralNodeTest - Waiting for /foo to be deleted: STARTING
[main] INFO curator.EphemeralNodeTest - Waiting for /foo to be deleted: FINISHED (duration: 0.0s)
[main] INFO curator.EphemeralNodeTest - Waiting for /foo to be recreated: STARTING
[main] INFO curator.EphemeralNodeTest - Waiting for /foo to be recreated: FINISHED (duration: 0.926s)
[main] INFO curator.EphemeralNodeTest - FINISHED.
[Thread-6] INFO curator.EphemeralNodeTest - Recreated node: true
[main] INFO curator.EphemeralNodeTest - Calling test.cleanup()...
[main] INFO curator.EphemeralNodeTest - Closing verification curator: STARTING
[main] INFO curator.EphemeralNodeTest - Closing verification curator: FINISHED (duration: 0.0010s)
[main] INFO curator.EphemeralNodeTest - Closing curator: STARTING
[main] INFO curator.EphemeralNodeTest - Closing curator: FINISHED (duration: 0.0020s)
[main] INFO curator.EphemeralNodeTest - Closing TestingServer: STARTING
[main] INFO curator.EphemeralNodeTest - Closing TestingServer: FINISHED (duration: 0.305s)
[main] INFO curator.EphemeralNodeTest - Finished test.cleanup().
[main] INFO curator.EphemeralNodeTest -
[main] INFO curator.EphemeralNodeTest - ###
[main] INFO curator.EphemeralNodeTest - ### FINISHED ITERATION #2 (duration: 1.376s)
[main] INFO curator.EphemeralNodeTest - ###
[main] INFO curator.EphemeralNodeTest -
[main] INFO curator.EphemeralNodeTest - ====================================================================================================
[main] INFO curator.EphemeralNodeTest -
[main] INFO curator.EphemeralNodeTest - ###
[main] INFO curator.EphemeralNodeTest - ### STARTING ITERATION #3
[main] INFO curator.EphemeralNodeTest - ###
[main] INFO curator.EphemeralNodeTest -
[main] INFO curator.EphemeralNodeTest - Creating TestingServer: STARTING
[main] INFO curator.EphemeralNodeTest - Creating TestingServer: FINISHED (duration: 0.0050s)
[main] INFO curator.EphemeralNodeTest - Creating curator: STARTING
[main] INFO curator.EphemeralNodeTest - Creating curator: FINISHED (duration: 0.0s)
[main] INFO curator.EphemeralNodeTest - Creating verification curator: STARTING
[main] INFO curator.EphemeralNodeTest - Creating verification curator: FINISHED (duration: 0.0020s)
[main] INFO curator.EphemeralNodeTest - STARTING...
[main] INFO curator.EphemeralNodeTest - Registering connection listener: STARTING
[main] INFO curator.EphemeralNodeTest - Registering connection listener: FINISHED (duration: 0.0s)
[main] INFO curator.EphemeralNodeTest - Registering /foo: STARTING
[main] INFO curator.EphemeralNodeTest - Registering /foo: FINISHED (duration: 0.011s)
[main] INFO curator.EphemeralNodeTest - Waiting until /foo is visible to verification curator: STARTING
[main] INFO curator.EphemeralNodeTest - Waiting until /foo is visible to verification curator: FINISHED (duration: 0.0010s)
[main] INFO curator.EphemeralNodeTest - Registering deletion trigger with verification curator: STARTING
[main] INFO curator.EphemeralNodeTest - Registering deletion trigger with verification curator: FINISHED (duration: 0.0s)
[main] INFO curator.EphemeralNodeTest - Killing main curator session: STARTING
[main] INFO curator.EphemeralNodeTest - Killing main curator session: FINISHED (duration: 0.106s)
[main] INFO curator.EphemeralNodeTest - Waiting for /foo to be deleted: STARTING
[main] INFO curator.EphemeralNodeTest - Waiting for /foo to be deleted: FINISHED (duration: 0.0s)
[main] INFO curator.EphemeralNodeTest - Waiting for /foo to be recreated: STARTING
[main] INFO curator.EphemeralNodeTest - Waiting for /foo to be recreated: FINISHED (duration: 10.0s)
[main] INFO curator.EphemeralNodeTest - FINISHED.
[main] INFO curator.EphemeralNodeTest - Caught:
java.lang.RuntimeException: FAILURE
at curator.EphemeralNodeTest.assertTrue( ~[classes/:na]
at ~[classes/:na]
at curator.EphemeralNodeTest.main( ~[classes/:na]
[main] INFO curator.EphemeralNodeTest - Calling test.cleanup()...
[main] INFO curator.EphemeralNodeTest - Closing verification curator: STARTING
[main] INFO curator.EphemeralNodeTest - Closing verification curator: FINISHED (duration: 0.0010s)
[main] INFO curator.EphemeralNodeTest - Closing curator: STARTING
[main] INFO curator.EphemeralNodeTest - Closing curator: FINISHED (duration: 1.171s)
[main] INFO curator.EphemeralNodeTest - Closing TestingServer: STARTING
[main] INFO curator.EphemeralNodeTest - Closing TestingServer: FINISHED (duration: 0.268s)
[main] INFO curator.EphemeralNodeTest - Finished test.cleanup().
[main] INFO curator.EphemeralNodeTest -
[main] INFO curator.EphemeralNodeTest - ###
[main] INFO curator.EphemeralNodeTest - ### FINISHED ITERATION #3 (duration: 11.575s)
[main] INFO curator.EphemeralNodeTest - ###
[main] INFO curator.EphemeralNodeTest -
[main] INFO curator.EphemeralNodeTest - ====================================================================================================
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment