Created
August 17, 2014 21:26
-
-
Save dflemstr/9541193d141729b6c84d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package name.dflemstr.vessel.docker; | |
import com.google.common.collect.Queues; | |
import com.google.common.io.ByteStreams; | |
import com.google.common.io.Closer; | |
import java.io.Closeable; | |
import java.io.EOFException; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.nio.ByteBuffer; | |
import java.nio.ByteOrder; | |
import java.util.Queue; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.locks.ReentrantLock; | |
/** | |
* A highly optimized de-multiplexing stream implementation that parses a Docker log stream and | |
* splits it into multiple streams. | |
*/ | |
public class DockerLogStream implements Closeable { | |
private final InputStream stdin; | |
private final InputStream stdout; | |
private final InputStream stderr; | |
private final Closer closer; | |
private DockerLogStream(InputStream stdin, InputStream stdout, InputStream stderr, | |
Closer closer) { | |
this.stdin = stdin; | |
this.stdout = stdout; | |
this.stderr = stderr; | |
this.closer = closer; | |
} | |
/** | |
* Consumes the specified inputStream, closing it when the {@link name.dflemstr.vessel.docker.DockerLogStream} | |
* is closed. | |
* | |
* @param inputStream The input stream in Docker log stream format to consume. | |
* @return A {@link name.dflemstr.vessel.docker.DockerLogStream} that must be closed after it is | |
* used. | |
*/ | |
public static DockerLogStream consume(InputStream inputStream) { | |
// Indicator for if each stream is closed | |
AtomicBoolean stdinClosed = new AtomicBoolean(false); | |
AtomicBoolean stdoutClosed = new AtomicBoolean(false); | |
AtomicBoolean stderrClosed = new AtomicBoolean(false); | |
// Number of still open streams | |
AtomicInteger numConsumers = new AtomicInteger(3); | |
// The pending frames for each stream | |
Queue<ByteBuffer> stdinQueue = Queues.newConcurrentLinkedQueue(); | |
Queue<ByteBuffer> stdoutQueue = Queues.newConcurrentLinkedQueue(); | |
Queue<ByteBuffer> stderrQueue = Queues.newConcurrentLinkedQueue(); | |
// Lock that must be held while reading from inputStream | |
ReentrantLock readLock = new ReentrantLock(); | |
InputStream stdin = | |
new SpecializedLogInputStream( | |
readLock, inputStream, | |
stdinQueue, | |
stdinQueue, stdoutQueue, stderrQueue, | |
stdinClosed, numConsumers); | |
InputStream stdout = | |
new SpecializedLogInputStream( | |
readLock, inputStream, | |
stdoutQueue, | |
stdinQueue, stdoutQueue, stderrQueue, | |
stdoutClosed, numConsumers); | |
InputStream stderr = | |
new SpecializedLogInputStream( | |
readLock, inputStream, | |
stderrQueue, | |
stdinQueue, stdoutQueue, stderrQueue, | |
stderrClosed, numConsumers); | |
Closer closer = Closer.create(); | |
closer.register(stdin); | |
closer.register(stdout); | |
closer.register(stderr); | |
return new DockerLogStream(stdin, stdout, stderr, closer); | |
} | |
protected static boolean readFrame( | |
InputStream inputStream, | |
Queue<ByteBuffer> stdin, | |
Queue<ByteBuffer> stdout, | |
Queue<ByteBuffer> stderr) throws IOException { | |
ByteBuffer headerBuffer = ByteBuffer.allocate(8); | |
headerBuffer.order(ByteOrder.BIG_ENDIAN); | |
try { | |
ByteStreams.readFully(inputStream, headerBuffer.array()); | |
} catch (EOFException e) { | |
return false; | |
} | |
int type = headerBuffer.get(); | |
// Skip 3 bytes | |
headerBuffer.get(); | |
headerBuffer.get(); | |
headerBuffer.get(); | |
int size = headerBuffer.getInt(); | |
ByteBuffer payload = ByteBuffer.allocate(size); | |
try { | |
ByteStreams.readFully(inputStream, payload.array()); | |
} catch (EOFException e) { | |
return false; | |
} | |
switch (type) { | |
case 0: | |
stdin.add(payload); | |
break; | |
case 1: | |
stdout.add(payload); | |
break; | |
case 2: | |
stderr.add(payload); | |
break; | |
// default: ignore unknown header tag | |
} | |
return true; | |
} | |
public InputStream getStdin() { | |
return stdin; | |
} | |
public InputStream getStdout() { | |
return stdout; | |
} | |
public InputStream getStderr() { | |
return stderr; | |
} | |
@Override | |
public void close() throws IOException { | |
closer.close(); | |
} | |
protected static class SpecializedLogInputStream extends InputStream { | |
private final ReentrantLock readLock; | |
private final InputStream inputStream; | |
private final Queue<ByteBuffer> myQueue; | |
private final Queue<ByteBuffer> stdinQueue; | |
private final Queue<ByteBuffer> stdoutQueue; | |
private final Queue<ByteBuffer> stderrQueue; | |
private final AtomicBoolean myClosed; | |
private final AtomicInteger numConsumers; | |
public SpecializedLogInputStream( | |
ReentrantLock readLock, InputStream inputStream, | |
Queue<ByteBuffer> myQueue, | |
Queue<ByteBuffer> stdinQueue, Queue<ByteBuffer> stdoutQueue, Queue<ByteBuffer> stderrQueue, | |
AtomicBoolean myClosed, | |
AtomicInteger numConsumers) { | |
this.readLock = readLock; | |
this.inputStream = inputStream; | |
this.myQueue = myQueue; | |
this.stdinQueue = stdinQueue; | |
this.stdoutQueue = stdoutQueue; | |
this.stderrQueue = stderrQueue; | |
this.myClosed = myClosed; | |
this.numConsumers = numConsumers; | |
} | |
// TODO: support bulk read, not just read() | |
@Override | |
public int available() throws IOException { | |
ByteBuffer currentFrame = myQueue.peek(); | |
return (currentFrame == null) ? 0 : currentFrame.remaining(); | |
} | |
@Override | |
public synchronized int read() throws IOException { | |
while (true) { | |
ByteBuffer currentFrame = myQueue.peek(); | |
if (currentFrame != null) { | |
if (currentFrame.hasRemaining()) { | |
return currentFrame.get(); | |
} else { | |
myQueue.remove(currentFrame); | |
} | |
} | |
readLock.lock(); | |
try { | |
if (myQueue.isEmpty()) { | |
if (!readFrame(inputStream, stdinQueue, stdoutQueue, stderrQueue)) { | |
return -1; | |
} | |
} | |
} finally { | |
readLock.unlock(); | |
} | |
} | |
} | |
@Override | |
public void close() throws IOException { | |
if (myClosed.compareAndSet(false, true)) { | |
if (numConsumers.decrementAndGet() == 0) { | |
inputStream.close(); | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment