Skip to content

Instantly share code, notes, and snippets.

@mikeb01
Created June 16, 2012 08:12
Show Gist options
  • Save mikeb01/2940469 to your computer and use it in GitHub Desktop.
Save mikeb01/2940469 to your computer and use it in GitHub Desktop.
package com.lmax.disruptor;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import com.lmax.disruptor.dsl.Disruptor;
public class DisruptorShutdownTest
{
private static final class TestEventHandler
implements EventHandler<Object>, LifecycleAware
{
private final CyclicBarrier barrier;
public TestEventHandler(CyclicBarrier barrier)
{
this.barrier = barrier;
}
@Override
public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception
{
// do something useful here
}
@Override
public void onShutdown()
{
System.out.println("Shutdown");
}
@Override
public void onStart()
{
try
{
barrier.await();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
catch (BrokenBarrierException e)
{
e.printStackTrace();
}
}
}
@SuppressWarnings("unchecked")
@Test
public void testExecutorShutdownWithDisruptor()
throws InterruptedException, BrokenBarrierException
{
ExecutorService executor = Executors.newCachedThreadPool();
Disruptor<Object> disruptor = new Disruptor<Object>(new EventFactory<Object>()
{
@Override
public Object newInstance()
{
return new Object();
}
}, 1024, executor);
CyclicBarrier barrier = new CyclicBarrier(2);
disruptor.handleEventsWith(new TestEventHandler(barrier));
RingBuffer<Object> ringBuffer = disruptor.start();
barrier.await();
// final long nextSlot = ringBuffer.next();
// final Object event = ringBuffer.get(nextSlot);
// // do something useful here
// ringBuffer.publish(nextSlot);
disruptor.shutdown();
Thread.sleep(5000);
shutdownAndAwaitTermination(executor);
}
void shutdownAndAwaitTermination(ExecutorService pool)
{
pool.shutdown(); // Disable new tasks from being submitted
try
{
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(3, TimeUnit.SECONDS))
{
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(3, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
}
catch (InterruptedException ie)
{
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment