Skip to content

Instantly share code, notes, and snippets.

@rhs
Created July 22, 2014 15:13
Show Gist options
  • Save rhs/0785aa88f2752c00183e to your computer and use it in GitHub Desktop.
Save rhs/0785aa88f2752c00183e to your computer and use it in GitHub Desktop.
// Number of bytes at the front of transport.head() that have already been handed to
// connectionSPI.output(...) but not yet popped from the transport. Incremented in
// onTransport before the hand-off, decremented in the completion listener (under getLock()),
// and used by getPooledNettyBytes as the position from which new output is copied.
private int offset = 0;
/**
 * Hands any transport output that is not already in flight to {@code connectionSPI},
 * and pops those bytes from the transport when the listener passed along with them
 * is notified.
 *
 * @param transport the transport whose pending output should be forwarded
 */
@Override
protected void onTransport(final Transport transport)
{
    final ByteBuf outbound = getPooledNettyBytes(transport);
    if (outbound == null)
    {
        // null means nothing to be written
        return;
    }

    final int inFlight = outbound.readableBytes();
    // Account for these bytes before the hand-off so that a subsequent
    // getPooledNettyBytes call starts copying after them.
    offset += inFlight;

    final ChannelFutureListener popWhenDone = new ChannelFutureListener()
    {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception
        {
            synchronized (getLock())
            {
                // The bytes leave the head of the transport, so they no longer
                // need to be skipped via offset.
                offset -= inFlight;
                transport.pop(inFlight);
            }
        }
    };
    connectionSPI.output(outbound, popWhenDone);
}
/**
 * Copies the transport output that has not yet been handed off into a pooled Netty buffer.
 * <p>
 * The first {@code offset} bytes of {@code transport.head()} are skipped; everything
 * remaining in the head after that position is copied. On success the returned buffer
 * is not released here; if the copy fails, the buffer is released before the
 * exception propagates so that it is not leaked from the pool.
 *
 * @param transport the transport whose pending output is read
 * @return a pooled buffer holding the new output, or {@code null} when
 *         {@code transport.pending()} is negative or there are no new bytes to write
 * @throws IllegalStateException if {@code offset} is larger than the pending byte count
 */
private ByteBuf getPooledNettyBytes(Transport transport)
{
    int pending = transport.pending();
    if (pending < 0) {
        // TODO(review): a negative pending count presumably means the connection
        // needs to be closed; for now it is treated as "nothing to write".
        return null;
    }
    int size = pending - offset;
    if (size < 0) {
        throw new IllegalStateException("negative size: " + pending + ", " + offset);
    }
    if (size == 0) {
        return null;
    }

    // Position the head BEFORE allocating: if position() rejects the offset,
    // no pooled buffer has been taken yet and nothing can leak.
    ByteBuffer head = transport.head();
    head.position(offset);

    ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer(size);
    boolean copied = false;
    try {
        buffer.writeBytes(head);
        copied = true;
        return buffer;
    } finally {
        if (!copied) {
            // Copy failed: give the buffer back to the pool instead of leaking it.
            buffer.release();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment