Skip to content

Instantly share code, notes, and snippets.

@Randgalt
Created August 13, 2012 22:00
Show Gist options
  • Save Randgalt/3344385 to your computer and use it in GitHub Desktop.
Save Randgalt/3344385 to your computer and use it in GitHub Desktop.
/**
 * Exercises a single-lease {@link InterProcessSemaphore} shared by {@code CLIENT_QTY}
 * clients while the backing {@link TestingCluster} is closed and replaced by a new one.
 * <p>
 * Every client obtains its connection string through a shared {@link EnsembleProvider}
 * backed by an {@link AtomicReference}, so the test can redirect the clients to the
 * replacement cluster simply by updating that reference.
 * <p>
 * Sequence checked:
 * <ol>
 *   <li>exactly one client holds the lease while the first cluster is up;</li>
 *   <li>after the cluster is closed, no client still counts itself as a holder;</li>
 *   <li>after a new cluster is started and published, exactly one client holds the lease again.</li>
 * </ol>
 *
 * @throws Exception if cluster or client setup fails, or the test thread is interrupted
 */
public void testKilledServerWithEnsembleProvider() throws Exception
{
    final int CLIENT_QTY = 10;
    final Timing timing = new Timing();
    final String PATH = "/foo/bar/lock";

    ExecutorService executorService = Executors.newFixedThreadPool(CLIENT_QTY);
    ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<Void>(executorService);
    TestingCluster cluster = new TestingCluster(3);
    try
    {
        cluster.start();

        // Mutable holder so the connection string can be swapped after the first cluster is closed.
        final AtomicReference<String> connectionString = new AtomicReference<String>(cluster.getConnectString());
        final EnsembleProvider provider = new EnsembleProvider()
        {
            @Override
            public void start() throws Exception
            {
                // nothing to start - the connection string is supplied by the test
            }

            @Override
            public String getConnectionString()
            {
                return connectionString.get();
            }

            @Override
            public void close() throws IOException
            {
                // nothing to release
            }
        };

        // Released once each time any client obtains the lease.
        final Semaphore acquiredSemaphore = new Semaphore(0);
        // Number of clients that currently believe they hold the lease.
        final AtomicInteger acquireCount = new AtomicInteger(0);
        // Counted down on every SUSPENDED/LOST notification seen by any client.
        final CountDownLatch suspendedLatch = new CountDownLatch(CLIENT_QTY);

        for ( int i = 0; i < CLIENT_QTY; ++i )
        {
            completionService.submit
            (
                new Callable<Void>()
                {
                    @Override
                    public Void call() throws Exception
                    {
                        CuratorFramework client = CuratorFrameworkFactory.builder()
                            .ensembleProvider(provider)
                            .sessionTimeoutMs(timing.session())
                            .connectionTimeoutMs(timing.connection())
                            .retryPolicy(new ExponentialBackoffRetry(100, 3))
                            .build();
                        try
                        {
                            // Per-client signal: a permit is added whenever this client's connection
                            // is reported SUSPENDED or LOST, telling the holder to give up its lease.
                            final Semaphore suspendedSemaphore = new Semaphore(0);
                            client.getConnectionStateListenable().addListener
                            (
                                new ConnectionStateListener()
                                {
                                    @Override
                                    public void stateChanged(CuratorFramework client, ConnectionState newState)
                                    {
                                        if ( (newState == ConnectionState.SUSPENDED) || (newState == ConnectionState.LOST) )
                                        {
                                            suspendedLatch.countDown();
                                            suspendedSemaphore.release();
                                        }
                                    }
                                }
                            );

                            client.start();

                            InterProcessSemaphore semaphore = new InterProcessSemaphore(client, PATH, 1);

                            while ( !Thread.currentThread().isInterrupted() )
                            {
                                Lease lease = null;
                                try
                                {
                                    lease = semaphore.acquire();
                                    // Bump the counter BEFORE waking the test thread; otherwise the test
                                    // thread can read acquireCount while it is still 0.
                                    acquireCount.incrementAndGet();
                                    acquiredSemaphore.release();
                                    // Hold the lease until this client's connection is suspended or lost.
                                    suspendedSemaphore.acquire();
                                }
                                catch ( InterruptedException e )
                                {
                                    // Restore the flag (cleared when the exception was thrown) so the
                                    // loop condition sees it and the worker terminates.
                                    Thread.currentThread().interrupt();
                                }
                                catch ( Exception e )
                                {
                                    // just retry
                                }
                                finally
                                {
                                    if ( lease != null )
                                    {
                                        acquireCount.decrementAndGet();
                                        Closeables.closeQuietly(lease);
                                    }
                                }
                            }
                        }
                        finally
                        {
                            Closeables.closeQuietly(client);
                        }
                        return null;
                    }
                }
            );
        }

        // Phase 1: with the first cluster up, exactly one client holds the single lease.
        Assert.assertTrue(timing.acquireSemaphore(acquiredSemaphore));
        Assert.assertEquals(1, acquireCount.get());

        // Phase 2: close the cluster; once the clients have been notified, nobody holds the lease.
        cluster.close();
        timing.awaitLatch(suspendedLatch);
        timing.forWaiting().sleepABit();
        Assert.assertEquals(0, acquireCount.get());

        // Phase 3: start a new cluster and publish its connection string through the provider.
        cluster = new TestingCluster(3);
        cluster.start();

        connectionString.set(cluster.getConnectString());
        timing.forWaiting().sleepABit();

        Assert.assertTrue(timing.acquireSemaphore(acquiredSemaphore));
        timing.forWaiting().sleepABit();
        Assert.assertEquals(1, acquireCount.get());
    }
    finally
    {
        // The workers loop until interrupted, so a plain shutdown() would never stop them;
        // shutdownNow() interrupts them so they can exit and close their clients.
        executorService.shutdownNow();
        try
        {
            executorService.awaitTermination(10, TimeUnit.SECONDS);
        }
        finally
        {
            // Close the cluster even if the wait above is itself interrupted.
            Closeables.closeQuietly(cluster);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment