-
-
Save ninja-/91a13b2630210891d429 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.netty.channel; | |
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.lang.reflect.Field;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.Setter;
/** | |
* The purpose of this class is to help combat spam of the | |
* {@link io.netty.channel.ChannelOutboundBuffer.Entry} instances | |
* by cumulating every written buffer into one until the user decides to call {@link io.netty.channel.Channel#flush()}. | |
* | |
* To work properly, this handler needs to be added at the "top" of your pipeline,
* so that it receives fully encoded messages.
* | |
* Please note that the writes can only be cumulated if you are writing them with a | |
* {@link Channel#voidPromise()}. | |
* If you are writing your data without voidPromise(), expect undefined behaviour! | |
* (I am mostly concerned, that the packets would get out of order. | |
* That's not something I care to implement, for such a use case) | |
* | |
* It is very effective in this task and when used properly | |
* ( that is, with auto read = false and a proper implementation of your own read() ) | |
* can reduce the number of Entries for example from 500000(!!!) to just 200...... | |
* | |
* You can specify a promise that will be notified:
* - once a cumulation is flushed
* - once an _empty_ cumulation has been flushed
* | |
* The purpose of specifying a promise is for example - scheduling a read | |
* on the other side of the proxied connection, if you are developing a proxy. | |
* Note that the promise needs to be reusable. | |
*/ | |
@NoArgsConstructor | |
public class MessageSquasher extends ChannelDuplexHandler | |
{ | |
private ByteBuf cumulation; | |
private int writtenBuffers = 0; | |
private int writtenBytes = 0; | |
@Getter | |
@Setter | |
@NonNull | |
private ChannelPromise promise; | |
public static final ByteToMessageDecoder.Cumulator MERGE_CUMULATOR = (alloc, cumulation1, in) -> { | |
ByteBuf buffer; | |
if (cumulation1.writerIndex() > cumulation1.maxCapacity() - in.readableBytes() | |
) { | |
// Expand cumulation (by replace it) when either there is not more room in the buffer | |
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or | |
// duplicate().retain(). | |
// | |
// See: | |
// - https://github.com/netty/netty/issues/2327 | |
// - https://github.com/netty/netty/issues/1764 | |
buffer = expandCumulation(alloc, cumulation1, in.readableBytes()); | |
} else { | |
buffer = cumulation1; | |
} | |
buffer.writeBytes(in); | |
in.release(); | |
return buffer; | |
}; | |
private static ByteToMessageDecoder.Cumulator CUMMULATOR = MERGE_CUMULATOR; | |
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) { | |
ByteBuf oldCumulation = cumulation; | |
cumulation = alloc.buffer(oldCumulation.readableBytes() + readable); | |
cumulation.writeBytes(oldCumulation); | |
oldCumulation.release(); | |
return cumulation; | |
} | |
@Override | |
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { | |
if (! (msg instanceof ByteBuf) || !promise.isVoid()) { | |
ctx.write(msg, promise); | |
return; | |
} | |
ByteBuf buf = (ByteBuf) msg; | |
writtenBytes += buf.readableBytes(); | |
if (cumulation == null) | |
{ | |
cumulation = buf; | |
} | |
else | |
{ | |
cumulation = CUMMULATOR.cumulate(ctx.alloc(), cumulation, buf); | |
// the cumulator is expected to seriously release the "buf" | |
} | |
++writtenBuffers; | |
} | |
@Override | |
public void flush(ChannelHandlerContext ctx) throws Exception { | |
if (cumulation != null) { | |
ChannelOutboundBuffer c = ctx.channel().unsafe().outboundBuffer(); | |
boolean fast = false; | |
// TODO: the code below is at least suspicious | |
// I think it was causing random stream corruption :C it would be nice to know why??? | |
/* if (c != null && c.current() != null && c.current() instanceof ByteBuf) | |
{ | |
// fast-path | |
ByteBuf entry = ((ByteBuf) c.current()); | |
if (entry.readableBytes() > 0 && | |
entry.isWritable(cumulation.readableBytes())) | |
{ | |
entry.writeBytes(cumulation); | |
c.incrementPendingOutboundBytes(cumulation.readableBytes()); | |
cumulation.release(); | |
cumulation = null; | |
// it HAS to be an entry made be us. Don't break it!! | |
fast = true; | |
} | |
} */ | |
Object _cumulation = cumulation; | |
cumulation = null; | |
writtenBuffers = writtenBytes = 0; | |
if (fast) return; | |
ctx.write(_cumulation, promise == null ? ctx.voidPromise() : promise); | |
ctx.flush(); | |
return; | |
} | |
else { | |
if (promise != null) | |
ctx.write(Unpooled.EMPTY_BUFFER, promise); | |
} | |
ctx.flush(); | |
} | |
@Override | |
public void channelInactive(ChannelHandlerContext ctx) throws Exception { | |
if (cumulation != null) | |
{ | |
cumulation.release(); | |
cumulation = null; | |
} | |
ctx.fireChannelInactive(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@Scottmitch any idea why the now commented out fast-path would trigger stream corruption randomly? I removed it but I can't understand why it would do such a thing. Unfortunately I was testing with the "reset indexes fix" netty commit already so that wasn't the case...