@jadbaz
Last active May 3, 2017 08:01
A Netty 4 ChannelOutboundHandler that queues outbound messages and writes them one at a time to improve throughput
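import io.netty.channel.*;
import io.netty.util.ReferenceCountUtil;
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Queues outbound messages and keeps at most one writeAndFlush in flight.
 * Use one instance per channel: Netty invokes write() and notifies write listeners
 * on the handler's event loop, so the fields below need no synchronization.
 */
public class WriteQueueOutboundChannelHandler extends ChannelOutboundHandlerAdapter {
    private static final int QUEUE_SIZE_WARNING = 5000;

    /** A queued message together with the promise its caller is waiting on. */
    private static final class PendingWrite {
        final Object msg;
        final ChannelPromise promise;
        PendingWrite(Object msg, ChannelPromise promise) {
            this.msg = msg;
            this.promise = promise;
        }
    }

    private final Queue<PendingWrite> messageQueue = new ArrayDeque<>();
    private ChannelHandlerContext ctx;
    private boolean isWriting;

    private final ChannelFutureListener sendListener = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                // might want to do a message counter in another handler
                ctx.fireUserEventTriggered("WRITE_MESSAGE_COMPLETE");
                poll();
            } else {
                isWriting = false; // allow later writes to restart the queue
                failPending(future.cause());
                future.channel().close();
            }
        }
    };

    /** Writes the next queued message, or marks the handler idle if the queue is empty. */
    private void poll() {
        PendingWrite next = messageQueue.poll();
        if (next == null) {
            isWriting = false;
            return;
        }
        isWriting = true;
        // Pass the caller's promise along so its listeners fire; unvoid() permits addListener.
        ctx.writeAndFlush(next.msg, next.promise.unvoid()).addListener(sendListener);
    }

    /** Releases every queued message and fails its promise so callers are notified. */
    private void failPending(Throwable cause) {
        PendingWrite pending;
        while ((pending = messageQueue.poll()) != null) {
            ReferenceCountUtil.release(pending.msg);
            pending.promise.tryFailure(cause);
        }
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        this.ctx = ctx;
        int size = messageQueue.size();
        if (size > QUEUE_SIZE_WARNING) {
            System.out.println("Queue size: " + size); // should handle somehow
        }
        messageQueue.offer(new PendingWrite(msg, promise));
        if (!isWriting) {
            poll();
        }
    }
}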
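The queueing idea in the handler above can be tried without Netty on the classpath. The following is a minimal sketch under stated assumptions, not the gist's code: `SerializedWriter`, its `transport` function and `pendingCount()` are hypothetical names invented for illustration, and a `CompletableFuture` stands in for Netty's `ChannelFuture`. Like the handler, it assumes all calls happen on a single thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical stand-alone model of a one-write-in-flight queue; not Netty API. */
class SerializedWriter {
    private final Queue<String> pending = new ArrayDeque<>();
    // Assumed transport: starts an asynchronous write and returns its completion future.
    private final Function<String, CompletableFuture<Void>> transport;
    private boolean writing; // single-threaded use assumed, as on an event loop

    SerializedWriter(Function<String, CompletableFuture<Void>> transport) {
        this.transport = transport;
    }

    /** Enqueues a message and starts draining if no write is in flight. */
    void write(String msg) {
        pending.add(msg);
        if (!writing) {
            drain();
        }
    }

    /** Sends the next message; the following one is sent only after this one completes. */
    private void drain() {
        String next = pending.poll();
        if (next == null) {
            writing = false;
            return;
        }
        writing = true;
        transport.apply(next).whenComplete((ok, err) -> {
            if (err == null) {
                drain();
            } else {
                pending.clear(); // drop queued messages on failure, as the handler does
                writing = false;
            }
        });
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        List<CompletableFuture<Void>> inFlight = new ArrayList<>();
        SerializedWriter writer = new SerializedWriter(msg -> {
            sent.add(msg);
            CompletableFuture<Void> done = new CompletableFuture<>();
            inFlight.add(done);
            return done;
        });
        writer.write("a");
        writer.write("b");
        writer.write("c");
        System.out.println(sent + " pending=" + writer.pendingCount()); // [a] pending=2
        inFlight.get(0).complete(null); // "a" finished, so "b" is written next
        System.out.println(sent + " pending=" + writer.pendingCount()); // [a, b] pending=1
    }
}
```

Only one message reaches the transport at a time; the rest wait in the queue until the previous write reports completion, which is the same ordering the handler enforces through its `ChannelFutureListener`.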
jadbaz commented May 2, 2017

This class seeks to address Netty 4 issue 1759.