Skip to content

Instantly share code, notes, and snippets.

@mhgrove
Created September 6, 2013 14:11
Show Gist options
  • Save mhgrove/6464352 to your computer and use it in GitHub Desktop.
Save mhgrove/6464352 to your computer and use it in GitHub Desktop.
ByteBufHttpOutputStream
/**
 * An {@link OutputStream} that accumulates written bytes in a {@link ByteBuf} and hands them to a
 * {@link ChannelHandlerContext} as {@link DefaultHttpContent} chunks. A chunk is sent whenever the
 * internal buffer reaches {@link #MAX_SIZE}, on {@link #flush()}, and on {@link #close()}; closing
 * additionally writes {@link LastHttpContent#EMPTY_LAST_CONTENT}.
 *
 * <p>The class performs no synchronization of its own.</p>
 */
public static final class ByteBufHttpOutputStream extends OutputStream {

    /**
     * The maximum number of bytes the internal {@link ByteBuf} may hold before it is sent out over
     * the channel (1 MiB).
     */
    private static final int MAX_SIZE = 1048576;

    /**
     * The channel context to which the {@link ByteBuf} contents are written
     */
    private final ChannelHandlerContext mChannelHandlerContext;

    /**
     * The {@link ByteBuf} to which data is currently being written
     */
    private ByteBuf mByteBuf;

    /**
     * Local byte array to use for writing a single byte
     */
    private final byte[] mBuffer = new byte[1];

    /**
     * Whether {@link #close()} has already been called
     */
    private boolean mClosed = false;

    private ByteBufHttpOutputStream(final ChannelHandlerContext theChannelHandlerContext) {
        mChannelHandlerContext = theChannelHandlerContext;
        mByteBuf = theChannelHandlerContext.alloc().buffer();
    }

    /**
     * Create a new ByteBufHttpOutputStream that wraps the given {@link ChannelHandlerContext}
     *
     * @param theChannelHandlerContext The context to which the data from the stream are to be written
     * @return a new ByteBufHttpOutputStream
     */
    public static ByteBufHttpOutputStream create(final ChannelHandlerContext theChannelHandlerContext) {
        return new ByteBufHttpOutputStream(theChannelHandlerContext);
    }

    /**
     * Sends any buffered bytes followed by the terminating {@link LastHttpContent}, then releases the
     * internal buffer. Calling this method on an already closed stream has no effect.
     *
     * @throws IOException declared by {@link OutputStream#close()}
     */
    @Override
    public void close() throws IOException {
        if (mClosed) {
            return;
        }

        // Mark closed first so the buffer can never be released twice, and release it in a finally
        // block so it is not leaked when writing the final content fails.
        mClosed = true;
        try {
            sendLastByteBuf();
        }
        finally {
            mByteBuf.release();
        }
    }

    /**
     * Hands any buffered bytes to the channel context as a content chunk.
     *
     * <p>NOTE(review): only {@code ChannelHandlerContext.write} is invoked; whether a channel-level
     * flush is also needed depends on the Netty version and pipeline in use -- confirm with callers.</p>
     *
     * @throws IOException if the stream is closed
     */
    @Override
    public void flush() throws IOException {
        ensureOpen();
        sendByteBuf();
    }

    /**
     * Writes the whole array to the stream.
     *
     * @param theBytes the bytes to write
     * @throws IOException if the stream is closed
     */
    @Override
    public void write(byte[] theBytes) throws IOException {
        ensureOpen();
        write(theBytes, 0, theBytes.length);
    }

    /**
     * Writes {@code theLength} bytes of {@code theBytes}, starting at {@code theOffset}. Data is
     * appended to the internal buffer; every time the buffer reaches {@link #MAX_SIZE} it is sent
     * to the channel context and reused for the remaining bytes.
     *
     * @param theBytes  the source array
     * @param theOffset the index of the first byte to write
     * @param theLength the number of bytes to write
     * @throws IOException               if the stream is closed
     * @throws IndexOutOfBoundsException if the offset/length pair does not describe a range inside
     *                                   {@code theBytes}
     */
    @Override
    public void write(byte[] theBytes, int theOffset, int theLength) throws IOException {
        ensureOpen();

        // Written as a subtraction so that theOffset + theLength cannot overflow.
        if (theOffset < 0 || theLength < 0 || theLength > theBytes.length - theOffset) {
            throw new IndexOutOfBoundsException("Invalid length/offset");
        }

        // Fast path: everything fits in the space left (subtraction form avoids int overflow).
        if (theLength <= MAX_SIZE - mByteBuf.readableBytes()) {
            mByteBuf.writeBytes(theBytes, theOffset, theLength);
            return;
        }

        // The fast path may have left the buffer exactly full; drain it so the loop makes progress.
        if (mByteBuf.readableBytes() >= MAX_SIZE) {
            sendByteBuf();
        }

        int aRemaining = theLength;
        int aOffset = theOffset;

        while (aRemaining > 0) {
            // Never copy more than what is still left of the caller's range.
            final int aLengthToAppend = Math.min(aRemaining, MAX_SIZE - mByteBuf.readableBytes());

            mByteBuf.writeBytes(theBytes, aOffset, aLengthToAppend);

            aOffset += aLengthToAppend;
            aRemaining -= aLengthToAppend;

            if (mByteBuf.readableBytes() >= MAX_SIZE) {
                sendByteBuf();
            }
        }
    }

    /**
     * Writes a single byte (the low-order eight bits of {@code theByte}).
     *
     * @param theByte the byte to write
     * @throws IOException if the stream is closed
     */
    @Override
    public void write(int theByte) throws IOException {
        ensureOpen();
        mBuffer[0] = (byte) theByte;
        write(mBuffer, 0, 1);
    }

    /**
     * Fails with the stream's standard error if {@link #close()} has already been called.
     */
    private void ensureOpen() throws IOException {
        if (mClosed) {
            throw new IOException("Stream is closed");
        }
    }

    private void sendByteBuf() {
        sendByteBuf(false);
    }

    private void sendLastByteBuf() {
        sendByteBuf(true);
    }

    /**
     * Writes a copy of the buffered bytes (if any) to the channel context and resets the buffer.
     *
     * @param isFinal when true, {@link LastHttpContent#EMPTY_LAST_CONTENT} is written afterwards
     */
    private void sendByteBuf(boolean isFinal) {
        if (mByteBuf.readableBytes() > 0) {
            // A copy is sent so the internal buffer can be cleared and reused immediately.
            mChannelHandlerContext.write(new DefaultHttpContent(mByteBuf.copy()));
            mByteBuf.clear();
        }

        if (isFinal) {
            mChannelHandlerContext.write(LastHttpContent.EMPTY_LAST_CONTENT);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment