Created
March 15, 2017 11:11
-
-
Save louxiu/675027ae7563403dd8c71e4ae8ec095d to your computer and use it in GitHub Desktop.
Understanding netty channel buffers and watermarks
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.netty.bootstrap.Bootstrap; | |
import io.netty.buffer.ByteBuf; | |
import io.netty.buffer.Unpooled; | |
import io.netty.channel.ChannelFuture; | |
import io.netty.channel.ChannelHandlerContext; | |
import io.netty.channel.ChannelInitializer; | |
import io.netty.channel.ChannelOption; | |
import io.netty.channel.ChannelPipeline; | |
import io.netty.channel.EventLoopGroup; | |
import io.netty.channel.SimpleChannelInboundHandler; | |
import io.netty.channel.nio.NioEventLoopGroup; | |
import io.netty.channel.socket.SocketChannel; | |
import io.netty.channel.socket.nio.NioSocketChannel; | |
import java.util.concurrent.TimeUnit; | |
public final class Client { | |
public static void main(String[] args) throws Exception { | |
EventLoopGroup group = new NioEventLoopGroup(); | |
try { | |
Bootstrap b = new Bootstrap(); | |
b.group(group) | |
.channel(NioSocketChannel.class) | |
.option(ChannelOption.TCP_NODELAY, true) | |
.handler(new ChannelInitializer<SocketChannel>() { | |
@Override | |
public void initChannel(SocketChannel ch) throws Exception { | |
ChannelPipeline p = ch.pipeline(); | |
p.addLast(new SimpleChannelInboundHandler<Object>() { | |
@Override | |
public void channelActive(ChannelHandlerContext ctx) throws Exception { | |
super.channelActive(ctx); | |
// trigger server write | |
ByteBuf trigger = Unpooled.buffer(1); | |
trigger.writeByte(1); | |
ctx.writeAndFlush(trigger); | |
} | |
@Override | |
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { | |
System.out.println("hold on read"); | |
Thread.sleep(TimeUnit.DAYS.toMillis(1)); | |
} | |
}); | |
} | |
}); | |
ChannelFuture f = b.connect("127.0.0.1", 8007).sync(); | |
f.channel().closeFuture().sync(); | |
} finally { | |
// Shut down the event loop to terminate all threads. | |
group.shutdownGracefully(); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.netty.bootstrap.ServerBootstrap; | |
import io.netty.buffer.ByteBuf; | |
import io.netty.buffer.Unpooled; | |
import io.netty.channel.ChannelFuture; | |
import io.netty.channel.ChannelHandlerContext; | |
import io.netty.channel.ChannelInitializer; | |
import io.netty.channel.ChannelOption; | |
import io.netty.channel.ChannelPipeline; | |
import io.netty.channel.EventLoopGroup; | |
import io.netty.channel.SimpleChannelInboundHandler; | |
import io.netty.channel.nio.NioEventLoopGroup; | |
import io.netty.channel.socket.SocketChannel; | |
import io.netty.channel.socket.nio.NioServerSocketChannel; | |
import io.netty.handler.logging.LogLevel; | |
import io.netty.handler.logging.LoggingHandler; | |
public final class Server { | |
public static void main(String[] args) throws Exception { | |
EventLoopGroup bossGroup = new NioEventLoopGroup(1); | |
EventLoopGroup workerGroup = new NioEventLoopGroup(); | |
try { | |
ServerBootstrap b = new ServerBootstrap(); | |
b.group(bossGroup, workerGroup) | |
.channel(NioServerSocketChannel.class) | |
.option(ChannelOption.SO_BACKLOG, 100) | |
.handler(new LoggingHandler(LogLevel.INFO)) | |
.childHandler(new ChannelInitializer<SocketChannel>() { | |
@Override | |
public void initChannel(SocketChannel ch) throws Exception { | |
ChannelPipeline p = ch.pipeline(); | |
p.addLast(new SimpleChannelInboundHandler<Object>() { | |
@Override | |
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { | |
System.out.println("start to write"); | |
long bytesBeforeUnwritable = ctx.channel().bytesBeforeUnwritable(); | |
int count = 0; | |
while (true) { | |
ByteBuf oneByte = Unpooled.buffer(1); | |
oneByte.writeByte(1); | |
ctx.writeAndFlush(oneByte); | |
count ++; | |
if (ctx.channel().bytesBeforeUnwritable() != bytesBeforeUnwritable){ | |
bytesBeforeUnwritable = ctx.channel().bytesBeforeUnwritable(); | |
System.out.println(count + " : " + bytesBeforeUnwritable); | |
} | |
} | |
} | |
}); | |
} | |
}); | |
ChannelFuture f = b.bind(8007).sync(); | |
f.channel().closeFuture().sync(); | |
} finally { | |
// Shut down all event loops to terminate all threads. | |
bossGroup.shutdownGracefully(); | |
workerGroup.shutdownGracefully(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment