Skip to content

Instantly share code, notes, and snippets.

@thiagoh
Last active August 29, 2015 14:27
Show Gist options
  • Save thiagoh/ab54ca1049760cd7fe82 to your computer and use it in GitHub Desktop.
Save thiagoh/ab54ca1049760cd7fe82 to your computer and use it in GitHub Desktop.
Input stream with multiple sources
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
 * An {@link InputStream} that presents the streams supplied by an
 * {@link InputStreamProducer} as one continuous stream.
 *
 * <p>A daemon producer thread is started lazily on the first read (or skip). It
 * repeatedly asks the producer for the next stream and places it on a bounded
 * queue; the reading side consumes the queued streams one after another. Each
 * stream is closed as soon as it is exhausted, and {@link #close()} closes the
 * stream being read plus every stream still waiting in the queue.
 *
 * <p>If the producer thread fails, the failure is rethrown as an
 * {@link IOException} from the next read. Mark/reset is not supported.
 *
 * <p>Read operations are serialized on this instance. State shared with the
 * producer thread is held in {@code volatile} fields.
 */
public class ExtendedInputStream extends InputStream {

    private static final Log _log = LogFactory.getLog(ExtendedInputStream.class.getName());

    private static final ThreadFactory _namedThreadFactory = new NamedThreadFactory();

    /** Maximum number of produced streams buffered ahead of the reader. */
    private static final int INITIAL_CAPACITY = 10;

    /** How long a single queue offer/poll waits before shared state is re-checked. */
    private static final long QUEUE_WAIT_MILLIS = 10;

    private final BlockingQueue<InputStream> queue;
    private final Thread producerThread;
    private final InputStreamProducer producer;
    private final ExtendedInputStreamProdutor runnable;

    // Written by one thread and read by another, hence volatile.
    private volatile IOException _error;
    private volatile boolean _producerFinished = false;
    private volatile boolean _closed = false;
    private volatile InputStream currentInputStream;

    // Only touched while holding this instance's monitor.
    private boolean _startedReading = false;

    /**
     * Creates a stream over everything {@code producer} supplies. No thread is
     * started and the producer is not invoked until the first read.
     *
     * @param producer source of the streams to concatenate
     */
    public ExtendedInputStream(InputStreamProducer producer) {
        this.producer = producer;
        this.runnable = new ExtendedInputStreamProdutor();
        this.queue = new ArrayBlockingQueue<InputStream>(INITIAL_CAPACITY, true);
        this.producerThread = _namedThreadFactory.newThread(runnable);
    }

    /** Creates daemon threads named after this class plus a running counter. */
    private static class NamedThreadFactory implements ThreadFactory {

        private static final AtomicInteger atomicInteger = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable runnable) {
            ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
            Thread thread = new Thread(threadGroup, runnable,
                    ExtendedInputStream.class.getName() + "_" + atomicInteger.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        }
    }

    /** Starts the producer thread and waits for the first stream (or end of data). */
    private void start() throws IOException {
        _startedReading = true;
        producerThread.start();
        fillNextInputStream();
    }

    /** Throws if this stream has been closed. */
    private void ensureOpen() throws IOException {
        if (_closed) {
            throw new IOException("Stream closed");
        }
    }

    /** Rethrows the failure recorded by the producer or by an interrupted read, if any. */
    private void throwIfFailed() throws IOException {
        IOException failure = _error;
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Common preamble of the reading methods: verifies the stream is usable and
     * starts the producer on first use.
     *
     * @return {@code true} if there is a current stream to read from,
     *         {@code false} if the end of all data has been reached
     */
    private boolean prepareToRead() throws IOException {
        ensureOpen();
        if (!_startedReading) {
            start();
        }
        // Check for failure before looking at the current stream, so a producer
        // error is never mistaken for a clean end of stream.
        throwIfFailed();
        return currentInputStream != null;
    }

    /**
     * Closes the exhausted current stream and waits for the next one.
     *
     * @return {@code true} if a new current stream was installed, {@code false}
     *         if the producer has finished and the queue is drained
     * @throws IOException if the stream was closed, the producer failed, or the
     *         calling thread was interrupted while waiting
     */
    private synchronized boolean fillNextInputStream() throws IOException {
        closeAndLog(currentInputStream);
        currentInputStream = null;

        try {
            while (true) {
                ensureOpen();
                throwIfFailed();

                // Read the flag BEFORE polling. It is set only after the last
                // offer, so "finished was true and the poll came back empty"
                // proves there is nothing left. Checking in the opposite order
                // could drop a stream offered between the two checks.
                boolean finished = _producerFinished;

                InputStream next = finished
                        ? queue.poll()
                        : queue.poll(QUEUE_WAIT_MILLIS, TimeUnit.MILLISECONDS);

                if (next != null) {
                    currentInputStream = next;
                    return true;
                }
                if (finished) {
                    return false;
                }
            }
        } catch (InterruptedException ie) {
            // Restore the flag so callers further up can observe the interrupt.
            Thread.currentThread().interrupt();
            IOException failure = new IOException(ie);
            _error = failure;
            interruptProducer();
            throw failure;
        }
    }

    /**
     * Skips bytes within the stream currently being read; it does not cross
     * into the next stream, so fewer than {@code n} bytes may be skipped.
     *
     * @return the number of bytes skipped, {@code 0} at end of data
     */
    @Override
    public synchronized long skip(long n) throws IOException {
        if (!prepareToRead()) {
            return 0;
        }
        return currentInputStream.skip(n);
    }

    /** Does nothing: mark/reset is not supported, see {@link #reset()}. */
    @Override
    public void mark(int readlimit) {
        // Intentionally empty; reset() always fails, so marking has no use.
    }

    /** Always fails, because mark/reset is not supported. */
    @Override
    public void reset() throws IOException {
        throw new IOException("mark/reset not supported");
    }

    /** @return {@code false}, consistent with {@link #reset()} always throwing */
    @Override
    public boolean markSupported() {
        return false;
    }

    /**
     * @return the current stream's estimate, or {@code 0} when no stream is
     *         being read (before the first read, at end of data, after close)
     */
    @Override
    public int available() throws IOException {
        InputStream current = currentInputStream;
        return (current == null) ? 0 : current.available();
    }

    @Override
    public int read(byte[] b) throws IOException {
        return read(b, 0, b.length);
    }

    /**
     * Reads one byte, moving on to the next produced stream as often as needed.
     *
     * @return the byte read, or {@code -1} once every produced stream is exhausted
     */
    @Override
    public synchronized int read() throws IOException {
        if (!prepareToRead()) {
            return -1;
        }
        // Loop rather than retry once: any number of consecutive produced
        // streams may be empty.
        while (true) {
            int byteRead = currentInputStream.read();
            if (byteRead != -1) {
                return byteRead;
            }
            if (!fillNextInputStream()) {
                return -1;
            }
        }
    }

    /**
     * Reads up to {@code len} bytes from the current stream, moving on to the
     * next produced stream as often as needed. A single call never combines
     * bytes from two streams.
     *
     * @return the number of bytes read, or {@code -1} once every produced stream
     *         is exhausted
     */
    @Override
    public synchronized int read(byte b[], int off, int len) throws IOException {
        if (!prepareToRead()) {
            return -1;
        }
        while (true) {
            int count = currentInputStream.read(b, off, len);
            if (count != -1) {
                return count;
            }
            if (!fillNextInputStream()) {
                return -1;
            }
        }
    }

    /**
     * Stops the producer and closes the current stream and all queued streams.
     * Later reads fail with an {@link IOException}. Calling this more than once
     * is harmless.
     *
     * <p>A read waiting for the producer notices the close and fails. A read
     * blocked inside an underlying stream delays this method until it returns.
     */
    @Override
    public void close() throws IOException {
        super.close();

        // Publish the flag first so both the producer loop and a waiting reader
        // stop on their next state check.
        _closed = true;
        interruptProducer();

        synchronized (this) {
            InputStream current = currentInputStream;
            currentInputStream = null;

            IOException failure = null;
            if (current != null) {
                try {
                    current.close();
                } catch (IOException e) {
                    failure = e;
                }
            }

            closeQueuedStreams();

            if (failure != null) {
                throw failure;
            }
        }
    }

    /** Interrupts the producer thread; problems doing so are only logged. */
    private void interruptProducer() {
        try {
            this.producerThread.interrupt();
        } catch (Exception e) {
            if (_log.isDebugEnabled()) {
                _log.debug(e);
            }
        }
    }

    /** Records a failure for the reader to rethrow and stops the producer. */
    private void failWith(IOException e) {
        _error = e;
        interruptProducer();
    }

    /** Closes every stream still waiting in the queue. */
    private void closeQueuedStreams() {
        InputStream queued;
        while ((queued = queue.poll()) != null) {
            closeAndLog(queued);
        }
    }

    /** Closes {@code stream} if non-null; a failure is logged, not propagated. */
    private void closeAndLog(InputStream stream) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (IOException e) {
            _log.warn(e);
        }
    }

    /**
     * Signals that no more streams will be queued; once the queue is drained,
     * reads report end of stream.
     */
    public void finished() {
        _producerFinished = true;
    }

    /**
     * @return the stream currently being read, or {@code null} if there is none
     */
    public InputStream getNextInputStream() {
        return currentInputStream;
    }

    /**
     * @return the live queue of streams waiting to be read, in reading order
     */
    public BlockingQueue<InputStream> getQueue() {
        return queue;
    }

    /** Body of the producer thread: moves streams from the producer into the queue. */
    private class ExtendedInputStreamProdutor implements Runnable {

        @Override
        public void run() {
            if (_log.isDebugEnabled()) {
                _log.debug("InputStream producer has been started");
            }

            // Stream obtained from the producer but not yet handed to the queue.
            InputStream pending = null;

            try {
                while (!_closed && producer.hasNext()) {
                    try {
                        pending = producer.produce();
                    } catch (IOException e) {
                        // Best effort: a source that cannot be opened is skipped.
                        _log.warn(e);
                        pending = null;
                    }

                    if (pending == null) {
                        continue;
                    }

                    // Offer with a timeout so a close is noticed while the queue is full.
                    boolean offered = false;
                    while (!offered && !_closed) {
                        offered = queue.offer(pending, QUEUE_WAIT_MILLIS, TimeUnit.MILLISECONDS);
                    }
                    if (offered) {
                        pending = null;
                    }
                }

                // Must come after the last offer; the reader relies on that order.
                _producerFinished = true;
            } catch (Exception e) {
                _log.warn(e);
                failWith(new IOException(e));
            } finally {
                // Deliberately does NOT close the enclosing stream: the reader
                // may still have queued data to consume.
                closeAndLog(pending);
                if (_closed) {
                    // Catches a stream queued after close() already drained the queue.
                    closeQueuedStreams();
                }
            }
        }
    }
}
import java.io.IOException;
import java.io.InputStream;
/**
 * Supplies a sequence of {@link InputStream}s, one per call.
 *
 * <p>As consumed by {@code ExtendedInputStream}, {@link #hasNext()} is checked
 * before every call to {@link #produce()}, and production stops the first time
 * it returns {@code false}.
 */
public interface InputStreamProducer {

    /**
     * Provides the next stream in the sequence.
     *
     * @return the next stream; {@code ExtendedInputStream} skips a {@code null}
     *         result
     * @throws IOException if the stream cannot be provided
     */
    InputStream produce() throws IOException;

    /**
     * Tells whether another call to {@link #produce()} should be made.
     *
     * @return {@code true} if more streams remain, {@code false} otherwise
     */
    boolean hasNext();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment