Skip to content

Instantly share code, notes, and snippets.

@dflemstr
Created August 17, 2014 21:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dflemstr/9541193d141729b6c84d to your computer and use it in GitHub Desktop.
package name.dflemstr.vessel.docker;
import com.google.common.collect.Queues;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closer;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
/**
 * A de-multiplexing stream implementation that parses a Docker log stream and splits it into
 * multiple streams.
 *
 * <p>The input is read as a sequence of frames. Every frame starts with an 8-byte header (one
 * stream-type byte, three ignored bytes and a big-endian 32-bit payload size) followed by the
 * payload. Payloads are routed to {@link #getStdin()}, {@link #getStdout()} or
 * {@link #getStderr()} depending on the stream-type byte; frames with any other type are dropped.
 *
 * <p>The three streams share the underlying input. Whichever stream needs more data reads the
 * next frame and hands it to the stream it belongs to, so each returned stream may be consumed
 * from its own thread. Note that frames belonging to a stream that nobody reads (or that has
 * already been closed) are still queued in memory for as long as the other streams keep reading.
 */
public class DockerLogStream implements Closeable {

  /** Number of bytes in the header that precedes every frame payload. */
  private static final int HEADER_SIZE = 8;

  /** Offset of the big-endian payload size inside the header (after type byte + 3 skipped). */
  private static final int SIZE_OFFSET = 4;

  /** Header stream-type tags. */
  private static final int TYPE_STDIN = 0;
  private static final int TYPE_STDOUT = 1;
  private static final int TYPE_STDERR = 2;

  private final InputStream stdin;
  private final InputStream stdout;
  private final InputStream stderr;
  private final Closer closer;

  private DockerLogStream(InputStream stdin, InputStream stdout, InputStream stderr,
                          Closer closer) {
    this.stdin = stdin;
    this.stdout = stdout;
    this.stderr = stderr;
    this.closer = closer;
  }

  /**
   * Consumes the specified inputStream, closing it when the {@link DockerLogStream} is closed.
   *
   * <p>The underlying stream is closed once all three de-multiplexed streams have been closed,
   * which is what {@link #close()} does.
   *
   * @param inputStream The input stream in Docker log stream format to consume.
   * @return A {@link DockerLogStream} that must be closed after it is used.
   */
  public static DockerLogStream consume(InputStream inputStream) {
    // Indicator for if each stream is closed
    AtomicBoolean stdinClosed = new AtomicBoolean(false);
    AtomicBoolean stdoutClosed = new AtomicBoolean(false);
    AtomicBoolean stderrClosed = new AtomicBoolean(false);

    // Number of still open streams
    AtomicInteger numConsumers = new AtomicInteger(3);

    // The pending frames for each stream
    Queue<ByteBuffer> stdinQueue = Queues.newConcurrentLinkedQueue();
    Queue<ByteBuffer> stdoutQueue = Queues.newConcurrentLinkedQueue();
    Queue<ByteBuffer> stderrQueue = Queues.newConcurrentLinkedQueue();

    // Lock that must be held while reading from inputStream
    ReentrantLock readLock = new ReentrantLock();

    InputStream stdin =
        new SpecializedLogInputStream(
            readLock, inputStream,
            stdinQueue,
            stdinQueue, stdoutQueue, stderrQueue,
            stdinClosed, numConsumers);
    InputStream stdout =
        new SpecializedLogInputStream(
            readLock, inputStream,
            stdoutQueue,
            stdinQueue, stdoutQueue, stderrQueue,
            stdoutClosed, numConsumers);
    InputStream stderr =
        new SpecializedLogInputStream(
            readLock, inputStream,
            stderrQueue,
            stdinQueue, stdoutQueue, stderrQueue,
            stderrClosed, numConsumers);

    Closer closer = Closer.create();
    closer.register(stdin);
    closer.register(stdout);
    closer.register(stderr);

    return new DockerLogStream(stdin, stdout, stderr, closer);
  }

  /**
   * Reads one complete frame from {@code inputStream} and appends its payload to the queue
   * matching the frame's stream type. Frames with an unknown stream type are read and discarded.
   *
   * <p>Callers are expected to serialize invocations for a given {@code inputStream}; this method
   * does no locking of its own.
   *
   * @param inputStream The multiplexed stream to read the frame from.
   * @param stdin Queue receiving payloads tagged with stream type 0.
   * @param stdout Queue receiving payloads tagged with stream type 1.
   * @param stderr Queue receiving payloads tagged with stream type 2.
   * @return {@code true} if a frame was read; {@code false} if the stream ended before a complete
   *     header or payload could be read (a truncated trailing frame is silently dropped).
   * @throws IOException If reading fails, or if the header announces a payload size that does not
   *     fit in a non-negative {@code int}.
   */
  protected static boolean readFrame(
      InputStream inputStream,
      Queue<ByteBuffer> stdin,
      Queue<ByteBuffer> stdout,
      Queue<ByteBuffer> stderr) throws IOException {
    ByteBuffer header = ByteBuffer.allocate(HEADER_SIZE);
    header.order(ByteOrder.BIG_ENDIAN);
    try {
      ByteStreams.readFully(inputStream, header.array());
    } catch (EOFException ignored) {
      // End of input (possibly in the middle of a header): there are no more frames.
      return false;
    }

    int type = header.get(0);
    int size = header.getInt(SIZE_OFFSET);
    if (size < 0) {
      // The size field is unsigned on the wire; values above Integer.MAX_VALUE show up negative
      // here and would otherwise make ByteBuffer.allocate throw an unchecked exception.
      throw new IOException(
          "Unsupported Docker log frame size: " + (size & 0xFFFFFFFFL));
    }

    ByteBuffer payload = ByteBuffer.allocate(size);
    try {
      ByteStreams.readFully(inputStream, payload.array());
    } catch (EOFException ignored) {
      // Input ended in the middle of a payload: treat as end of stream, dropping the fragment.
      return false;
    }

    switch (type) {
      case TYPE_STDIN:
        stdin.add(payload);
        break;
      case TYPE_STDOUT:
        stdout.add(payload);
        break;
      case TYPE_STDERR:
        stderr.add(payload);
        break;
      default:
        // ignore unknown header tag
        break;
    }
    return true;
  }

  /** Returns the stream carrying the payloads of all frames tagged with stream type 0. */
  public InputStream getStdin() {
    return stdin;
  }

  /** Returns the stream carrying the payloads of all frames tagged with stream type 1. */
  public InputStream getStdout() {
    return stdout;
  }

  /** Returns the stream carrying the payloads of all frames tagged with stream type 2. */
  public InputStream getStderr() {
    return stderr;
  }

  /**
   * Closes all three de-multiplexed streams, which in turn closes the underlying input stream.
   *
   * @throws IOException If closing any of the streams fails.
   */
  @Override
  public void close() throws IOException {
    closer.close();
  }

  /**
   * One de-multiplexed view of the shared input. It serves bytes from its own frame queue and,
   * when that queue runs dry, reads further frames from the shared input (under {@code readLock})
   * until a frame for this stream shows up or the input ends. Frames read for the sibling streams
   * are appended to their queues.
   */
  protected static class SpecializedLogInputStream extends InputStream {

    private final ReentrantLock readLock;
    private final InputStream inputStream;
    private final Queue<ByteBuffer> myQueue;
    private final Queue<ByteBuffer> stdinQueue;
    private final Queue<ByteBuffer> stdoutQueue;
    private final Queue<ByteBuffer> stderrQueue;
    private final AtomicBoolean myClosed;
    private final AtomicInteger numConsumers;

    /**
     * @param readLock Lock guarding reads from {@code inputStream}; shared by all sibling streams.
     * @param inputStream The shared multiplexed input.
     * @param myQueue The queue this stream serves bytes from (one of the three queues below).
     * @param stdinQueue Queue for frames with stream type 0.
     * @param stdoutQueue Queue for frames with stream type 1.
     * @param stderrQueue Queue for frames with stream type 2.
     * @param myClosed Flag recording whether this stream has been closed.
     * @param numConsumers Number of sibling streams still open; the stream that brings it to zero
     *     closes {@code inputStream}.
     */
    public SpecializedLogInputStream(
        ReentrantLock readLock, InputStream inputStream,
        Queue<ByteBuffer> myQueue,
        Queue<ByteBuffer> stdinQueue, Queue<ByteBuffer> stdoutQueue, Queue<ByteBuffer> stderrQueue,
        AtomicBoolean myClosed,
        AtomicInteger numConsumers) {
      this.readLock = readLock;
      this.inputStream = inputStream;
      this.myQueue = myQueue;
      this.stdinQueue = stdinQueue;
      this.stdoutQueue = stdoutQueue;
      this.stderrQueue = stderrQueue;
      this.myClosed = myClosed;
      this.numConsumers = numConsumers;
    }

    /**
     * Returns the number of unread bytes in the frame at the head of this stream's queue, or 0 if
     * no frame is queued.
     */
    @Override
    public int available() throws IOException {
      ByteBuffer currentFrame = myQueue.peek();
      return (currentFrame == null) ? 0 : currentFrame.remaining();
    }

    /**
     * Reads the next byte of this stream, blocking until one is available or the input ends.
     *
     * @return The byte as a value in the range 0-255, or -1 at end of input.
     */
    @Override
    public synchronized int read() throws IOException {
      ByteBuffer frame = nextReadableFrame();
      // Mask to an unsigned value: ByteBuffer.get() is signed, and returning it raw would turn
      // 0xFF into -1 (a bogus end-of-stream) and every other byte >= 0x80 into a negative number.
      return (frame == null) ? -1 : (frame.get() & 0xFF);
    }

    /**
     * Reads up to {@code len} bytes, blocking only until at least one byte is available. At most
     * the remainder of the current frame is returned per call, so consumers of a live log are not
     * held up waiting for the whole buffer to fill.
     *
     * @return The number of bytes copied into {@code b}, 0 if {@code len} is 0, or -1 at end of
     *     input.
     */
    @Override
    public synchronized int read(byte[] b, int off, int len) throws IOException {
      if (b == null) {
        throw new NullPointerException("b");
      }
      if (off < 0 || len < 0 || len > b.length - off) {
        throw new IndexOutOfBoundsException(
            "off=" + off + ", len=" + len + ", b.length=" + b.length);
      }
      if (len == 0) {
        return 0;
      }
      ByteBuffer frame = nextReadableFrame();
      if (frame == null) {
        return -1;
      }
      int count = Math.min(len, frame.remaining());
      frame.get(b, off, count);
      return count;
    }

    /**
     * Returns the first queued frame of this stream that still has unread bytes, reading frames
     * from the shared input as needed.
     *
     * @return A frame with at least one remaining byte, or {@code null} if the input ended.
     */
    private ByteBuffer nextReadableFrame() throws IOException {
      while (true) {
        ByteBuffer head = myQueue.peek();
        if (head != null) {
          if (head.hasRemaining()) {
            return head;
          }
          // Exhausted (or zero-length) frame. This stream is the only consumer of its queue and
          // callers hold the stream's monitor, so the head is still the frame we just peeked.
          myQueue.poll();
        }
        readLock.lock();
        try {
          // Re-check under the lock: a sibling stream may have queued a frame for us meanwhile.
          if (myQueue.isEmpty()
              && !readFrame(inputStream, stdinQueue, stdoutQueue, stderrQueue)) {
            return null;
          }
        } finally {
          readLock.unlock();
        }
      }
    }

    /**
     * Marks this stream as closed. The shared input is closed when the last of the sibling
     * streams is closed. Closing an already closed stream has no effect.
     */
    @Override
    public void close() throws IOException {
      if (myClosed.compareAndSet(false, true)) {
        if (numConsumers.decrementAndGet() == 0) {
          inputStream.close();
        }
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment