Skip to content

Instantly share code, notes, and snippets.

@xfrag
Last active August 29, 2015 13:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save xfrag/8859329 to your computer and use it in GitHub Desktop.
Save xfrag/8859329 to your computer and use it in GitHub Desktop.
Demonstrates a Netty write-then-cancel issue. Applies to this revision: http://github.com/netty/netty/commit/e7b800e
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.CharsetUtil;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class NettyWriteCancelTest {
/**
 * Runs the demo server on a background thread until the user presses Enter,
 * then closes the server and waits for it to finish.
 *
 * @param args ignored
 * @throws Exception if reading from standard input fails or the server task
 *                   terminated with an error (surfaced by {@code Future.get()})
 */
public static void main(String... args) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
        Server server = new Server();
        Future<Void> serverFuture = executor.submit(server);
        // quit on user input
        System.out.println("Press 'Enter' to quit.");
        //noinspection ResultOfMethodCallIgnored
        System.in.read();
        System.out.println("Shutting down..");
        server.close();
        // Propagates any failure of the server task (e.g. the bind failing).
        serverFuture.get();
    } finally {
        // Always release the executor's worker thread: it is non-daemon, so if
        // an exception skipped this call the JVM would never exit.
        executor.shutdown();
    }
    System.out.println("Done.");
}
/**
 * Minimal Netty server that binds to port 8080 and installs a
 * {@code ServerHandler} on every accepted connection.
 *
 * <p>{@link #call()} blocks until the server channel is closed;
 * {@link #close()} may be invoked from another thread to trigger that.
 */
public static class Server implements Callable<Void> {
    // Written by the thread running call(), read by the thread calling close().
    private volatile Channel channel = null;
    // Remembers a close() that arrived before the server channel was published.
    private volatile boolean closeRequested = false;

    /**
     * Binds the server and blocks until its channel is closed, then shuts
     * down both event loop groups.
     *
     * @return always {@code null}
     * @throws Exception if binding fails or a blocking wait is interrupted
     */
    @Override
    public Void call() throws Exception {
        final EventLoopGroup bossGroup = new NioEventLoopGroup();
        final EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ServerHandler());
                        }
                    })
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(8080).sync();
            Channel bound = f.channel();
            channel = bound;
            // close() may have run while we were still binding and found no
            // channel to close; honour that request now so we do not block
            // forever below.
            if (closeRequested) {
                bound.close();
            }
            // Wait until the server socket is closed.
            bound.closeFuture().sync();
        } finally {
            // Nested finally: the boss group is shut down even if waiting for
            // the worker group throws.
            try {
                workerGroup.shutdownGracefully().sync();
            } finally {
                bossGroup.shutdownGracefully().sync();
            }
        }
        return null;
    }

    /**
     * Requests that the server stop. Safe to call from any thread, and safe
     * to call before {@link #call()} has finished binding: the request is
     * recorded and applied as soon as the server channel exists.
     */
    public void close() {
        // Set the flag BEFORE reading the channel; together with the
        // write-then-check order in call(), at least one side observes the other.
        closeRequested = true;
        Channel current = channel;
        if (current != null) {
            current.close();
        }
    }
}
/**
 * Per-connection handler that reproduces the write-then-cancel scenario:
 * for every inbound message it queues a marker write, cancels it, and then
 * writes the inbound message back with a flush.
 */
public static class ServerHandler extends ChannelHandlerAdapter {
    // Marker payload; its text states that the client is not expected to receive it.
    private static final byte[] GHOST =
            "---> you should not see me <---\n".getBytes(CharsetUtil.US_ASCII);

    /**
     * Queues the marker write (no flush), attaches the listener and attempts
     * to cancel it, then echoes {@code msg} back with a flush.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Step 1: un-flushed write of the marker, observed and then cancelled.
        ChannelFuture ghostWrite = ctx.write(Unpooled.wrappedBuffer(GHOST));
        ChannelFuture observedGhostWrite = ghostWrite.addListener(WRITE_LISTENER);
        observedGhostWrite.cancel(false);

        // Step 2: write the received message back and flush the channel.
        ChannelFuture echoWrite = ctx.writeAndFlush(msg);
        echoWrite.addListener(WRITE_LISTENER);
    }

    /**
     * Prints the stack trace of the failure and closes the connection.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
/**
 * Shared listener that reports the outcome of a write: success and
 * cancellation go to standard output, failures (with their stack trace)
 * go to standard error.
 */
private static final ChannelFutureListener WRITE_LISTENER = new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        assert future.isDone();

        if (future.isSuccess()) {
            System.out.println("Message written.");
            return;
        }

        if (future.isCancelled()) {
            System.out.println("Message write cancelled.");
            return;
        }

        // Neither successful nor cancelled: report the failure and its cause.
        System.err.println("Message write failed.");
        Throwable failure = future.cause();
        failure.printStackTrace(System.err);
    }
};
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment