Skip to content

Instantly share code, notes, and snippets.

@ChinaXing
Created February 10, 2015 09:39
Show Gist options
  • Save ChinaXing/e5808fed8605789c42d6 to your computer and use it in GitHub Desktop.
Save ChinaXing/e5808fed8605789c42d6 to your computer and use it in GitHub Desktop.
验证socket不可写的时候,如果不检查channelWritable会导致内存溢出
package com.chinaxing.test.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
/**
 * Demonstrates that, when the socket is not writable, writing without checking
 * channel writability (or without listening for the writability-changed event)
 * leads to an out-of-memory error.
 * <p>
 * JVM arguments: {@code -DloopCount=1000000 -Xms128m -Xmx128m}
 * <p>
 * While running, the remaining free memory is printed; the OOM error is thrown
 * once only ten-odd MB remain.
 * <p>
 * Wire format of one frame (all integers big-endian, strings UTF-8):
 * <pre>
 *   u16 size   number of bytes that follow this field
 *   u16 aLen,  aLen bytes of {@code a}
 *   u16 bLen,  bLen bytes of {@code b}
 *   u16 c
 *   i64 d
 * </pre>
 * <p>
 * Created by LambdaCat on 15/2/10.
 */
public class OOMWrite {
    /** Width in bytes of every unsigned-short field on the wire. */
    private static final int SHORT_BYTES = 2;
    /** Width in bytes of the trailing long field on the wire. */
    private static final int LONG_BYTES = 8;
    /** Iteration count used when the {@code loopCount} system property is absent or malformed. */
    private static final int DEFAULT_LOOP_COUNT = 1000000;
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 9090;

    /** While {@code false}, the server-side handler sleeps instead of returning. */
    private static volatile boolean isAccept = true;

    /**
     * Starts the server and the client, then writes {@code loopCount} messages
     * without ever consulting {@link Channel#isWritable()}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        startServer();
        Channel channel = startClient();
        Runtime runtime = Runtime.getRuntime();
        // Read the bound once; fall back to the documented value instead of
        // failing with NumberFormatException when -DloopCount is not supplied.
        int loopCount = Integer.getInteger("loopCount", DEFAULT_LOOP_COUNT);
        for (int i = 0; i < loopCount; i++) {
            // Intentionally no isWritable() check: that omission is what this demo exercises.
            channel.writeAndFlush(new TestMessage(
                    String.valueOf(System.currentTimeMillis()),
                    Arrays.asList(runtime.freeMemory(), runtime.freeMemory(), runtime.availableProcessors(), runtime.totalMemory()).toString(),
                    runtime.availableProcessors(),
                    runtime.totalMemory()
            ));
            if (i == 5) {
                // From now on the server handler stalls in its sleep loop.
                isAccept = false;
            }
            if (i % 10 == 0) {
                System.out.print("channel writable : " + channel.isWritable());
                System.out.println(", free memory MB : " + runtime.freeMemory() / 1000000);
            }
        }
    }

    /**
     * Binds a server on {@code 127.0.0.1:9090} whose pipeline decodes
     * {@link TestMessage} frames and prints field {@code a} of each one.
     * Exits the JVM with status -1 if the bind is interrupted.
     */
    private static void startServer() {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(new NioEventLoopGroup(1));
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ch.pipeline()
                        .addLast(new ByteToMessageDecoder() {
                            @Override
                            protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
                                // The size header itself may not have arrived yet.
                                if (in.readableBytes() < SHORT_BYTES) {
                                    return;
                                }
                                in.markReaderIndex();
                                int size = in.readUnsignedShort();
                                // Wait until the whole frame body is buffered; compare against
                                // what is left AFTER the header so a partial frame is never parsed.
                                if (in.readableBytes() < size) {
                                    in.resetReaderIndex();
                                    return;
                                }
                                TestMessage tm = new TestMessage();
                                byte[] aBytes = new byte[in.readUnsignedShort()];
                                in.readBytes(aBytes);
                                tm.a = new String(aBytes, StandardCharsets.UTF_8);
                                byte[] bBytes = new byte[in.readUnsignedShort()];
                                in.readBytes(bBytes);
                                tm.b = new String(bBytes, StandardCharsets.UTF_8);
                                tm.c = in.readUnsignedShort();
                                tm.d = in.readLong();
                                out.add(tm);
                            }
                        })
                        .addLast(new SimpleChannelInboundHandler<TestMessage>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, TestMessage msg) throws Exception {
                                System.out.println(msg.a);
                                // Stall on purpose once the client has flipped isAccept, so that
                                // this handler stops returning and no further messages are consumed.
                                while (!isAccept) {
                                    try {
                                        Thread.sleep(10000);
                                    } catch (InterruptedException e) {
                                        // Restore the flag and stop stalling rather than
                                        // swallowing the interrupt and sleeping again.
                                        Thread.currentThread().interrupt();
                                        System.err.println(e.getMessage());
                                        return;
                                    }
                                }
                            }
                        });
            }
        });
        try {
            bootstrap.bind(HOST, PORT).sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }

    /**
     * Connects a client to {@code 127.0.0.1:9090} with a 64K write-buffer high
     * water mark and a pipeline that encodes {@link TestMessage} frames.
     * Exits the JVM with status -1 if the connect is interrupted.
     *
     * @return the channel of the connect attempt
     */
    private static Channel startClient() {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(new NioEventLoopGroup(2));
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ch.pipeline().addLast(new MessageToByteEncoder<TestMessage>() {
                    @Override
                    protected void encode(ChannelHandlerContext ctx, TestMessage msg, ByteBuf out) throws Exception {
                        // Encode each string once and reuse the bytes for both length and payload.
                        byte[] aBytes = msg.a.getBytes(StandardCharsets.UTF_8);
                        byte[] bBytes = msg.b.getBytes(StandardCharsets.UTF_8);
                        out.writeShort(msg.getSize());
                        out.writeShort(aBytes.length);
                        out.writeBytes(aBytes);
                        out.writeShort(bBytes.length);
                        out.writeBytes(bBytes);
                        out.writeShort(msg.c);
                        out.writeLong(msg.d);
                    }
                });
            }
        });
        bootstrap.remoteAddress(HOST, PORT);
        bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 16 * 4096); // 64K
        ChannelFuture future = bootstrap.connect();
        try {
            future.sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.exit(-1);
        }
        return future.channel();
    }

    /** Payload exchanged between client and server: two strings, an int and a long. */
    static class TestMessage {
        String a;
        String b;
        int c;
        long d;

        /**
         * Returns the number of bytes this message occupies on the wire after
         * the leading size field: two length-prefixed UTF-8 strings, the
         * 2-byte {@code c} and the 8-byte {@code d}.
         *
         * @return encoded body length in bytes
         */
        int getSize() {
            return SHORT_BYTES + a.getBytes(StandardCharsets.UTF_8).length
                    + SHORT_BYTES + b.getBytes(StandardCharsets.UTF_8).length
                    + SHORT_BYTES
                    + LONG_BYTES;
        }

        public TestMessage() {
        }

        public TestMessage(String a, String b, int c, long d) {
            this.a = a;
            this.b = b;
            this.c = c;
            this.d = d;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment