@danbev
Last active April 30, 2017 13:39
Netty SockJS Refactoring

Pull request #1615 is intended to add SockJS support to Netty. It does so by adding an API on top of Netty that end users have to implement. The interface that users have to implement looks like this:

/**
 * Represents the server side business application server in SockJS.
 */
public interface SockJsService {

    /**
     * The {@link SockJsConfig} for this service
     *
     * @return {@link SockJsConfig} this services configuration.
     */
    SockJsConfig config();

    /**
     * Will be called when a new session is opened.
     *
     * @param session the {@link SockJsSessionContext} which can be stored and used for sending/closing.
     */
    void onOpen(SockJsSessionContext session);

    /**
     * Will be called when a message is sent to the service.
     *
     * @param message the message sent from a client.
     * @throws Exception
     */
    void onMessage(String message) throws Exception;

    /**
     * Will be called when the session is closed.
     */
    void onClose();

}

This document discusses an alternative implementation in which only existing Netty APIs are used.
The sockjs-refactoring branch is being used to experiment with this.

Refactoring

Instead of implementing the SockJsService interface, a SockJS service will be a simple ChannelHandler, for example:

public class SockJsEchoHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
        ctx.writeAndFlush(msg);
    }

}

This example will be used to explain some of the details and issues involved in implementing this.

onOpen

The onOpen method in the original SockJsService interface does not really have an equivalent in Netty, because onOpen refers to a SockJS session being opened. Depending on the transport being used, this sometimes maps to a channelActive event, but not always.
At the moment this is handled via a user event, which looks like this:

@Override
public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception {
    if (evt == Event.ON_SESSION_OPEN) {
        logger.info("Connected");
    } 
}

onClose

While it might seem most logical to handle this using Netty's close method on a ChannelHandler, onClose concerns closing the SockJS session. For some transports, such as the polling transports, Netty's close method would be called more often than the SockJS session is closed. Therefore closing is also handled using a user event:

    @Override
    public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception {
        if (evt == Event.CLOSE_SESSION) {
            logger.info("Session closing");
        }
    }
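
To make the distinction between connection events and session events concrete, here is a minimal sketch in plain Java, with no Netty dependency. SessionTracker and its method names are hypothetical and are not taken from the branch; only the two event names mirror the user events shown above. The assumption is that a polling transport opens and closes a connection for every poll, while the session is opened once and closed once.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

/** Hypothetical model: tracks SockJS sessions separately from the connections that carry them. */
final class SessionTracker {

    /** Event names mirror the user events above; the enum itself is a stand-in. */
    enum Event { ON_SESSION_OPEN, CLOSE_SESSION }

    private final Set<String> openSessions = new HashSet<>();

    /** A connection arrived for this session; only the first one opens the session. */
    Optional<Event> connectionOpened(String sessionId) {
        return openSessions.add(sessionId) ? Optional.of(Event.ON_SESSION_OPEN) : Optional.empty();
    }

    /** A connection went away (for example, one poll completed); the session stays open. */
    Optional<Event> connectionClosed(String sessionId) {
        return Optional.empty();
    }

    /** The session itself was closed (explicitly or by timeout); fires at most once. */
    Optional<Event> sessionClosed(String sessionId) {
        return openSessions.remove(sessionId) ? Optional.of(Event.CLOSE_SESSION) : Optional.empty();
    }
}
```

With this model, three consecutive polls for session 123 produce a single ON_SESSION_OPEN and no CLOSE_SESSION, which is why a channel-level close callback is not a reliable signal for onClose.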

NIO example

Taken from NettySockJsServer.java

    public void run() throws Exception {
        final EventLoopGroup bossGroup = new NioEventLoopGroup();
        final EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            final ServerBootstrap sb = new ServerBootstrap();
            sb.channel(NioSockJsServerChannel.class);
            sb.group(bossGroup, workerGroup);
            

            final CorsConfig corsConfig = DefaultSockJsConfig.defaultCorsConfig("test", "*")
                    .allowedRequestHeaders("a", "b", "c")
                    .allowNullOrigin()
                    .allowedRequestMethods(POST, GET, OPTIONS)
                    .build();

            sb.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(final SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new SockJsEchoHandler());
                }
            });
            sb.option(PREFIX, "/echo");
            sb.childOption(MAX_STREAMING_BYTES_SIZE, 4096);
            sb.childOption(CORS_CONFIG, corsConfig);
            sb.register();

            sb.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(final SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new SockJsCloseHandler());
                }
            });
            sb.option(PREFIX, "/close");
            sb.childOption(CORS_CONFIG, corsConfig);
            sb.register();

            sb.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(final SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new SockJsEchoHandler());
                }
            });
            sb.option(PREFIX, "/cookie_needed_echo");
            sb.childOption(COOKIES_NEEDED, true);
            sb.childOption(CORS_CONFIG, corsConfig);
            sb.register();

            sb.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(final SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new SockJsWSDisabledHandler());
                }
            });
            sb.option(PREFIX, "/disabled_websocket_echo");
            sb.childOption(WEBSOCKET_ENABLED, false);
            sb.childOption(CORS_CONFIG, corsConfig);
            sb.register();

            final Channel ch = sb.bind(port).sync().channel();
            System.out.println("Web socket server started on port [" + port + "], ");
            ch.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    

In the above example we are configuring multiple SockJS services/handlers, four in total, named echo, close, cookie_needed_echo, and disabled_websocket_echo. The handlers in this case match those required by the sockjs-protocol testsuite.

Implementation details

Notice also that we are calling register after having configured each service/handler. This is where SockJS differs from a standard Netty server. When a request is processed, it is the prefix in the request URL that determines which SockJS service should process the request.
For example, a request URL might look like /echo/123/123/xhr, where echo is the prefix, or service name if you prefer. So we don't know the target service/handler of the request until we have processed the HTTP request.
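
As an illustration of why the HTTP request has to be read first, the following sketch splits a session URL into its parts using plain Java. SockJsPath is a hypothetical helper, not a class from the branch, and it assumes the path layout /prefix/server id/session id/transport with no query string. The prefix is returned with its leading slash so that it matches the value passed to the PREFIX option.

```java
import java.util.Arrays;
import java.util.Optional;

/** Hypothetical helper: the parts of a SockJS session URL path. */
final class SockJsPath {
    final String prefix;     // e.g. "/echo"
    final String serverId;   // e.g. "123"
    final String sessionId;  // e.g. "123"
    final String transport;  // e.g. "xhr"

    private SockJsPath(String prefix, String serverId, String sessionId, String transport) {
        this.prefix = prefix;
        this.serverId = serverId;
        this.sessionId = sessionId;
        this.transport = transport;
    }

    /** Returns empty for paths that do not have a prefix followed by three more segments. */
    static Optional<SockJsPath> parse(String path) {
        if (path == null || !path.startsWith("/")) {
            return Optional.empty();
        }
        String[] parts = path.substring(1).split("/");
        if (parts.length < 4) {
            return Optional.empty();
        }
        for (String part : parts) {
            if (part.isEmpty()) {
                return Optional.empty();
            }
        }
        int n = parts.length;
        // The last three segments are fixed; everything before them is the prefix.
        String prefix = "/" + String.join("/", Arrays.copyOfRange(parts, 0, n - 3));
        return Optional.of(new SockJsPath(prefix, parts[n - 3], parts[n - 2], parts[n - 1]));
    }
}
```

For /echo/123/123/xhr this yields the prefix /echo, server id 123, session id 123, and transport xhr.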

When register is called, NioSockJsServerChannel will not create a ServerSocketChannel, which you might be used to from the normal NioServerSocketChannel. Instead, register only creates a SockJsService for the current configuration and child handler:

@Override
protected void doRegister() throws Exception {
    final String prefix = config.getPrefix();
    services.putIfAbsent(prefix, new SockJsService(prefix, pipeline().removeFirst()));
}

We are removing the ChannelInitializer that was added by ServerBootstrap.init and associating it with a SockJsService. A SockJsService is identified by a prefix (or service name), which maps to an HTTP request path segment. This allows us to configure multiple services/handlers.

SockJsMultiplexer handles all HTTP requests: it inspects the path and retrieves the SockJsService for that request. It then adds the ChannelInitializer to the pipeline and calls fireChannelRegistered to have the ChannelInitializer run. Remember that this only adds a ServerBootstrapAcceptor to the pipeline and nothing more. We also need the ServerBootstrapAcceptor to run, so we also invoke fireChannelRead. This adds the childHandler to the pipeline and sets the childOptions and childAttributes on the channel.

It also registers the channel, which is a problem for the current SockJS implementation. The registration has always been there and I simply did not notice that it was getting called. After rebasing, however, the channel is closed if registration does not succeed (there is an exception saying that the channel is already registered). I've added a check to see whether the channel is already registered, but it is more of a hack.
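
The register-then-look-up flow can be sketched without Netty as a small prefix registry. ServiceRegistry below is a hypothetical stand-in for the services map used in doRegister and for the lookup done by the multiplexer; the type parameter H represents whatever is stored per service. Choosing the longest matching prefix is an assumption of this sketch, not something the branch is known to do.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for the per-prefix service registry. */
final class ServiceRegistry<H> {

    private final ConcurrentMap<String, H> services = new ConcurrentHashMap<>();

    /** Registers a service; returns false if the prefix is taken (putIfAbsent keeps the first). */
    boolean register(String prefix, H handler) {
        return services.putIfAbsent(prefix, handler) == null;
    }

    /** Returns the service whose prefix starts the path (longest prefix wins), or null. */
    H lookup(String path) {
        String best = null;
        for (String prefix : services.keySet()) {
            // Match whole path segments only, so "/echo" does not match "/echo_other/...".
            boolean matches = path.equals(prefix) || path.startsWith(prefix + "/");
            if (matches && (best == null || prefix.length() > best.length())) {
                best = prefix;
            }
        }
        return best == null ? null : services.get(best);
    }
}
```

As with putIfAbsent in doRegister, registering the same prefix twice leaves the first service in place.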

OIO example

public void run() throws Exception {
        final EventLoopGroup bossGroup = new OioEventLoopGroup();
        try {
            final ServerBootstrap sb = new ServerBootstrap();
            sb.channel(OioSockJsServerChannel.class);
            sb.group(bossGroup);
            sb.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
            
            final CorsConfig corsConfig = DefaultSockJsServiceConfig.defaultCorsConfig("test", "*", "localhost:8081")
                    .allowedRequestHeaders("a", "b", "c")
                    .allowNullOrigin()
                    .allowedRequestMethods(POST, GET, OPTIONS)
                    .build();

            sb.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(final SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new SockJsEchoHandler());
                }
            });
            sb.option(PREFIX, "/echo");
            sb.childOption(MAX_STREAMING_BYTES_SIZE, 4096);
            sb.childOption(CORS_CONFIG, corsConfig);
            sb.childOption(HEARTBEAT_INTERVAL, 60000L);
            sb.register();

            sb.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(final SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new SockJsCloseHandler());
                }
            });
            sb.option(PREFIX, "/close");
            sb.childOption(CORS_CONFIG, corsConfig);
            sb.childOption(HEARTBEAT_INTERVAL, 60000L);
            sb.register();

            sb.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(final SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new SockJsEchoHandler());
                }
            });
            sb.option(PREFIX, "/cookie_needed_echo");
            sb.childOption(COOKIES_NEEDED, true);
            sb.childOption(CORS_CONFIG, corsConfig);
            sb.childOption(HEARTBEAT_INTERVAL, 60000L);
            sb.register();

            sb.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(final SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new SockJsWSDisabledHandler());
                }
            });
            sb.option(PREFIX, "/disabled_websocket_echo");
            sb.childOption(WEBSOCKET_ENABLED, false);
            sb.childOption(CORS_CONFIG, corsConfig);
            sb.childOption(HEARTBEAT_INTERVAL, 60000L);
            sb.register();

            final Channel ch = sb.bind(port).sync().channel();
            System.out.println("Web socket server started on port [" + port + "], ");
            ch.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
        }
    }

SockJS Channel initialization

The base ChannelHandlers in SockJS, the handlers that set up HTTP/HTTPS, can be configured using an option on the server bootstrap:

sb.option(CHANNEL_INITIALIZER, new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast("decoder", new HttpRequestDecoder());
        ch.pipeline().addLast("encoder", new HttpResponseEncoder());
        ch.pipeline().addLast("chunked", new HttpObjectAggregator(1048576));
        ch.pipeline().addLast("custom", new SimpleChannelInboundHandler<HttpRequest>() {
            @Override
            protected void messageReceived(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
                System.out.println("Custom handler to show customization of SockJS is possible.");
                ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
            }
        });
        ch.pipeline().addLast("mux", new SockJsMultiplexer());
    }
});

This allows custom configuration of the handlers in case the defaults provided by SockJsChannelInitializer are not satisfactory.

Remaining tasks

Building the sockjs-refactoring branch

There have been two changes outside of the codec-sockjs module, which means that the following two modules must be built prior to building codec-sockjs:

Test run

  1. Update the echo service to have a longer session timeout, allowing enough time to perform the manual curl commands:
sb.option(PREFIX, "/echo");
sb.childOption(MAX_STREAMING_BYTES_SIZE, 4096);
sb.childOption(CORS_CONFIG, corsConfig);
sb.childOption(SESSION_TIMEOUT, 120000L);
sb.childOption(HEARTBEAT_INTERVAL, 60000L);
sb.register();
  2. Start the server by running the following command from the codec-sockjs directory:
mvn exec:java
  3. Use xhr_polling to open a session:
curl -v http://localhost:8081/echo/123/123/xhr
  4. Send a message:
curl -i --header "Content-Type: application/javascript" -X POST -d '["some data"]' http://localhost:8081/echo/123/123/xhr_send
  5. Retrieve the message using another xhr_polling request:
curl -v http://localhost:8081/echo/123/123/xhr
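
For anyone who prefers to script the manual test, the curl commands above could be mirrored with the JDK's java.net.http API (Java 11 or later). This is only a sketch: XhrPollingRequests is a hypothetical class name, and the requests use the same methods, header, and body as the curl commands rather than anything mandated by the branch.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper that builds the same requests as the curl commands. */
final class XhrPollingRequests {

    /** Equivalent of: curl -v <base>/<serverId>/<sessionId>/xhr */
    static HttpRequest poll(String base, String serverId, String sessionId) {
        return HttpRequest.newBuilder(URI.create(base + "/" + serverId + "/" + sessionId + "/xhr"))
                .GET()
                .build();
    }

    /** Equivalent of the curl POST to xhr_send; jsonArray is the body, e.g. ["some data"]. */
    static HttpRequest send(String base, String serverId, String sessionId, String jsonArray) {
        return HttpRequest.newBuilder(URI.create(base + "/" + serverId + "/" + sessionId + "/xhr_send"))
                .header("Content-Type", "application/javascript")
                .POST(HttpRequest.BodyPublishers.ofString(jsonArray))
                .build();
    }
}
```

Each request can then be passed to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) while the server is running, in the same order as the steps above: poll, send, poll.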

Running the sockjs-protocol-0.3.3 testsuite

One of the changes made during the refactoring was to use Netty's CorsHandler, which was extracted and generalized. Doing this caused a few errors to be returned by the sockjs-protocol 0.3.3 tests because, in my opinion, the CORS handling in a few of the tests is not correct. The following pull requests have been registered for this:

There was also an issue with parsing of HTTP headers:

These two are included in this branch and can be used to run the sockjs-protocol testsuite.

  1. Start the Netty SockJS Server:
cd netty/codec-sockjs
mvn exec:java
  2. Run the sockjs-protocol testsuite:
cd sockjs-protocol
make test_deps (only required to be run once)
./venv/bin/python sockjs-protocol-0.3.3.py

@normanmaurer

@danbev: didn't you say you would explain why we need 'SockJsEventLoopGroup'? I did not see this in the gist...

@danbev

danbev commented Mar 27, 2014

@normanmaurer: Sorry, you are correct, I did say that :). Actually, SockJsEventLoopGroup is not really required in the version above. This is something I'm looking into with regard to supporting multiple SockJS services. It would be possible to simply use a NioEventLoopGroup instead, and just specify the SockJsServerChannel in the call to serverBootstrap.channel (with a few minor changes to SockJsServerChannel).

@normanmaurer

@danbev if we could just use NioEventLoopGroup (or whatever EventLoopGroup matches) this would be much better. Also, how would you be able to use this with any SocketChannel? I think at the moment it only works with NioSocketChannel? We should make it work with any SocketChannel as well.

@danbev

danbev commented Mar 31, 2014

@normanmaurer: I've removed the SockJsEventLoop and SockJsEventLoopGroup classes now.
Yeah, you are right, this would only work with a NioSocketChannel at the moment. Let me try to address this. Thx!

@danbev

danbev commented Apr 1, 2014

@normanmaurer: I've updated the gist and pushed a suggestion to the branch. This tries to address the issue that the solution only works for NIO when it should work with any SocketChannel; please see the OIO example for details. Is this what you meant by saying it should work with any SocketChannel?

@normanmaurer

@danbev didn't you say you wanted to make it work like web sockets etc.? So why are there both an OioSockJsServerChannel and a NioSockJsServerChannel?
