Last active
May 3, 2017 08:01
-
-
Save jadbaz/47d98da0ead2e71659f343b14ef05de6 to your computer and use it in GitHub Desktop.
A Netty 4 ChannelOutboundHandler that implements a queue system to optimize usage and throughput
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.netty.channel.*; | |
import java.util.Queue; | |
import java.util.concurrent.ConcurrentLinkedQueue; | |
public class WriteQueueOutboundChannelHandler extends ChannelOutboundHandlerAdapter { | |
private static final int QUEUE_SIZE_WARNING = 5000; | |
private ChannelHandlerContext ctx; | |
private final Queue<Object> messageQueue = new ConcurrentLinkedQueue<>(); | |
private int qSize = 0; //should make access synchronized | |
private boolean isWriting; | |
private final ChannelFutureListener sendListener = new ChannelFutureListener() { | |
@Override | |
public void operationComplete(ChannelFuture future) { | |
if (future.isSuccess()) { | |
ctx.fireUserEventTriggered("WRITE_MESSAGE_COMPLETE"); //might want to do a message counter in another handler | |
poll(); | |
} else { | |
future.channel().close(); | |
messageQueue.clear(); | |
} | |
} | |
}; | |
private void poll() { | |
isWriting = true; | |
if (!messageQueue.isEmpty()) { | |
this.ctx.writeAndFlush(messageQueue.poll()).addListener(sendListener); | |
qSize--; | |
} | |
else { | |
isWriting = false; | |
} | |
} | |
@Override | |
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { | |
this.ctx = ctx; | |
int size = qSize; | |
if (size > QUEUE_SIZE_WARNING) { | |
System.out.println("Queue size: "+size); //should handle somehow | |
} | |
messageQueue.offer(msg); | |
qSize++; | |
if (!isWriting) { | |
poll(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This class seeks to address Netty 4 issue 1759.