wrapinterruptible
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.foo.wrapinterruptible; | |
import static org.foo.wrapinterruptible.InterruptibleInputStream.StreamOp.*; | |
import java.io.FilterInputStream; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.io.InterruptedIOException; | |
import java.net.SocketException; | |
import java.net.SocketTimeoutException; | |
/** | |
* InputStream wrapper that mimics the behavior of a socket input stream. The | |
* read methods have a timeout. The {@link #close()} call returns instantly and | |
* cannot throw. | |
* <p> | |
* The actual read is performed in a dedicated thread. The thread lives until | |
* {@link #close()} is called and the current read operation completes. | |
* <p> | |
* The main purpose of the class is to wrap pipes. The thread MAY hang during | |
* read, if nothing is written to the other side of the pipe. Then again the | |
* thread is merely a dependency of the resource on the other side of the pipe. | |
* If that resource leaks, the thread will stay. | |
* <p> | |
* Data is not lost because of read timeouts and the last operation can be | |
* repeated. A side effect is that a single {@link #skip(long)} call cannot skip | |
* more bytes than we can save in the buffer. | |
* <p> | |
* Methods may throw the usual SocketInputStream exceptions plus | |
* {@link InterruptedException} in case of interrupted wait. In that case, the | |
* stream is not closed and the interrupted status of the thread is set. | |
*/ | |
public class InterruptibleInputStream extends FilterInputStream { | |
public InterruptibleInputStream(InputStream in) { | |
super(in); | |
} | |
public void setOpTimeout(long millis) { | |
this.timeoutNanos = millis * MILLION; | |
} | |
/** | |
* If false, wait operations will throw {@link InterruptedIOException}. If | |
* true, to behave more like a socket, wait operations will retry after | |
* {@link InterruptedException}. In the end, the interrupted status of the | |
* thread will be restored. Default is true. | |
*/ | |
public void setIgnoreInterrupts(boolean b) { | |
this.ignoreInterrupts = b; | |
} | |
private long timeoutNanos; | |
private boolean ignoreInterrupts = true; | |
enum StreamOp { | |
READ, AVAIL | |
} | |
private final Object lock = new Object(); | |
private Thread waiter; | |
private Thread worker; | |
private boolean opCompleted; | |
private StreamOp currentOp; | |
private boolean closed; | |
private byte[] currentBuf = new byte[1024]; | |
private int currentOfs, currentLen; | |
private int resInt; | |
private Throwable resEx; | |
// final value assigned to currentOp on return from requestOp() | |
private StreamOp finalOpValue; | |
@Override | |
public int read() throws IOException { | |
byte[] buf = new byte[1]; | |
return read(buf, 0, 1) == -1 ? -1 : buf[0]; | |
} | |
@Override | |
public int read(byte[] b) throws IOException { | |
return read(b, 0, b.length); | |
} | |
@Override | |
public int read(byte[] b, int off, int len) throws IOException { | |
if (b == null) { | |
throw new NullPointerException(); | |
} | |
return checkAndRequestOp(READ, b, off, len); | |
} | |
@Override | |
public long skip(long n) throws IOException { | |
if (n <= 0) { | |
return 0; | |
} | |
int skipN = (int) Math.min(currentBuf.length, n); | |
int res = checkAndRequestOp(READ, null, 0, skipN); | |
return res == -1 ? 0 : res; | |
} | |
@Override | |
public int available() throws IOException { | |
return checkAndRequestOp(AVAIL, null, 0, 0); | |
} | |
@Override | |
public void close() throws IOException { | |
synchronized (lock) { | |
if (!closed) { | |
closed = true; | |
lock.notifyAll(); | |
} | |
} | |
} | |
@Override | |
public void mark(int readlimit) { | |
// | |
} | |
@Override | |
public void reset() throws IOException { | |
// | |
} | |
@Override | |
public boolean markSupported() { | |
return false; | |
} | |
private int checkAndRequestOp(StreamOp op, byte[] buf, int off, int len) | |
throws IOException { | |
long deadline = System.nanoTime() | |
+ (timeoutNanos > 0 ? timeoutNanos : Long.MAX_VALUE); | |
synchronized (lock) { | |
checkClosed(); | |
ensureThread(); | |
boolean[] interrupted = new boolean[1]; | |
try { | |
while (waiter != null && waiter.isAlive()) { | |
waitOrThrow(deadline, interrupted); | |
} | |
waiter = Thread.currentThread(); | |
try { | |
return requestOp(op, buf, off, len, deadline, interrupted); | |
} finally { | |
currentOp = finalOpValue; | |
waiter = null; | |
lock.notifyAll(); | |
} | |
} finally { | |
if (interrupted[0]) { | |
Thread.currentThread().interrupt(); | |
} | |
} | |
} | |
} | |
private void checkClosed() throws IOException { | |
if (closed) { | |
throw new SocketException("Stream closed"); | |
} | |
} | |
private int requestOp(StreamOp op, byte[] buf, int off, int len, | |
long deadline, boolean[] interrupted) throws IOException { | |
finalOpValue = currentOp; | |
if (currentOp != null) { | |
if (op == AVAIL && currentOp == READ && !opCompleted) { | |
// available() should not block for long unless another thread | |
// is waiting for a read op | |
return 0; | |
} | |
// currentOp will not change during the following wait() | |
while (!opCompleted) { | |
waitOrThrow(deadline, interrupted); | |
} | |
// default is to reset finalOpValue unless there's | |
// unread data in buffer which can be read | |
finalOpValue = null; | |
if (currentOp == READ) { | |
switch (op) { | |
case AVAIL: | |
if (resEx != null || resInt <= 0) { | |
// try again: maybe EOF no more | |
break; | |
} | |
// currentOp should stay READ | |
finalOpValue = currentOp; | |
return resInt; | |
case READ: | |
if (resEx != null || resInt <= 0) { | |
// try again: maybe EOF no more | |
break; | |
} | |
int newLen = Math.min(resInt, len); | |
if (buf != null) { | |
System.arraycopy(currentBuf, currentOfs, buf, off, | |
newLen); | |
} | |
currentOfs += newLen; | |
resInt -= newLen; | |
if (resInt != 0) { | |
// something's left in buffer | |
// currentOp should stay READ | |
finalOpValue = currentOp; | |
} | |
return newLen; | |
} | |
} | |
} | |
resEx = null; | |
// buf is null when called from skip() | |
if (op == READ && currentBuf.length < len) { | |
currentBuf = new byte[len]; | |
} | |
currentOfs = 0; | |
currentLen = len; | |
finalOpValue = op; | |
currentOp = op; | |
opCompleted = false; | |
lock.notifyAll(); | |
do { | |
waitOrThrow(deadline, interrupted); | |
} while (!opCompleted); | |
finalOpValue = null; | |
if (resEx != null) { | |
if (resEx instanceof IOException) { | |
// IOException subclasses handled differently by caller. | |
// Cannot wrap with a generic IOException. | |
if (resEx.getCause() == null) { | |
// Can preserve original stacktrace in the cause. | |
Exception causeStackTrace = new Exception(resEx.getClass() | |
.getName() + ": " + resEx.getMessage()); | |
causeStackTrace.setStackTrace(resEx.getStackTrace()); | |
resEx.initCause(causeStackTrace); | |
} | |
// Re-thrown exception must have current thread's stacktrace. | |
resEx.fillInStackTrace(); | |
throw (IOException) resEx; | |
} | |
throw new RuntimeException(resEx); | |
} | |
if (buf != null && resInt != -1) { | |
System.arraycopy(currentBuf, 0, buf, off, resInt); | |
} | |
return resInt; | |
} | |
private void ensureThread() { | |
if (worker != null) { | |
return; | |
} | |
worker = new Thread() { | |
@Override | |
public void run() { | |
try { | |
runOps(); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
}; | |
worker.setName("reader-" + worker.getId()); | |
worker.setDaemon(true); | |
worker.start(); | |
} | |
private void runOps() throws InterruptedException { | |
boolean alreadyThrowing = true; | |
try { | |
while (waitOp()) { | |
runOp(); | |
} | |
alreadyThrowing = false; | |
} finally { | |
synchronized (lock) { | |
closed = true; | |
lock.notifyAll(); | |
} | |
try { | |
in.close(); | |
} catch (Exception e) { | |
if (!alreadyThrowing) { | |
throw new RuntimeException(e); | |
} | |
} | |
} | |
} | |
private boolean waitOp() throws InterruptedException { | |
synchronized (lock) { | |
while (!closed) { | |
if (currentOp != null && !opCompleted) { | |
return true; | |
} | |
lock.wait(); | |
} | |
} | |
return false; | |
} | |
private void runOp() { | |
try { | |
switch (currentOp) { | |
case READ: | |
resInt = in.read(currentBuf, 0, currentLen); | |
break; | |
case AVAIL: | |
resInt = in.available(); | |
break; | |
} | |
} catch (Error e) { | |
resEx = e; | |
throw e; | |
} catch (Throwable e) { | |
resEx = e; | |
} finally { | |
synchronized (lock) { | |
opCompleted = true; | |
lock.notifyAll(); | |
} | |
} | |
} | |
private void waitOrThrow(long deadline, boolean[] interrupted) | |
throws IOException { | |
long delay = deadline - System.nanoTime(); | |
if (delay <= 0) { | |
throw new SocketTimeoutException("timeout"); | |
} | |
try { | |
lock.wait(delay / MILLION, (int) (delay % MILLION)); | |
} catch (InterruptedException e) { | |
interrupted[0] = true; | |
if (!ignoreInterrupts) { | |
throw new InterruptedIOException("interrupted"); | |
} | |
} | |
checkClosed(); | |
} | |
private static int MILLION = 1000000; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment