Last active
August 29, 2015 13:57
-
-
Save benevans/9750764 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.netty.bootstrap.Bootstrap; | |
import io.netty.channel.*; | |
import io.netty.channel.nio.NioEventLoopGroup; | |
import io.netty.channel.socket.SocketChannel; | |
import io.netty.channel.socket.nio.NioSocketChannel; | |
import io.netty.handler.codec.string.StringEncoder; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.IOException; | |
import java.net.InetSocketAddress; | |
import java.util.LinkedList; | |
import java.util.Queue; | |
public class WriteBeforeConnect { | |
static Logger log = LoggerFactory.getLogger("main"); | |
public static void main(String[] args) throws Exception { | |
Bootstrap b = new Bootstrap() | |
.group(new NioEventLoopGroup()) | |
.channel(NioSocketChannel.class) | |
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) | |
.handler(new ChannelInitializer<SocketChannel>() { | |
@Override | |
protected void initChannel(SocketChannel ch) { | |
ch.pipeline().addLast(new StringEncoder()) | |
.addLast(new QueueWritesBeforeConnect()); | |
} | |
}); | |
Channel ch = b.bind(0).sync().channel(); | |
ch.connect(new InetSocketAddress("127.0.0.1", 9000)); | |
ch.writeAndFlush("ABC"); | |
ch.writeAndFlush("DEF"); | |
ch.writeAndFlush("GHI"); | |
ch.closeFuture().sync(); | |
} | |
static class QueueWritesBeforeConnect extends ChannelDuplexHandler { | |
static final Logger logger = LoggerFactory.getLogger("queue"); | |
private int count; | |
private Queue<Write> queuedWrites = new LinkedList<Write>(); | |
static class Write { | |
private final Object msg; | |
private final ChannelPromise promise; | |
Write(Object msg, ChannelPromise promise) { | |
this.msg = msg; | |
this.promise = promise; | |
} | |
void run(ChannelHandlerContext ctx) { ctx.write(msg, promise); } | |
void fail() { promise.setFailure(new IOException()); } | |
} | |
@Override | |
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { | |
logger.debug("[{}] write: {}, queue it", count++, msg); | |
queuedWrites.add(new Write(msg, promise)); | |
} | |
@Override | |
public void flush(ChannelHandlerContext ctx) throws Exception { | |
logger.debug("[{}] flush, ignore", count++); | |
} | |
@Override | |
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { | |
logger.debug("[{}] channelRegistered", count++); | |
super.channelRegistered(ctx); | |
} | |
@Override | |
public void channelActive(ChannelHandlerContext ctx) throws Exception { | |
logger.debug("[{}] channel active, flush queue", count++); | |
Write op; | |
while ((op = queuedWrites.poll()) != null) op.run(ctx); | |
ctx.flush(); | |
ctx.pipeline().remove(this); | |
super.channelActive(ctx); | |
} | |
@Override | |
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { | |
logger.debug("[{}] handler removed", count++); | |
} | |
@Override | |
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { | |
logger.debug("channel unregistered, fail queue"); | |
Write op; | |
while ((op = queuedWrites.poll()) != null) op.fail(); | |
super.channelUnregistered(ctx); | |
} | |
} | |
} | |
// Normal operation... | |
// | |
// 22:04:09.515 [nioEventLoopGroup-2-1] DEBUG queue - [0] channelRegistered | |
// 22:04:09.521 [nioEventLoopGroup-2-1] DEBUG queue - [1] write: ABC, queue it | |
// 22:04:09.522 [nioEventLoopGroup-2-1] DEBUG queue - [2] write: DEF, queue it | |
// 22:04:09.522 [nioEventLoopGroup-2-1] DEBUG queue - [3] write: GHI, queue it | |
// 22:04:09.522 [nioEventLoopGroup-2-1] DEBUG queue - [4] flush, ignore | |
// 22:04:09.522 [nioEventLoopGroup-2-1] DEBUG queue - [5] channel active, flush queue | |
// 22:04:09.532 [nioEventLoopGroup-2-1] DEBUG queue - [6] handler removed | |
// But occasionally the following happens instead: write() and flush() are still invoked on the handler after it has already been removed from the pipeline.
// | |
// 22:02:04.850 [nioEventLoopGroup-2-1] DEBUG queue - [0] channelRegistered | |
// 22:02:04.854 [nioEventLoopGroup-2-1] DEBUG queue - [1] channel active, flush queue | |
// 22:02:04.854 [nioEventLoopGroup-2-1] DEBUG queue - [2] handler removed | |
// 22:02:04.854 [nioEventLoopGroup-2-1] DEBUG queue - [3] write: ABC, queue it | |
// 22:02:04.854 [nioEventLoopGroup-2-1] DEBUG queue - [4] write: DEF, queue it | |
// 22:02:04.855 [nioEventLoopGroup-2-1] DEBUG queue - [5] write: GHI, queue it | |
// 22:02:04.855 [nioEventLoopGroup-2-1] DEBUG queue - [6] flush, ignore | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment