Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
wrapinterruptible
package org.foo.wrapinterruptible;
import static org.foo.wrapinterruptible.InterruptibleInputStream.StreamOp.*;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
/**
* InputStream wrapper that mimics the behavior of a socket input stream. The
* read methods have a timeout. The {@link #close()} call returns instantly and
* cannot throw.
* <p>
* The actual read is performed in a dedicated thread. The thread lives until
* {@link #close()} is called and the current read operation completes.
* <p>
* The main purpose of the class is to wrap pipes. The thread MAY hang during
* read, if nothing is written to the other side of the pipe. Then again the
* thread is merely a dependency of the resource on the other side of the pipe.
* If that resource leaks, the thread will stay.
* <p>
* Data is not lost because of read timeouts and the last operation can be
* repeated. A side effect is that a single {@link #skip(long)} call cannot skip
* more bytes than we can save in the buffer.
* <p>
* Methods may throw the usual SocketInputStream exceptions plus
* {@link InterruptedIOException} in case of interrupted wait. In that case, the
* stream is not closed and the interrupted status of the thread is set.
*/
public class InterruptibleInputStream extends FilterInputStream {

    /** Nanoseconds per millisecond; converts the timeout and splits wait delays. */
    private static final long NANOS_PER_MILLI = 1000000L;

    /** Operations the worker thread can run against the wrapped stream. */
    enum StreamOp {
        READ, AVAIL
    }

    /** Per-operation timeout in nanoseconds; values &lt;= 0 mean "no timeout". */
    private long timeoutNanos;
    private boolean ignoreInterrupts = true;

    /** Monitor used for all wait/notify hand-offs between callers and the worker. */
    private final Object lock = new Object();
    /** Caller thread that currently owns the request slot, or null. */
    private Thread waiter;
    /** Lazily started thread that performs the blocking calls on {@code in}. */
    private Thread worker;
    /** True once the worker has finished {@link #currentOp}. */
    private boolean opCompleted;
    /** Operation handed to the worker; null when nothing is pending or buffered. */
    private StreamOp currentOp;
    private boolean closed;
    /** Intermediate buffer the worker reads into. */
    private byte[] currentBuf = new byte[1024];
    private int currentOfs, currentLen;
    /** Result of the last completed operation (byte count, available count or -1). */
    private int resInt;
    /** Throwable raised by the last operation, if any. */
    private Throwable resEx;
    // final value assigned to currentOp on return from requestOp()
    private StreamOp finalOpValue;

    /**
     * Wraps the given stream. No thread is started until the first operation
     * (or {@link #close()}) is requested.
     *
     * @param in the stream to wrap
     */
    public InterruptibleInputStream(InputStream in) {
        super(in);
    }

    /**
     * Sets the timeout applied to each read/skip/available call.
     *
     * @param millis timeout in milliseconds; zero or negative disables the
     *               timeout. Very large values saturate instead of overflowing.
     */
    public void setOpTimeout(long millis) {
        if (millis <= 0) {
            this.timeoutNanos = 0;
        } else if (millis > Long.MAX_VALUE / NANOS_PER_MILLI) {
            // millis * NANOS_PER_MILLI would overflow into an arbitrary value
            this.timeoutNanos = Long.MAX_VALUE;
        } else {
            this.timeoutNanos = millis * NANOS_PER_MILLI;
        }
    }

    /**
     * If false, wait operations will throw {@link InterruptedIOException}. If
     * true, to behave more like a socket, wait operations will retry after
     * {@link InterruptedException}. In the end, the interrupted status of the
     * thread will be restored. Default is true.
     */
    public void setIgnoreInterrupts(boolean b) {
        this.ignoreInterrupts = b;
    }

    /**
     * Reads a single byte.
     *
     * @return the byte as an unsigned value in 0..255, or -1 on end of stream
     * @throws IOException on timeout, closed stream or underlying failure
     */
    @Override
    public int read() throws IOException {
        byte[] buf = new byte[1];
        // Mask to 0..255: a raw (signed) byte 0xFF would otherwise look like EOF.
        return read(buf, 0, 1) == -1 ? -1 : (buf[0] & 0xFF);
    }

    @Override
    public int read(byte[] b) throws IOException {
        return read(b, 0, b.length);
    }

    /**
     * Reads up to {@code len} bytes into {@code b} starting at {@code off}.
     *
     * @return number of bytes copied, or -1 on end of stream
     * @throws NullPointerException      if {@code b} is null
     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not
     *                                   describe a range inside {@code b}
     * @throws IOException               on timeout, closed stream or underlying
     *                                   failure
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (b == null) {
            throw new NullPointerException();
        }
        // Validate before the worker consumes anything: a late failure in
        // System.arraycopy would drop bytes already taken from the stream.
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        }
        return checkAndRequestOp(StreamOp.READ, b, off, len);
    }

    /**
     * Skips by reading into the internal buffer, so a single call skips at
     * most the current buffer length.
     *
     * @return number of bytes skipped; 0 if {@code n <= 0} or on end of stream
     */
    @Override
    public long skip(long n) throws IOException {
        if (n <= 0) {
            return 0;
        }
        int skipN = (int) Math.min(currentBuf.length, n);
        int res = checkAndRequestOp(StreamOp.READ, null, 0, skipN);
        return res == -1 ? 0 : res;
    }

    @Override
    public int available() throws IOException {
        return checkAndRequestOp(StreamOp.AVAIL, null, 0, 0);
    }

    /**
     * Marks the stream closed and wakes all waiters. The wrapped stream is
     * closed by the worker thread once it notices the flag; if no worker was
     * ever started, one is started here solely so that the wrapped stream
     * still gets closed.
     */
    @Override
    public void close() throws IOException {
        synchronized (lock) {
            if (!closed) {
                // Only the worker calls in.close(); make sure it exists.
                ensureThread();
                closed = true;
                lock.notifyAll();
            }
        }
    }

    /** No-op: mark is not supported (see {@link #markSupported()}). */
    @Override
    public void mark(int readlimit) {
        //
    }

    /** No-op: mark is not supported (see {@link #markSupported()}). */
    @Override
    public void reset() throws IOException {
        //
    }

    @Override
    public boolean markSupported() {
        return false;
    }

    /**
     * Serialises callers: waits until no other live thread owns the request
     * slot, then runs {@link #requestOp}. Restores the interrupt flag on exit
     * if an interrupt was observed while waiting.
     */
    private int checkAndRequestOp(StreamOp op, byte[] buf, int off, int len)
            throws IOException {
        // Only the difference (deadline - nanoTime()) is ever used, so the
        // wrap-around of this sum for the "no timeout" case is harmless.
        long deadline = System.nanoTime()
                + (timeoutNanos > 0 ? timeoutNanos : Long.MAX_VALUE);
        synchronized (lock) {
            checkClosed();
            ensureThread();
            boolean[] interrupted = new boolean[1];
            try {
                while (waiter != null && waiter.isAlive()) {
                    waitOrThrow(deadline, interrupted);
                }
                waiter = Thread.currentThread();
                try {
                    return requestOp(op, buf, off, len, deadline, interrupted);
                } finally {
                    // requestOp decides what stays pending for the next caller
                    currentOp = finalOpValue;
                    waiter = null;
                    lock.notifyAll();
                }
            } finally {
                if (interrupted[0]) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    private void checkClosed() throws IOException {
        if (closed) {
            throw new SocketException("Stream closed");
        }
    }

    /**
     * Core state machine; must be called with {@link #lock} held. First tries
     * to satisfy the request from an operation left over by an earlier caller
     * (for example one that timed out), otherwise hands a new operation to the
     * worker and waits for its result.
     */
    private int requestOp(StreamOp op, byte[] buf, int off, int len,
            long deadline, boolean[] interrupted) throws IOException {
        finalOpValue = currentOp;
        if (currentOp != null) {
            if (op == StreamOp.AVAIL && currentOp == StreamOp.READ
                    && !opCompleted) {
                // available() should not block for long unless another thread
                // is waiting for a read op
                return 0;
            }
            // currentOp will not change during the following wait()
            while (!opCompleted) {
                waitOrThrow(deadline, interrupted);
            }
            // default is to reset finalOpValue unless there's
            // unread data in buffer which can be read
            finalOpValue = null;
            if (currentOp == StreamOp.READ) {
                switch (op) {
                case AVAIL:
                    if (resEx != null || resInt <= 0) {
                        // try again: maybe EOF no more
                        break;
                    }
                    // currentOp should stay READ
                    finalOpValue = currentOp;
                    return resInt;
                case READ:
                    if (resEx != null || resInt <= 0) {
                        // try again: maybe EOF no more
                        break;
                    }
                    int newLen = Math.min(resInt, len);
                    if (buf != null) {
                        System.arraycopy(currentBuf, currentOfs, buf, off,
                                newLen);
                    }
                    currentOfs += newLen;
                    resInt -= newLen;
                    if (resInt != 0) {
                        // something's left in buffer
                        // currentOp should stay READ
                        finalOpValue = currentOp;
                    }
                    return newLen;
                }
            }
        }
        resEx = null;
        // buf is null when called from skip()
        if (op == StreamOp.READ && currentBuf.length < len) {
            currentBuf = new byte[len];
        }
        currentOfs = 0;
        currentLen = len;
        // If the wait below throws, the op stays pending for the next caller.
        finalOpValue = op;
        currentOp = op;
        opCompleted = false;
        lock.notifyAll();
        do {
            waitOrThrow(deadline, interrupted);
        } while (!opCompleted);
        finalOpValue = null;
        if (resEx != null) {
            if (resEx instanceof IOException) {
                // IOException subclasses handled differently by caller.
                // Cannot wrap with a generic IOException.
                if (resEx.getCause() == null) {
                    // Can preserve original stacktrace in the cause.
                    Exception causeStackTrace = new Exception(resEx.getClass()
                            .getName() + ": " + resEx.getMessage());
                    causeStackTrace.setStackTrace(resEx.getStackTrace());
                    try {
                        resEx.initCause(causeStackTrace);
                    } catch (IllegalStateException ignored) {
                        // Cause was explicitly initialised (to null) by the
                        // thrower; the worker trace cannot be attached, but
                        // the original exception must still reach the caller.
                    }
                }
                // Re-thrown exception must have current thread's stacktrace.
                resEx.fillInStackTrace();
                throw (IOException) resEx;
            }
            throw new RuntimeException(resEx);
        }
        if (buf != null && resInt != -1) {
            System.arraycopy(currentBuf, 0, buf, off, resInt);
        }
        return resInt;
    }

    /** Starts the daemon worker thread on first use; no-op afterwards. */
    private void ensureThread() {
        if (worker != null) {
            return;
        }
        worker = new Thread() {
            @Override
            public void run() {
                try {
                    runOps();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        };
        worker.setName("reader-" + worker.getId());
        worker.setDaemon(true);
        worker.start();
    }

    /**
     * Worker main loop: runs requested operations until the stream is closed,
     * then marks the wrapper closed and closes the wrapped stream.
     */
    private void runOps() throws InterruptedException {
        // Stays true if the loop exits by exception, so that a failure in
        // in.close() does not replace the exception already propagating.
        boolean alreadyThrowing = true;
        try {
            while (waitOp()) {
                runOp();
            }
            alreadyThrowing = false;
        } finally {
            synchronized (lock) {
                closed = true;
                lock.notifyAll();
            }
            try {
                in.close();
            } catch (Exception e) {
                if (!alreadyThrowing) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /**
     * Blocks until an operation is pending.
     *
     * @return true if an operation should be run, false once closed
     */
    private boolean waitOp() throws InterruptedException {
        synchronized (lock) {
            while (!closed) {
                if (currentOp != null && !opCompleted) {
                    return true;
                }
                lock.wait();
            }
        }
        return false;
    }

    /**
     * Executes the pending operation on the wrapped stream without holding the
     * lock, stores its result or throwable, then signals completion.
     */
    private void runOp() {
        try {
            switch (currentOp) {
            case READ:
                resInt = in.read(currentBuf, 0, currentLen);
                break;
            case AVAIL:
                resInt = in.available();
                break;
            }
        } catch (Error e) {
            resEx = e;
            throw e;
        } catch (Throwable e) {
            resEx = e;
        } finally {
            synchronized (lock) {
                opCompleted = true;
                lock.notifyAll();
            }
        }
    }

    /**
     * Waits on {@link #lock} until notified or the deadline passes; must be
     * called with the lock held.
     *
     * @throws SocketTimeoutException  if the deadline has already passed
     * @throws InterruptedIOException  if interrupted and interrupts are not ignored
     * @throws SocketException         if the stream is closed after the wait
     */
    private void waitOrThrow(long deadline, boolean[] interrupted)
            throws IOException {
        long delay = deadline - System.nanoTime();
        if (delay <= 0) {
            throw new SocketTimeoutException("timeout");
        }
        try {
            lock.wait(delay / NANOS_PER_MILLI, (int) (delay % NANOS_PER_MILLI));
        } catch (InterruptedException e) {
            // Remember the interrupt; the outermost caller restores the flag.
            interrupted[0] = true;
            if (!ignoreInterrupts) {
                throw new InterruptedIOException("interrupted");
            }
        }
        checkClosed();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment