Skip to content

Instantly share code, notes, and snippets.

Created April 4, 2012 14:18
Show Gist options
  • Save nschlimm/2301471 to your computer and use it in GitHub Desktop.
Save nschlimm/2301471 to your computer and use it in GitHub Desktop.
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.FileLock;
import java.nio.channels.NonWritableChannelException;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Custom file channel that supports the following requirements:<br>
 * - if a write task was submitted successfully, the channel is kept open until the work queue of
 * its thread pool has been drained<br>
 * - if the channel is closing then throw {@link NonWritableChannelException} to notify the client
 * that this channel isn't writable anymore
 * <p>
 * NOTE: {@link #close()} waits for <em>queued</em> tasks only. A task that a worker thread has
 * already taken from the queue is not tracked separately by this class.
 *
 * @author Niklas Schlimm
 */
public class GracefulAsynchronousFileChannel extends AsynchronousFileChannel {

    /**
     * Singleton map for convenient instantiation, keyed by the URI string passed to
     * {@link #get(String)}.
     */
    private static final Map<String, GracefulAsynchronousFileChannel> singletonMap = new ConcurrentHashMap<>();

    /** Serializes channel creation in {@link #get(String)}. */
    private static final Lock singletonLock = new ReentrantLock();

    /**
     * Lifecycle of graceful shutdown.
     */
    public static final int RUNNING = 0;
    public static final int PREPARE = 1;
    public static final int SHUTDOWN = 2;

    /**
     * Current lifecycle phase. {@link #close()} moves it to {@link #PREPARE} ("prepare-shutdown")
     * and finally to {@link #SHUTDOWN}.
     */
    private volatile int state = RUNNING;

    /**
     * Avoid race condition of closing thread and "last" task of the queue that issues the "isEmpty" event.
     */
    private final Lock closeLock = new ReentrantLock();

    /**
     * Condition to coordinate closing thread and "last" task that issues "isEmpty" event.
     */
    private final Condition isEmpty = closeLock.newCondition();

    /**
     * Channel that the {@link GracefulAsynchronousFileChannel} works on. Replaced by a read-only
     * channel while closing.
     */
    private volatile AsynchronousFileChannel innerChannel;

    /**
     * The thread pool for asynchronous task execution.
     */
    private final DefensiveThreadPoolExecutor pool;

    /**
     * The URI to the file that this channel works on.
     */
    private final URI uri;

    /**
     * Constructor for {@link GracefulAsynchronousFileChannel}.
     *
     * @param poolSize
     *            count of worker threads that process asynchronous tasks
     * @param workQueue
     *            queue that holds asynchronous tasks
     * @param fileUri
     *            uri to the file, e.g. "file:/E:/temp/afile.out"
     * @param options
     *            {@link StandardOpenOption} for the file channel
     * @throws IllegalStateException
     *             if the file cannot be opened; the {@link IOException} is attached as cause
     */
    public GracefulAsynchronousFileChannel(int poolSize, LinkedBlockingQueue<Runnable> workQueue, URI fileUri,
            Set<StandardOpenOption> options) {
        this.pool = new DefensiveThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, workQueue,
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
        this.uri = fileUri;
        try {
            this.innerChannel = AsynchronousFileChannel.open(Paths.get(fileUri), options, pool);
        } catch (IOException e) {
            // Do not leave a half-initialised instance with a live executor behind.
            pool.shutdown();
            throw new IllegalStateException("Could not open file channel for " + fileUri, e);
        }
    }

    /**
     * The factory method for creating {@link GracefulAsynchronousFileChannel}. At most one channel
     * is created per URI string; the channel is opened with CREATE, READ and WRITE and a pool of
     * 100 worker threads. A channel stays registered after it has been closed.
     *
     * @param fileUri
     *            the {@link URI} to the file, in string form
     * @return the created (or previously created) file channel
     * @throws IllegalArgumentException
     *             if {@code fileUri} is not a syntactically valid URI
     */
    public static GracefulAsynchronousFileChannel get(String fileUri) {
        GracefulAsynchronousFileChannel channel = singletonMap.get(fileUri);
        if (channel == null) {
            singletonLock.lock();
            try {
                // Re-check under the lock: another thread may have created the channel meanwhile.
                channel = singletonMap.get(fileUri);
                if (channel == null) {
                    channel = new GracefulAsynchronousFileChannel(100, new LinkedBlockingQueue<Runnable>(),
                            new URI(fileUri), new HashSet<StandardOpenOption>(Arrays.asList(
                                    StandardOpenOption.CREATE, StandardOpenOption.READ,
                                    StandardOpenOption.WRITE)));
                    singletonMap.put(fileUri, channel);
                }
            } catch (URISyntaxException e) {
                throw new IllegalArgumentException("Not a valid file URI: " + fileUri, e);
            } finally {
                singletonLock.unlock();
            }
        }
        return channel;
    }

    /**
     * Method that closes this file channel gracefully without losing any queued data.
     * <p>
     * The channel is first switched to a read-only inner channel so that no new writes are
     * accepted, then the caller waits until the work queue is empty, and finally both channels
     * and the thread pool are shut down. Calling this method again after the first call has no
     * effect.
     *
     * @throws IOException
     *             if closing one of the underlying channels fails
     * @throws RuntimeException
     *             if the calling thread is interrupted while waiting, or if preparing the
     *             shutdown fails for any other reason
     */
    @Override
    public void close() throws IOException {
        closeLock.lock();
        try {
            if (state != RUNNING) {
                return; // already closing or closed
            }
            state = PREPARE;
        } finally {
            closeLock.unlock();
        }

        AsynchronousFileChannel writeableChannel = innerChannel;
        AsynchronousFileChannel readOnlyChannel = null;
        System.out.println("Starting graceful shutdown ...");
        try {
            readOnlyChannel = AsynchronousFileChannel.open(Paths.get(uri),
                    new HashSet<StandardOpenOption>(Arrays.asList(StandardOpenOption.READ)), pool);
            innerChannel = readOnlyChannel;
            System.out.println("Channel blocked for write access ...");
            awaitEmptyQueue();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            throw new RuntimeException("Interrupted on awaiting Empty-Signal!", e);
        } catch (Exception e) {
            throw new RuntimeException("Unexpected error", e);
        } finally {
            state = SHUTDOWN;
            try {
                try {
                    writeableChannel.close(); // close the writable channel
                } finally {
                    if (readOnlyChannel != null) {
                        readOnlyChannel.close(); // close the read-only channel
                    }
                }
                System.out.println("File closed ...");
            } finally {
                shutdownPool();
            }
        }
    }

    /**
     * Blocks the closing thread until the pool's work queue is empty. The check and the wait
     * happen under {@link #closeLock}, so a signal issued by the "last" task cannot be missed.
     */
    private void awaitEmptyQueue() throws InterruptedException {
        closeLock.lock();
        try {
            if (pool.getQueue().isEmpty()) {
                System.out.println("Don't have to wait, queue is empty ...");
                return;
            }
            System.out.println("Waiting for signal that queue is empty ...");
            while (!pool.getQueue().isEmpty()) {
                isEmpty.await();
            }
            System.out.println("Received signal that queue is empty ... closing");
        } finally {
            closeLock.unlock();
        }
    }

    /** Shuts the thread pool down and waits up to one minute for its workers to finish. */
    private void shutdownPool() {
        pool.shutdown(); // allow clean up tasks from previous close() operation to finish safely
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Could not terminate thread pool!", e);
        }
        System.out.println("Pool closed ...");
    }

    /** Rejects write access as soon as the graceful shutdown has started. */
    private void ensureWritable() {
        if (state != RUNNING) {
            throw new NonWritableChannelException();
        }
    }

    /** Delegates to the inner channel. */
    @Override
    public long size() throws IOException {
        return innerChannel.size();
    }

    /** Delegates to the inner channel and returns this channel. */
    @Override
    public AsynchronousFileChannel truncate(long size) throws IOException {
        innerChannel.truncate(size);
        return this;
    }

    /** Delegates to the inner channel. */
    @Override
    public void force(boolean metaData) throws IOException {
        innerChannel.force(metaData);
    }

    /** Delegates to the inner channel. */
    @Override
    public <A> void lock(long position, long size, boolean shared, A attachment,
            CompletionHandler<FileLock, ? super A> handler) {
        innerChannel.lock(position, size, shared, attachment, handler);
    }

    /** Delegates to the inner channel. */
    @Override
    public Future<FileLock> lock(long position, long size, boolean shared) {
        return innerChannel.lock(position, size, shared);
    }

    /** Delegates to the inner channel. */
    @Override
    public FileLock tryLock(long position, long size, boolean shared) throws IOException {
        return innerChannel.tryLock(position, size, shared);
    }

    /** Delegates to the inner channel. */
    @Override
    public <A> void read(ByteBuffer dst, long position, A attachment,
            CompletionHandler<Integer, ? super A> handler) {
        innerChannel.read(dst, position, attachment, handler);
    }

    /** Delegates to the inner channel. */
    @Override
    public Future<Integer> read(ByteBuffer dst, long position) {
        return innerChannel.read(dst, position);
    }

    /**
     * Delegates to the inner channel while this channel is {@link #RUNNING}.
     *
     * @throws NonWritableChannelException
     *             if the channel is closing or closed
     */
    @Override
    public <A> void write(ByteBuffer src, long position, A attachment,
            CompletionHandler<Integer, ? super A> handler) {
        ensureWritable();
        innerChannel.write(src, position, attachment, handler);
    }

    /**
     * Delegates to the inner channel while this channel is {@link #RUNNING}.
     *
     * @throws NonWritableChannelException
     *             if the channel is closing or closed
     */
    @Override
    public Future<Integer> write(ByteBuffer src, long position) {
        ensureWritable();
        return innerChannel.write(src, position);
    }

    /** Reports whether the current inner channel is open. */
    @Override
    public boolean isOpen() {
        return innerChannel.isOpen();
    }

    /**
     * Custom {@link ThreadPoolExecutor} that supports graceful closing of asynchronous I/O channels.
     */
    private class DefensiveThreadPoolExecutor extends ThreadPoolExecutor {

        public DefensiveThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                LinkedBlockingQueue<Runnable> workQueue, ThreadFactory factory, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, factory, handler);
        }

        /**
         * "Last" task issues a signal that queue is empty after task processing was completed.
         * The closing thread re-checks the queue itself, so signalling more than once is harmless.
         */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            try {
                if (state == PREPARE) {
                    closeLock.lock(); // serializes with the closer thread's check-and-wait
                    try {
                        if (getQueue().isEmpty()) {
                            System.out.println("Issueing signal that queue is empty ...");
                            isEmpty.signalAll();
                        }
                    } finally {
                        closeLock.unlock();
                    }
                }
            } finally {
                super.afterExecute(r, t);
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment