Created
March 20, 2012 14:05
-
-
Save nschlimm/2135968 to your computer and use it in GitHub Desktop.
GracefullChannelCloser
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Demonstrates a graceful shutdown of an {@link AsynchronousFileChannel} that is backed by a custom thread pool.
 * <p>
 * The shutdown runs in two steps: first the "prepare-shutdown" flag is raised, which makes the pool reject any
 * newly submitted task; then the closing thread waits until the pool's work queue has drained before it closes
 * the channel and shuts the pool down.
 * <p>
 * NOTE(review): the closing thread only waits for the work <em>queue</em> to become empty. Tasks that a worker
 * has already taken from the queue may still be running when the channel is closed - confirm whether that is
 * acceptable for the intended use.
 *
 * @author Niklas Schlimm
 */
public class SimpleChannelClose_Graceful {

    /** Target file of the demo writes. */
    private static final String FILE_NAME = "E:/temp/afile.out";

    /** Channel under test; assigned in {@link #main(String[])}, so it is {@code null} before that. */
    private static AsynchronousFileChannel outputfile;

    /** Running index of the next write; multiplied by 5 (the length of "Hello") to get the file position. */
    private static final AtomicInteger fileindex = new AtomicInteger(0);

    /** Pool handed to the channel: 100 threads and a bounded queue of 10000 tasks. */
    private static final ThreadPoolExecutor pool = new DefensiveThreadPoolExecutor(100, 100, 0L,
            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10000));

    /**
     * Guards the hand-over between the closing thread and the "last" task of the queue that issues the
     * "isEmpty" event.
     */
    private static final Lock closeLock = new ReentrantLock();

    /**
     * Condition used to coordinate the closing thread and the "last" task that issues the "isEmpty" event.
     */
    private static final Condition isEmpty = closeLock.newCondition();

    /**
     * Switches the channel's pool into the "prepare-shutdown" phase. Once {@code true}, task submission is
     * rejected and finishing tasks start signalling an empty queue.
     */
    private static volatile boolean prepareShutdown = false;

    /**
     * Opens the output file (WRITE, CREATE, DELETE_ON_CLOSE) on top of the custom pool, issues 10000 writes of
     * "Hello" at consecutive 5-byte offsets and lets the {@link GracefullChannelCloser} close everything at the
     * end of the try-with-resources block. Afterwards one more write is attempted and its result awaited.
     * <p>
     * NOTE(review): the final write happens after the channel was closed, presumably to show that late writes
     * are refused - confirm.
     *
     * @param args ignored
     * @throws InterruptedException if waiting for the final write is interrupted
     * @throws IOException if the file cannot be opened
     * @throws ExecutionException if the final write completes exceptionally
     */
    public static void main(String[] args) throws InterruptedException, IOException, ExecutionException {
        outputfile = AsynchronousFileChannel.open(
                Paths.get(FILE_NAME),
                new HashSet<StandardOpenOption>(Arrays.asList(StandardOpenOption.WRITE, StandardOpenOption.CREATE,
                        StandardOpenOption.DELETE_ON_CLOSE)), pool);
        try (GracefullChannelCloser closer = new GracefullChannelCloser()) {
            for (int i = 0; i < 10000; i++) {
                outputfile.write(ByteBuffer.wrap("Hello".getBytes()), fileindex.getAndIncrement() * 5);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        Future<Integer> future = outputfile.write(ByteBuffer.wrap("Hello".getBytes()), fileindex.getAndIncrement() * 5);
        future.get();
    }

    /**
     * {@link Closeable} that closes the asynchronous channel and its pool once the pool's work queue is empty.
     * The {@link #close()} call can be placed wherever the shutdown should happen.
     *
     * @author Niklas Schlimm
     */
    static class GracefullChannelCloser implements Closeable {

        /**
         * Raises the prepare-shutdown flag, waits until the work queue is empty and then releases the channel
         * and the pool. The resources are released even if the wait is interrupted.
         *
         * @throws IOException if reading the file size or closing the channel fails
         */
        @Override
        public void close() throws IOException {
            closeLock.lock();
            try {
                prepareShutdown = true;
                if (!pool.getQueue().isEmpty()) {
                    System.out.println("Waiting for signal that queue is empty ...");
                    // Re-check in a loop: Condition.await() may return spuriously.
                    while (!pool.getQueue().isEmpty()) {
                        isEmpty.await();
                    }
                    System.out.println("Received signal that queue is empty ... closing");
                }
            } catch (InterruptedException e) {
                // Restore the interrupt status so callers further up can still observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                closeLock.unlock();
                releaseResources();
            }
        }

        /**
         * Reports the file size, closes the channel and shuts the pool down. Each step runs even when the
         * previous one failed, so a failing size lookup can no longer leak the channel or the pool.
         */
        private void releaseResources() throws IOException {
            try {
                System.out.println("File size (bytes): "
                        + DecimalFormat.getInstance().format(Files.size(Paths.get(FILE_NAME))));
            } finally {
                try {
                    // The channel is only assigned in main(); nothing to close before that.
                    if (outputfile != null) {
                        outputfile.close();
                        System.out.println("File closed ...");
                    }
                } finally {
                    shutdownPool();
                }
            }
        }

        /** Shuts the pool down and waits up to 10 minutes for its termination. */
        private void shutdownPool() {
            pool.shutdown();
            try {
                if (pool.awaitTermination(10, TimeUnit.MINUTES)) {
                    System.out.println("Pool closed ...");
                } else {
                    System.err.println("Pool did not terminate within the timeout");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /**
     * Custom {@link ThreadPoolExecutor} that supports graceful closing of asynchronous I/O channels.
     */
    private static class DefensiveThreadPoolExecutor extends ThreadPoolExecutor {

        public DefensiveThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        /**
         * Issues a signal if the queue is empty after a task was processed and the prepare-shutdown phase is
         * active.
         */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            if (prepareShutdown && getQueue().isEmpty()) {
                closeLock.lock();
                try {
                    System.out.println("Issueing signal that queue is empty ...");
                    isEmpty.signal();
                } finally {
                    closeLock.unlock();
                }
            }
            super.afterExecute(r, t);
        }

        /**
         * Throws an {@link IllegalStateException} if clients try to submit tasks in the prepare-shutdown phase.
         */
        @Override
        public void execute(Runnable command) {
            if (prepareShutdown) {
                throw new IllegalStateException("Prepare-State - no tasks can be submitted!");
            }
            super.execute(command);
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment