Skip to content

Instantly share code, notes, and snippets.

@gfrison
Last active February 12, 2016 14:33
Show Gist options
  • Save gfrison/110c952eda481c088132 to your computer and use it in GitHub Desktop.
Save gfrison/110c952eda481c088132 to your computer and use it in GitHub Desktop.
Monitor file writing for a given InputStream, with NIO.2 AsynchronousFileChannel and RxJava
public static Observable<Long> write(Path target, InputStream inputStream) {
return Observable.create(subscriber -> {
try {
final AtomicLong offset = new AtomicLong(0);
AsynchronousFileChannel afc = AsynchronousFileChannel.open(Utils.home.resolve(target), StandardOpenOption.WRITE,
StandardOpenOption.CREATE);
afc.lock(new Object(), new CompletionHandler<FileLock, Object>() {
@Override
public void completed(final FileLock lock, final Object attachment) {
log.debug("write:{} acquired lock", target);
final byte[] buf = new byte[1 << 16];
try {
int len = inputStream.read(buf);
if (len == -1) {
unsubscribe(subscriber, inputStream, lock);
return;
}
afc.write(ByteBuffer.wrap(Arrays.copyOfRange(buf, 0, len)), offset.get(), null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
log.debug("write:{} chunk of {} bytes", target, result);
subscriber.onNext(offset.addAndGet(result));
final byte[] buf = new byte[1 << 16];
int len = 0;
try {
len = inputStream.read(buf);
if (len == -1) {
unsubscribe(subscriber, inputStream, lock);
return;
}
} catch (IOException e) {
subscriber.onError(e);
return;
}
if (len == -1) {
unsubscribe(subscriber, inputStream, lock);
return;
}
afc.write(ByteBuffer.wrap(Arrays.copyOfRange(buf, 0, len)), offset.get(), null, this);
}
@Override
public void failed(Throwable exc, Object attachment) {
subscriber.onError(exc);
}
});
} catch (final Exception e) {
subscriber.onError(e);
}
}
@Override
public void failed(final Throwable exc, final Object attachment) {
log.error("error on getting lock for:{}, error:{}", target, exc.getMessage());
subscriber.onError(exc);
}
});
} catch (Exception e) {
log.error("error on file:{}", target);
subscriber.onError(e);
}
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment