Skip to content

Instantly share code, notes, and snippets.

@benevans
Last active August 29, 2015 13:57
Show Gist options
  • Save benevans/9750764 to your computer and use it in GitHub Desktop.
Save benevans/9750764 to your computer and use it in GitHub Desktop.
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.Queue;
public class WriteBeforeConnect {
static Logger log = LoggerFactory.getLogger("main");
public static void main(String[] args) throws Exception {
Bootstrap b = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new StringEncoder())
.addLast(new QueueWritesBeforeConnect());
}
});
Channel ch = b.bind(0).sync().channel();
ch.connect(new InetSocketAddress("127.0.0.1", 9000));
ch.writeAndFlush("ABC");
ch.writeAndFlush("DEF");
ch.writeAndFlush("GHI");
ch.closeFuture().sync();
}
static class QueueWritesBeforeConnect extends ChannelDuplexHandler {
static final Logger logger = LoggerFactory.getLogger("queue");
private int count;
private Queue<Write> queuedWrites = new LinkedList<Write>();
static class Write {
private final Object msg;
private final ChannelPromise promise;
Write(Object msg, ChannelPromise promise) {
this.msg = msg;
this.promise = promise;
}
void run(ChannelHandlerContext ctx) { ctx.write(msg, promise); }
void fail() { promise.setFailure(new IOException()); }
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
logger.debug("[{}] write: {}, queue it", count++, msg);
queuedWrites.add(new Write(msg, promise));
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
logger.debug("[{}] flush, ignore", count++);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
logger.debug("[{}] channelRegistered", count++);
super.channelRegistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.debug("[{}] channel active, flush queue", count++);
Write op;
while ((op = queuedWrites.poll()) != null) op.run(ctx);
ctx.flush();
ctx.pipeline().remove(this);
super.channelActive(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
logger.debug("[{}] handler removed", count++);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
logger.debug("channel unregistered, fail queue");
Write op;
while ((op = queuedWrites.poll()) != null) op.fail();
super.channelUnregistered(ctx);
}
}
}
// Normal operation...
//
// 22:04:09.515 [nioEventLoopGroup-2-1] DEBUG queue - [0] channelRegistered
// 22:04:09.521 [nioEventLoopGroup-2-1] DEBUG queue - [1] write: ABC, queue it
// 22:04:09.522 [nioEventLoopGroup-2-1] DEBUG queue - [2] write: DEF, queue it
// 22:04:09.522 [nioEventLoopGroup-2-1] DEBUG queue - [3] write: GHI, queue it
// 22:04:09.522 [nioEventLoopGroup-2-1] DEBUG queue - [4] flush, ignore
// 22:04:09.522 [nioEventLoopGroup-2-1] DEBUG queue - [5] channel active, flush queue
// 22:04:09.532 [nioEventLoopGroup-2-1] DEBUG queue - [6] handler removed
// But occasionally this happens... handler gets called after it was removed.
//
// 22:02:04.850 [nioEventLoopGroup-2-1] DEBUG queue - [0] channelRegistered
// 22:02:04.854 [nioEventLoopGroup-2-1] DEBUG queue - [1] channel active, flush queue
// 22:02:04.854 [nioEventLoopGroup-2-1] DEBUG queue - [2] handler removed
// 22:02:04.854 [nioEventLoopGroup-2-1] DEBUG queue - [3] write: ABC, queue it
// 22:02:04.854 [nioEventLoopGroup-2-1] DEBUG queue - [4] write: DEF, queue it
// 22:02:04.855 [nioEventLoopGroup-2-1] DEBUG queue - [5] write: GHI, queue it
// 22:02:04.855 [nioEventLoopGroup-2-1] DEBUG queue - [6] flush, ignore
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment