Skip to content

Instantly share code, notes, and snippets.

@Crystark
Created December 16, 2013 16:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Crystark/216b1e92a1c0f4e8969e to your computer and use it in GitHub Desktop.
Save Crystark/216b1e92a1c0f4e8969e to your computer and use it in GitHub Desktop.
package reactortest;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Environment;
import reactor.function.Consumer;
import reactor.tcp.TcpConnection;
import reactor.tcp.TcpServer;
import reactor.tcp.encoding.StandardCodecs;
import reactor.tcp.netty.NettyServerSocketOptions;
import reactor.tcp.netty.NettyTcpServer;
import reactor.tcp.spec.TcpServerSpec;
public class Main {
private static final Logger logger = LoggerFactory.getLogger(Main.class);
public static void main(String[] args) {
tcpServer();
}
public static void tcpServer() {
Environment env = new Environment();
// create a spec using the Netty-based server
TcpServer<String, String> server = new TcpServerSpec<String, String>(NettyTcpServer.class)
.env(env)
.codec(StandardCodecs.LINE_FEED_CODEC)
.consume(new Consumer<TcpConnection<String, String>>() {
@Override
public void accept(final TcpConnection<String, String> conn) {
// for each connection, process incoming data
conn.in().consume(new Consumer<String>() {
@Override
public void accept(String line) {
// handle line feed data
// respond to client (newline will be added by codec)
conn.send("Hello World!");
}
});
}
})
.get();
logger.info("Start!");
server.start();
logger.info("Done!");
}
public static void httpServer() {
Environment env = new Environment();
TcpServer<HttpObject, HttpObject> server = new TcpServerSpec<HttpObject, HttpObject>(NettyTcpServer.class)
.env(env)
.dispatcher(env.getDefaultDispatcher())
.options(new NettyServerSocketOptions()
.pipelineConfigurer(new Consumer<ChannelPipeline>() {
@Override
public void accept(ChannelPipeline pipeline) {
pipeline.addLast(new HttpServerCodec());
}
})
)
.listen(8080)
.consume(new Consumer<TcpConnection<HttpObject, HttpObject>>() {
@Override
public void accept(final TcpConnection<HttpObject, HttpObject> conn) {
logger.info("accept!");
conn.in()
.consume(
new Consumer<HttpObject>() {
@Override
public void accept(HttpObject req) {
logger.info("Got: {}", req);
if (req instanceof HttpRequest) {
ByteBuf content = Unpooled.copiedBuffer("OK".getBytes());
DefaultFullHttpResponse resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK,
content);
resp.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
resp.headers().set(HttpHeaders.Names.CONTENT_LENGTH, content.readableBytes());
conn.send(resp);
}
}
}
);
}
})
.get();
logger.info("Start!");
server.start();
logger.info("Done!");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment