@jjongsma
Last active August 29, 2015 13:56
1) Run WebSocketServer.java and leave it running for the duration of this test.
2) Run WebSocketClient.java in a separate JVM. The first run almost always works as expected.
3) Run WebSocketClient.java again. About 90% of the time, the exception appears during the second run.
4) Repeat step 3 until you see the logged exception in STDOUT; the exception triggers a disconnect.

This appears to be a network queue/timing issue. When the request succeeds, the WebSocket upgrade
response and the first frame arrive at the client as separate messages:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 31 30 31 20 53 77 69 |HTTP/1.1 101 Swi|
|00000010| 74 63 68 69 6e 67 20 50 72 6f 74 6f 63 6f 6c 73 |tching Protocols|
|00000020| 0d 0a 55 70 67 72 61 64 65 3a 20 77 65 62 73 6f |..Upgrade: webso|
|00000030| 63 6b 65 74 0d 0a 43 6f 6e 6e 65 63 74 69 6f 6e |cket..Connection|
|00000040| 3a 20 55 70 67 72 61 64 65 0d 0a 53 65 63 2d 57 |: Upgrade..Sec-W|
|00000050| 65 62 53 6f 63 6b 65 74 2d 41 63 63 65 70 74 3a |ebSocket-Accept:|
|00000060| 20 30 31 62 36 31 68 2f 74 43 45 73 35 44 6b 31 | 01b61h/tCEs5Dk1|
|00000070| 36 75 4d 69 4a 66 5a 79 38 66 62 41 3d 0d 0a 0d |6uMiJfZy8fbA=...|
|00000080| 0a                                              |.               |
+--------+-------------------------------------------------+----------------+
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 82 0f 49 6e 69 74 69 61 6c 20 6d 65 73 73 61 67 |..Initial messag|
|00000010| 65                                              |e               |
+--------+-------------------------------------------------+----------------+
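The second dump is a single WebSocket frame. As a minimal sketch (not part of the original test; the class name FrameHeaderSketch and its method are made up for illustration), the two header bytes 82 0f can be decoded with plain Java, following the base framing layout of RFC 6455. It only handles the short 7-bit length form, which is all this frame needs.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; it is not part of Netty or of this gist.
final class FrameHeaderSketch {

    /** Returns {fin, opcode, masked, payloadLength} for a frame that uses the 7-bit length form. */
    static int[] parseHeader(final byte[] frame) {
        final int b0 = frame[0] & 0xFF;
        final int b1 = frame[1] & 0xFF;
        final int fin = (b0 >> 7) & 1;      // top bit of the first byte
        final int opcode = b0 & 0x0F;       // low nibble of the first byte
        final int masked = (b1 >> 7) & 1;   // top bit of the second byte
        final int length = b1 & 0x7F;       // low 7 bits of the second byte
        if (length > 125) {
            throw new IllegalArgumentException("extended payload lengths are not handled in this sketch");
        }
        return new int[] {fin, opcode, masked, length};
    }

    public static void main(final String... args) {
        // Rebuild the 17-byte frame from the dump: 82 0f followed by "Initial message".
        final byte[] payload = "Initial message".getBytes(StandardCharsets.US_ASCII);
        final byte[] frame = new byte[2 + payload.length];
        frame[0] = (byte) 0x82;
        frame[1] = (byte) 0x0f;
        System.arraycopy(payload, 0, frame, 2, payload.length);

        final int[] h = parseHeader(frame);
        System.out.println("fin=" + h[0] + " opcode=" + h[1] + " masked=" + h[2] + " length=" + h[3]);
    }
}
```

Opcode 2 is a binary frame, which matches the BinaryWebSocketFrame the server sends, and the length of 15 matches "Initial message".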
When the request fails and the client disconnects, the upgrade response and the first frame
arrive in the same buffer:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 31 30 31 20 53 77 69 |HTTP/1.1 101 Swi|
|00000010| 74 63 68 69 6e 67 20 50 72 6f 74 6f 63 6f 6c 73 |tching Protocols|
|00000020| 0d 0a 55 70 67 72 61 64 65 3a 20 77 65 62 73 6f |..Upgrade: webso|
|00000030| 63 6b 65 74 0d 0a 43 6f 6e 6e 65 63 74 69 6f 6e |cket..Connection|
|00000040| 3a 20 55 70 67 72 61 64 65 0d 0a 53 65 63 2d 57 |: Upgrade..Sec-W|
|00000050| 65 62 53 6f 63 6b 65 74 2d 41 63 63 65 70 74 3a |ebSocket-Accept:|
|00000060| 20 74 75 67 72 52 4b 67 50 30 6e 61 79 76 59 4e | tugrRKgP0nayvYN|
|00000070| 6c 38 79 76 7a 6c 6d 42 6f 54 69 73 3d 0d 0a 0d |l8yvzlmBoTis=...|
|00000080| 0a 82 0f 49 6e 69 74 69 61 6c 20 6d 65 73 73 61 |...Initial messa|
|00000090| 67 65                                           |ge              |
+--------+-------------------------------------------------+----------------+
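To make the boundary in the combined buffer concrete, here is a small stdlib-only sketch (the class CoalescedReadSketch and its methods are hypothetical, written for this note). It rebuilds the bytes from the dump above, finds the CRLF CRLF that ends the HTTP response, and reports how many bytes are left over for the WebSocket layer.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; it is not part of Netty or of this gist.
final class CoalescedReadSketch {

    /** Index of the first byte after the CRLF CRLF header terminator, or -1 if it is absent. */
    static int endOfHeaders(final byte[] buf) {
        for (int i = 0; i + 3 < buf.length; i++) {
            if (buf[i] == '\r' && buf[i + 1] == '\n' && buf[i + 2] == '\r' && buf[i + 3] == '\n') {
                return i + 4;
            }
        }
        return -1;
    }

    /** Rebuilds the single read shown in the failing dump: 101 response followed by the first frame. */
    static byte[] sampleRead() {
        final byte[] response = ("HTTP/1.1 101 Switching Protocols\r\n"
                + "Upgrade: websocket\r\n"
                + "Connection: Upgrade\r\n"
                + "Sec-WebSocket-Accept: tugrRKgP0nayvYNl8yvzlmBoTis=\r\n"
                + "\r\n").getBytes(StandardCharsets.US_ASCII);
        final byte[] payload = "Initial message".getBytes(StandardCharsets.US_ASCII);
        final byte[] read = new byte[response.length + 2 + payload.length];
        System.arraycopy(response, 0, read, 0, response.length);
        read[response.length] = (byte) 0x82;      // FIN + binary opcode
        read[response.length + 1] = (byte) 0x0f;  // unmasked, 15-byte payload
        System.arraycopy(payload, 0, read, response.length + 2, payload.length);
        return read;
    }

    public static void main(final String... args) {
        final byte[] read = sampleRead();
        final int split = endOfHeaders(read);
        System.out.println("HTTP response bytes: " + split);
        System.out.println("Trailing WebSocket bytes: " + (read.length - split));
    }
}
```

The response ends at offset 0x81 (129) and the remaining 17 bytes are the frame, which is exactly the data the HTTP decoder is still holding when the exception below is thrown.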
At this point, ReplayingDecoder sees more data in the buffer after processing the upgrade
response and loops around for another pass. On that pass, HttpClientCodec$Decoder.decode()
sees that "state" is already set to UPGRADED and leaves the buffer alone, presumably to allow
the downstream WebSocket decoder to process the frame. But when the second decode() returns,
it has produced no output, consumed no input, and left its state unchanged, so
ReplayingDecoder throws an exception:
io.netty.handler.codec.DecoderException: HttpClientCodec$Decoder.decode() must consume the inbound data or change its state if it did not decode anything.
    at io.netty.handler.codec.ReplayingDecoder.callDecode(ReplayingDecoder.java:374)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:139)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:148)
    at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:338)
    at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:324)
    at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:297)
    at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:338)
    at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:324)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:785)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:126)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:101)
    at java.lang.Thread.run(Thread.java:744)
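
The failure mode can be modelled without Netty. The sketch below is a deliberately simplified toy, not Netty's source: ToyDecoder, callDecode and the State enum are invented names, and the loop only mirrors the rule stated in the exception message (a decode() call must produce output, consume input, or change state). It models a single read of the given size.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model for illustration only; names and structure are assumptions, not Netty code.
final class NoProgressCheckSketch {

    enum State { READ_RESPONSE, UPGRADED }

    /** Toy decoder: consumes the HTTP response once, then ignores everything after the upgrade. */
    static final class ToyDecoder {
        State state = State.READ_RESPONSE;
        int readerIndex = 0;

        void decode(final byte[] in, final int headerLength, final List<Object> out) {
            if (state == State.UPGRADED) {
                return; // leaves the remaining bytes for a downstream decoder
            }
            readerIndex = headerLength;
            state = State.UPGRADED;
            out.add("101 Switching Protocols");
        }
    }

    /** Calls decode() while unread bytes remain and rejects a pass that makes no progress. */
    static List<Object> callDecode(final ToyDecoder decoder, final byte[] in, final int headerLength) {
        final List<Object> out = new ArrayList<>();
        while (decoder.readerIndex < in.length) {
            final int oldSize = out.size();
            final int oldIndex = decoder.readerIndex;
            final State oldState = decoder.state;
            decoder.decode(in, headerLength, out);
            if (out.size() == oldSize && decoder.readerIndex == oldIndex && decoder.state == oldState) {
                throw new IllegalStateException(
                    "decode() must consume the inbound data or change its state if it did not decode anything.");
            }
        }
        return out;
    }

    public static void main(final String... args) {
        // Read containing only the 129-byte response: one pass, no error.
        System.out.println(callDecode(new ToyDecoder(), new byte[129], 129));
        // Read containing response plus 17-byte frame: the second pass makes no progress.
        try {
            callDecode(new ToyDecoder(), new byte[146], 129);
        } catch (final IllegalStateException e) {
            System.out.println("Second pass failed: " + e.getMessage());
        }
    }
}
```

In this model, the only difference between the passing and failing runs is whether bytes remain in the buffer after the upgrade response has been decoded, which is consistent with the timing-dependent behaviour described above.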

// WebSocketClient.java
package test;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

import java.net.InetSocketAddress;
import java.net.URI;

public class WebSocketClient {

    // Set when any exception reaches ErrorReporter.
    public volatile boolean failed = false;

    public static void main(final String... args) throws Exception {
        final WebSocketClient client = new WebSocketClient();
        client.connect(URI.create("ws://localhost:8888/test"));
    }

    public ChannelFuture connect(final URI uri) {
        final NioEventLoopGroup elg = new NioEventLoopGroup();
        final ChannelFuture future = new Bootstrap()
            .channel(NioSocketChannel.class)
            .group(elg)
            .remoteAddress(new InetSocketAddress(uri.getHost(), uri.getPort()))
            .handler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(final Channel ch) throws Exception {
                    ch.pipeline().addLast(
                        // Produces the hex dumps shown above.
                        new LoggingHandler("client"),
                        // The DecoderException is thrown from this codec's decoder.
                        new HttpClientCodec(),
                        new ErrorReporter(),
                        new HttpObjectAggregator(65536),
                        new WebSocketClientProtocolHandler(
                            WebSocketClientHandshakerFactory.newHandshaker(uri,
                                WebSocketVersion.V13, null, false, null)),
                        new CapabilitiesReceiver()
                    );
                }
            }).connect();
        // Shut the event loop down once the channel closes so the JVM can exit.
        future.channel().closeFuture().addListener(new GenericFutureListener<Future<Void>>() {
            @Override
            public void operationComplete(final Future<Void> future) throws Exception {
                elg.shutdownGracefully();
            }
        });
        return future;
    }

    // Prints exceptions propagated from earlier handlers and passes them on.
    private class ErrorReporter extends ChannelInboundHandlerAdapter {
        @Override
        public void exceptionCaught(final ChannelHandlerContext ctx,
                final Throwable cause) throws Exception {
            failed = true;
            cause.printStackTrace();
            ctx.fireExceptionCaught(cause);
        }
    }

    // Prints each received message and closes the channel after the first one.
    private static class CapabilitiesReceiver extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
            System.out.println("Received message: " + msg.toString());
            ctx.fireChannelRead(msg);
            ctx.close();
        }

        @Override
        public void userEventTriggered(final ChannelHandlerContext ctx,
                final Object evt) throws Exception {
            if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
                System.out.println("Client handshake complete");
            }
            ctx.fireUserEventTriggered(evt);
        }
    }
}

// WebSocketServer.java
package test;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

public class WebSocketServer {

    public static void main(final String... args) throws Exception {
        final WebSocketServer server = new WebSocketServer();
        server.listen("localhost", 8888, "/test");
    }

    public ChannelFuture listen(final String host, final int port, final String path) {
        return new ServerBootstrap()
            .channel(NioServerSocketChannel.class)
            .group(new NioEventLoopGroup())
            .childHandler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(final Channel ch) throws Exception {
                    ch.pipeline().addLast(
                        new HttpResponseEncoder(),
                        new HttpRequestDecoder(),
                        new HttpObjectAggregator(65536),
                        new WebSocketServerProtocolHandler(path),
                        new CapabilitiesBroadcaster());
                }
            }).bind(host, port);
    }

    // Sends one binary frame as soon as the server-side handshake completes.
    // This is the frame that can arrive in the same read as the 101 response.
    private static class CapabilitiesBroadcaster extends ChannelInboundHandlerAdapter {
        @Override
        public void userEventTriggered(final ChannelHandlerContext ctx,
                final Object evt) throws Exception {
            if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
                ctx.writeAndFlush(new BinaryWebSocketFrame(Unpooled.wrappedBuffer("Initial message".getBytes())));
            }
            ctx.fireUserEventTriggered(evt);
        }
    }
}