Skip to content

Instantly share code, notes, and snippets.

@bbeck
Created May 31, 2012 23:01
Show Gist options
  • Save bbeck/2847041 to your computer and use it in GitHub Desktop.
Save bbeck/2847041 to your computer and use it in GitHub Desktop.
Curator ephemeral node test that fails...
package curator;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.framework.state.ConnectionState;
import com.netflix.curator.framework.state.ConnectionStateListener;
import com.netflix.curator.retry.RetryNTimes;
import com.netflix.curator.test.KillSession;
import com.netflix.curator.test.TestingServer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class EphemeralNodeTest {
private static final Logger LOGGER = LoggerFactory.getLogger(EphemeralNodeTest.class);
private static final String PATH = "/foo";
private static final int NUM_ATTEMPTS = 3;
private static final int CREATE_TIMEOUT_IN_SECONDS = 10;
private static final int DELETE_TIMEOUT_IN_SECONDS = 10;
private static final int RECREATE_TIMEOUT_IN_SECONDS = 10; // If this is made to be 20, the test seems to always pass.
private final TestingServer _testingServer;
private final CuratorFramework _curator;
private final CuratorFramework _verificationCurator;
public static void main(String[] args) throws Exception {
int i = 0;
while (++i > 0) {
long startTime = System.currentTimeMillis();
EphemeralNodeTest test = null;
try {
LOGGER.info("");
LOGGER.info("###");
LOGGER.info("### STARTING ITERATION #{}", i);
LOGGER.info("###");
LOGGER.info("");
test = new EphemeralNodeTest();
test.run();
} catch (Exception e) {
LOGGER.info("Caught:", e);
throw e;
} finally {
if (test != null) {
LOGGER.info("Calling test.cleanup()...");
test.cleanup();
LOGGER.info("Finished test.cleanup().");
}
LOGGER.info("");
LOGGER.info("###");
double duration = (System.currentTimeMillis() - startTime) / 1000.;
LOGGER.info("### FINISHED ITERATION #{} (duration: {}s)", i, duration);
LOGGER.info("###");
LOGGER.info("");
LOGGER.info(Strings.repeat("=", 100));
}
}
}
public EphemeralNodeTest() throws Exception {
_testingServer = log("Creating TestingServer", new Callable<TestingServer>() {
@Override
public TestingServer call() throws Exception {
return new TestingServer();
}
});
_curator = log("Creating curator", new Callable<CuratorFramework>() {
@Override
public CuratorFramework call() throws Exception {
return newCurator();
}
});
_verificationCurator = log("Creating verification curator", new Callable<CuratorFramework>() {
@Override
public CuratorFramework call() throws Exception {
return newCurator();
}
});
}
private void cleanup() throws Exception {
log("Closing verification curator", new Runnable() {
@Override
public void run() {
Closeables.closeQuietly(_verificationCurator);
}
});
log("Closing curator", new Runnable() {
@Override
public void run() {
Closeables.closeQuietly(_curator);
}
});
log("Closing TestingServer", new Runnable() {
@Override
public void run() {
Closeables.closeQuietly(_testingServer);
}
});
}
private void run() throws Exception {
LOGGER.info("STARTING...");
log("Registering connection listener", new Runnable() {
@Override
public void run() {
_curator.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if (newState == ConnectionState.RECONNECTED) {
new Thread() {
@Override
public void run() {
boolean success = createEphemeralNode(PATH);
LOGGER.info("Recreated node: {}", success);
}
}.start();
}
}
});
}
});
try {
boolean success = log("Registering " + PATH, new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return createEphemeralNode(PATH);
}
});
assertTrue(success);
// Wait until the node is visible to the verification curator...
boolean visible = log("Waiting until " + PATH + " is visible to verification curator", new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
Trigger trigger = new Trigger();
Stat stat = _verificationCurator
.checkExists()
.usingWatcher(trigger)
.forPath(PATH);
return (stat != null) || trigger.waitUntilFired(CREATE_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
}
});
assertTrue(visible);
final Trigger deletionTrigger = log("Registering deletion trigger with verification curator", new Callable<Trigger>() {
@Override
public Trigger call() throws Exception {
Trigger trigger = new Trigger();
_verificationCurator
.checkExists()
.usingWatcher(trigger)
.forPath(PATH);
return trigger;
}
});
// Kill the main curator session, thus cleaning up the node...
log("Killing main curator session", new Runnable() {
@Override
public void run() {
killSession(_curator);
}
});
boolean deleted = log("Waiting for " + PATH + " to be deleted", new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return deletionTrigger.waitUntilFired(DELETE_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
}
});
assertTrue(deleted);
// Now put a watch in the background looking to see if it gets created...
boolean recreated = log("Waiting for " + PATH + " to be recreated", new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
Trigger trigger = new Trigger();
Stat stat = _verificationCurator
.checkExists()
.usingWatcher(trigger)
.forPath(PATH);
return (stat != null) || trigger.waitUntilFired(RECREATE_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
}
});
assertTrue(recreated);
} finally {
LOGGER.info("FINISHED.");
}
}
private boolean createEphemeralNode(String path) {
for (int i = 0; i < NUM_ATTEMPTS; i++) {
try {
_curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(path);
return true;
} catch (KeeperException.NodeExistsException e) {
LOGGER.info("i:{}, KeeperException.NodeExistsException in register: {}", i, e.getMessage());
// Sometimes a server can restart faster than ZooKeeper can notice and clean up the ephemeral node. So
// when this happens we won't be able to create a new ephemeral node because one is already there. This
// is problematic because the existing ephemeral node isn't tied to our session with ZooKeeper and thus
// not tied to our lifetime. So in order to make sure that we end up creating a node tied to our
// lifetime we will delete the existing node and create a new one from our session.
if (!deleteNode(path)) {
// We weren't able to delete the node after trying multiple times. Propagate the original
// exception to our caller as a RuntimeException.
throw Throwables.propagate(e);
}
} catch (Exception e) {
LOGGER.info("i:{}, Ignored exception in register", i);
LOGGER.info("Exception", e);
}
}
return false;
}
private boolean deleteNode(String path) {
for (int i = 0; i < NUM_ATTEMPTS; i++) {
try {
_curator.delete().forPath(path);
return true;
} catch (Exception e) {
LOGGER.info("i:{}, Ignored exception in deleteNode", i);
LOGGER.info("Exception:", e);
}
}
return false;
}
public void killSession(CuratorFramework curator) {
try {
KillSession.kill(curator.getZookeeperClient().getZooKeeper(), _testingServer.getConnectString());
} catch (Exception e) {
throw Throwables.propagate(e);
}
}
private CuratorFramework newCurator() throws Exception {
CuratorFramework curator = CuratorFrameworkFactory.builder()
.connectString(_testingServer.getConnectString())
.retryPolicy(new RetryNTimes(0, 0))
.threadFactory(new ThreadFactoryBuilder().setDaemon(true).build())
.build();
curator.start();
return curator;
}
private static <T> T log(String name, Callable<T> callable) throws Exception {
long startTime = System.currentTimeMillis();
LOGGER.info("{}: STARTING", name);
try {
return callable.call();
} catch (Exception e) {
LOGGER.info(name + " Exception", e);
throw e;
} finally {
double duration = (System.currentTimeMillis() - startTime) / 1000.;
LOGGER.info("{}: FINISHED (duration: {}s)", name, duration);
}
}
private static void log(String name, final Runnable runnable) throws Exception {
log(name, new Callable<Void>() {
@Override
public Void call() {
runnable.run();
return null;
}
});
}
private void assertTrue(boolean expression) {
if (!expression) {
throw new RuntimeException("FAILURE");
}
}
private static final class Trigger implements Watcher {
private final CountDownLatch _latch = new CountDownLatch(1);
@Override
public void process(WatchedEvent event) {
_latch.countDown();
}
public boolean waitUntilFired(long duration, TimeUnit unit) {
try {
return _latch.await(duration, unit);
} catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
}
}
[main] INFO curator.EphemeralNodeTest -
[main] INFO curator.EphemeralNodeTest - ###
[main] INFO curator.EphemeralNodeTest - ### STARTING ITERATION #1
[main] INFO curator.EphemeralNodeTest - ###
[main] INFO curator.EphemeralNodeTest -
[main] INFO curator.EphemeralNodeTest - Creating TestingServer: STARTING
[main] INFO curator.EphemeralNodeTest - Creating TestingServer: FINISHED (duration: 0.117s)
[main] INFO curator.EphemeralNodeTest - Creating curator: STARTING
[main] INFO curator.EphemeralNodeTest - Creating curator: FINISHED (duration: 0.038s)
[main] INFO curator.EphemeralNodeTest - Creating verification curator: STARTING
[main] INFO curator.EphemeralNodeTest - Creating verification curator: FINISHED (duration: 0.0040s)
[main] INFO curator.EphemeralNodeTest - STARTING...
[main] INFO curator.EphemeralNodeTest - Registering connection listener: STARTING
[main] INFO curator.EphemeralNodeTest - Registering connection listener: FINISHED (duration: 0.0040s)
[main] INFO curator.EphemeralNodeTest - Registering /foo: STARTING
[main] INFO curator.EphemeralNodeTest - Registering /foo: FINISHED (duration: 5.002s)
[main] INFO curator.EphemeralNodeTest - Waiting until /foo is visible to verification curator: STARTING
[main] INFO curator.EphemeralNodeTest - Waiting until /foo is visible to verification curator: FINISHED (duration: 0.0050s)
[main] INFO curator.EphemeralNodeTest - Registering deletion trigger with verification curator: STARTING
[main] INFO curator.EphemeralNodeTest - Registering deletion trigger with verification curator: FINISHED (duration: 0.0010s)
[main] INFO curator.EphemeralNodeTest - Killing main curator session: STARTING
[main] INFO curator.EphemeralNodeTest - Killing main curator session: FINISHED (duration: 5.111s)
[main] INFO curator.EphemeralNodeTest - Waiting for /foo to be deleted: STARTING
[main] INFO curator.EphemeralNodeTest - Waiting for /foo to be deleted: FINISHED (duration: 0.0s)
[main] INFO curator.EphemeralNodeTest - Waiting for /foo to be recreated: STARTING
[Thread-4] INFO curator.EphemeralNodeTest - Recreated node: true
[main] INFO curator.EphemeralNodeTest - Waiting for /foo to be recreated: FINISHED (duration: 0.815s)
[main] INFO curator.EphemeralNodeTest - FINISHED.
[main] INFO curator.EphemeralNodeTest - Calling test.cleanup()...
[main] INFO curator.EphemeralNodeTest - Closing verification curator: STARTING
[main] INFO curator.EphemeralNodeTest - Closing verification curator: FINISHED (duration: 0.0010s)
[main] INFO curator.EphemeralNodeTest - Closing curator: STARTING
[main] INFO curator.EphemeralNodeTest - Closing curator: FINISHED (duration: 0.0s)
[main] INFO curator.EphemeralNodeTest - Closing TestingServer: STARTING
[main] INFO curator.EphemeralNodeTest - Closing TestingServer: FINISHED (duration: 0.263s)
[main] INFO curator.EphemeralNodeTest - Finished test.cleanup().
[main] INFO curator.EphemeralNodeTest -
[main] INFO curator.EphemeralNodeTest - ###
[main] INFO curator.EphemeralNodeTest - ### FINISHED ITERATION #1 (duration: 11.375s)
[main] INFO curator.EphemeralNodeTest - ###
[main] INFO curator.EphemeralNodeTest -
[main] INFO curator.EphemeralNodeTest - ====================================================================================================
[main] INFO curator.EphemeralNodeTest -
[main] INFO curator.EphemeralNodeTest - ###
[main] INFO curator.EphemeralNodeTest - ### STARTING ITERATION #2
[main] INFO curator.EphemeralNodeTest - ###
[main] INFO curator.EphemeralNodeTest -
[main] INFO curator.EphemeralNodeTest - Creating TestingServer: STARTING
[main] INFO curator.EphemeralNodeTest - Creating TestingServer: FINISHED (duration: 0.0060s)
[main] INFO curator.EphemeralNodeTest - Creating curator: STARTING
[main] INFO curator.EphemeralNodeTest - Creating curator: FINISHED (duration: 0.0010s)
[main] INFO curator.EphemeralNodeTest - Creating verification curator: STARTING
[main] INFO curator.EphemeralNodeTest - Creating verification curator: FINISHED (duration: 0.0010s)
[main] INFO curator.EphemeralNodeTest - STARTING...
[main] INFO curator.EphemeralNodeTest - Registering connection listener: STARTING
[main] INFO curator.EphemeralNodeTest - Registering connection listener: FINISHED (duration: 0.0s)
[main] INFO curator.EphemeralNodeTest - Registering /foo: STARTING
[main] INFO curator.EphemeralNodeTest - Registering /foo: FINISHED (duration: 0.021s)
[main] INFO curator.EphemeralNodeTest - Waiting until /foo is visible to verification curator: STARTING
[main] INFO curator.EphemeralNodeTest - Waiting until /foo is visible to verification curator: FINISHED (duration: 0.0010s)
[main] INFO curator.EphemeralNodeTest - Registering deletion trigger with verification curator: STARTING
[main] INFO curator.EphemeralNodeTest - Registering deletion trigger with verification curator: FINISHED (duration: 0.0s)
[main] INFO curator.EphemeralNodeTest - Killing main curator session: STARTING
[main] INFO curator.EphemeralNodeTest - Killing main curator session: FINISHED (duration: 0.107s)
[main] INFO curator.EphemeralNodeTest - Waiting for /foo to be deleted: STARTING
[main] INFO curator.EphemeralNodeTest - Waiting for /foo to be deleted: FINISHED (duration: 0.0s)
[main] INFO curator.EphemeralNodeTest - Waiting for /foo to be recreated: STARTING
[main] INFO curator.EphemeralNodeTest - Waiting for /foo to be recreated: FINISHED (duration: 0.926s)
[main] INFO curator.EphemeralNodeTest - FINISHED.
[Thread-6] INFO curator.EphemeralNodeTest - Recreated node: true
[main] INFO curator.EphemeralNodeTest - Calling test.cleanup()...
[main] INFO curator.EphemeralNodeTest - Closing verification curator: STARTING
[main] INFO curator.EphemeralNodeTest - Closing verification curator: FINISHED (duration: 0.0010s)
[main] INFO curator.EphemeralNodeTest - Closing curator: STARTING
[main] INFO curator.EphemeralNodeTest - Closing curator: FINISHED (duration: 0.0020s)
[main] INFO curator.EphemeralNodeTest - Closing TestingServer: STARTING
[main] INFO curator.EphemeralNodeTest - Closing TestingServer: FINISHED (duration: 0.305s)
[main] INFO curator.EphemeralNodeTest - Finished test.cleanup().
[main] INFO curator.EphemeralNodeTest -
[main] INFO curator.EphemeralNodeTest - ###
[main] INFO curator.EphemeralNodeTest - ### FINISHED ITERATION #2 (duration: 1.376s)
[main] INFO curator.EphemeralNodeTest - ###
[main] INFO curator.EphemeralNodeTest -
[main] INFO curator.EphemeralNodeTest - ====================================================================================================
[main] INFO curator.EphemeralNodeTest -
[main] INFO curator.EphemeralNodeTest - ###
[main] INFO curator.EphemeralNodeTest - ### STARTING ITERATION #3
[main] INFO curator.EphemeralNodeTest - ###
[main] INFO curator.EphemeralNodeTest -
[main] INFO curator.EphemeralNodeTest - Creating TestingServer: STARTING
[main] INFO curator.EphemeralNodeTest - Creating TestingServer: FINISHED (duration: 0.0050s)
[main] INFO curator.EphemeralNodeTest - Creating curator: STARTING
[main] INFO curator.EphemeralNodeTest - Creating curator: FINISHED (duration: 0.0s)
[main] INFO curator.EphemeralNodeTest - Creating verification curator: STARTING
[main] INFO curator.EphemeralNodeTest - Creating verification curator: FINISHED (duration: 0.0020s)
[main] INFO curator.EphemeralNodeTest - STARTING...
[main] INFO curator.EphemeralNodeTest - Registering connection listener: STARTING
[main] INFO curator.EphemeralNodeTest - Registering connection listener: FINISHED (duration: 0.0s)
[main] INFO curator.EphemeralNodeTest - Registering /foo: STARTING
[main] INFO curator.EphemeralNodeTest - Registering /foo: FINISHED (duration: 0.011s)
[main] INFO curator.EphemeralNodeTest - Waiting until /foo is visible to verification curator: STARTING
[main] INFO curator.EphemeralNodeTest - Waiting until /foo is visible to verification curator: FINISHED (duration: 0.0010s)
[main] INFO curator.EphemeralNodeTest - Registering deletion trigger with verification curator: STARTING
[main] INFO curator.EphemeralNodeTest - Registering deletion trigger with verification curator: FINISHED (duration: 0.0s)
[main] INFO curator.EphemeralNodeTest - Killing main curator session: STARTING
[main] INFO curator.EphemeralNodeTest - Killing main curator session: FINISHED (duration: 0.106s)
[main] INFO curator.EphemeralNodeTest - Waiting for /foo to be deleted: STARTING
[main] INFO curator.EphemeralNodeTest - Waiting for /foo to be deleted: FINISHED (duration: 0.0s)
[main] INFO curator.EphemeralNodeTest - Waiting for /foo to be recreated: STARTING
[main] INFO curator.EphemeralNodeTest - Waiting for /foo to be recreated: FINISHED (duration: 10.0s)
[main] INFO curator.EphemeralNodeTest - FINISHED.
[main] INFO curator.EphemeralNodeTest - Caught:
java.lang.RuntimeException: FAILURE
at curator.EphemeralNodeTest.assertTrue(EphemeralNodeTest.java:299) ~[classes/:na]
at curator.EphemeralNodeTest.run(EphemeralNodeTest.java:206) ~[classes/:na]
at curator.EphemeralNodeTest.main(EphemeralNodeTest.java:52) ~[classes/:na]
[main] INFO curator.EphemeralNodeTest - Calling test.cleanup()...
[main] INFO curator.EphemeralNodeTest - Closing verification curator: STARTING
[main] INFO curator.EphemeralNodeTest - Closing verification curator: FINISHED (duration: 0.0010s)
[main] INFO curator.EphemeralNodeTest - Closing curator: STARTING
[main] INFO curator.EphemeralNodeTest - Closing curator: FINISHED (duration: 1.171s)
[main] INFO curator.EphemeralNodeTest - Closing TestingServer: STARTING
[main] INFO curator.EphemeralNodeTest - Closing TestingServer: FINISHED (duration: 0.268s)
[main] INFO curator.EphemeralNodeTest - Finished test.cleanup().
[main] INFO curator.EphemeralNodeTest -
[main] INFO curator.EphemeralNodeTest - ###
[main] INFO curator.EphemeralNodeTest - ### FINISHED ITERATION #3 (duration: 11.575s)
[main] INFO curator.EphemeralNodeTest - ###
[main] INFO curator.EphemeralNodeTest -
[main] INFO curator.EphemeralNodeTest - ====================================================================================================
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment