Skip to content

Instantly share code, notes, and snippets.

@ninja-
Last active September 26, 2015 06:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ninja-/91a13b2630210891d429 to your computer and use it in GitHub Desktop.
Save ninja-/91a13b2630210891d429 to your computer and use it in GitHub Desktop.
package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.lang.reflect.Field;
/**
* The purpose of this class is to help combat spam of the
* {@link io.netty.channel.ChannelOutboundBuffer.Entry} instances
* by cumulating every written buffer into one until the user decides to call {@link io.netty.channel.Channel#flush()}.
*
* To work properly, this handler needs to added at the "top" of your pipeline,
* so it would receive a fully encoded message.
*
* Please note that the writes can only be cumulated if you are writing them with a
* {@link Channel#voidPromise()}.
* If you are writing your data without voidPromise(), expect undefined behaviour!
* (I am mostly concerned, that the packets would get out of order.
* That's not something I care to implement, for such a use case)
*
* It is very effective in this task and when used properly
* ( that is, with auto read = false and a proper implementation of your own read() )
* can reduce the number of Entries for example from 500000(!!!) to just 200......
*
* You can specify a promise that will be called:
* - once a cummulation is flushed
* - once an _empty_ cummulation has been flushed
*
* The purpose of specifying a promise is for example - scheduling a read
* on the other side of the proxied connection, if you are developing a proxy.
* Note that the promise needs to be reusable.
*/
@NoArgsConstructor
public class MessageSquasher extends ChannelDuplexHandler
{
private ByteBuf cumulation;
private int writtenBuffers = 0;
private int writtenBytes = 0;
@Getter
@Setter
@NonNull
private ChannelPromise promise;
public static final ByteToMessageDecoder.Cumulator MERGE_CUMULATOR = (alloc, cumulation1, in) -> {
ByteBuf buffer;
if (cumulation1.writerIndex() > cumulation1.maxCapacity() - in.readableBytes()
) {
// Expand cumulation (by replace it) when either there is not more room in the buffer
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
// duplicate().retain().
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
buffer = expandCumulation(alloc, cumulation1, in.readableBytes());
} else {
buffer = cumulation1;
}
buffer.writeBytes(in);
in.release();
return buffer;
};
private static ByteToMessageDecoder.Cumulator CUMMULATOR = MERGE_CUMULATOR;
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
ByteBuf oldCumulation = cumulation;
cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
cumulation.writeBytes(oldCumulation);
oldCumulation.release();
return cumulation;
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (! (msg instanceof ByteBuf) || !promise.isVoid()) {
ctx.write(msg, promise);
return;
}
ByteBuf buf = (ByteBuf) msg;
writtenBytes += buf.readableBytes();
if (cumulation == null)
{
cumulation = buf;
}
else
{
cumulation = CUMMULATOR.cumulate(ctx.alloc(), cumulation, buf);
// the cumulator is expected to seriously release the "buf"
}
++writtenBuffers;
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
if (cumulation != null) {
ChannelOutboundBuffer c = ctx.channel().unsafe().outboundBuffer();
boolean fast = false;
// TODO: the code below is at least suspicious
// I think it was causing random stream corruption :C it would be nice to know why???
/* if (c != null && c.current() != null && c.current() instanceof ByteBuf)
{
// fast-path
ByteBuf entry = ((ByteBuf) c.current());
if (entry.readableBytes() > 0 &&
entry.isWritable(cumulation.readableBytes()))
{
entry.writeBytes(cumulation);
c.incrementPendingOutboundBytes(cumulation.readableBytes());
cumulation.release();
cumulation = null;
// it HAS to be an entry made be us. Don't break it!!
fast = true;
}
} */
Object _cumulation = cumulation;
cumulation = null;
writtenBuffers = writtenBytes = 0;
if (fast) return;
ctx.write(_cumulation, promise == null ? ctx.voidPromise() : promise);
ctx.flush();
return;
}
else {
if (promise != null)
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
ctx.flush();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (cumulation != null)
{
cumulation.release();
cumulation = null;
}
ctx.fireChannelInactive();
}
}
@ninja-
Copy link
Author

ninja- commented Sep 26, 2015

@Scottmitch any idea why the now commented out fast-path would trigger stream corruption randomly? I removed it but I can't understand why it would do such a thing. Unfortunately I was testing with the "reset indexes fix" netty commit already so that wasn't the case...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment