-
-
Save Crystark/216b1e92a1c0f4e8969e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package reactortest;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpVersion;
import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Environment;
import reactor.function.Consumer;
import reactor.tcp.TcpConnection;
import reactor.tcp.TcpServer;
import reactor.tcp.encoding.StandardCodecs;
import reactor.tcp.netty.NettyServerSocketOptions;
import reactor.tcp.netty.NettyTcpServer;
import reactor.tcp.spec.TcpServerSpec;

public class Main { | |
private static final Logger logger = LoggerFactory.getLogger(Main.class); | |
public static void main(String[] args) { | |
tcpServer(); | |
} | |
public static void tcpServer() { | |
Environment env = new Environment(); | |
// create a spec using the Netty-based server | |
TcpServer<String, String> server = new TcpServerSpec<String, String>(NettyTcpServer.class) | |
.env(env) | |
.codec(StandardCodecs.LINE_FEED_CODEC) | |
.consume(new Consumer<TcpConnection<String, String>>() { | |
@Override | |
public void accept(final TcpConnection<String, String> conn) { | |
// for each connection, process incoming data | |
conn.in().consume(new Consumer<String>() { | |
@Override | |
public void accept(String line) { | |
// handle line feed data | |
// respond to client (newline will be added by codec) | |
conn.send("Hello World!"); | |
} | |
}); | |
} | |
}) | |
.get(); | |
logger.info("Start!"); | |
server.start(); | |
logger.info("Done!"); | |
} | |
public static void httpServer() { | |
Environment env = new Environment(); | |
TcpServer<HttpObject, HttpObject> server = new TcpServerSpec<HttpObject, HttpObject>(NettyTcpServer.class) | |
.env(env) | |
.dispatcher(env.getDefaultDispatcher()) | |
.options(new NettyServerSocketOptions() | |
.pipelineConfigurer(new Consumer<ChannelPipeline>() { | |
@Override | |
public void accept(ChannelPipeline pipeline) { | |
pipeline.addLast(new HttpServerCodec()); | |
} | |
}) | |
) | |
.listen(8080) | |
.consume(new Consumer<TcpConnection<HttpObject, HttpObject>>() { | |
@Override | |
public void accept(final TcpConnection<HttpObject, HttpObject> conn) { | |
logger.info("accept!"); | |
conn.in() | |
.consume( | |
new Consumer<HttpObject>() { | |
@Override | |
public void accept(HttpObject req) { | |
logger.info("Got: {}", req); | |
if (req instanceof HttpRequest) { | |
ByteBuf content = Unpooled.copiedBuffer("OK".getBytes()); | |
DefaultFullHttpResponse resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, | |
HttpResponseStatus.OK, | |
content); | |
resp.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain"); | |
resp.headers().set(HttpHeaders.Names.CONTENT_LENGTH, content.readableBytes()); | |
conn.send(resp); | |
} | |
} | |
} | |
); | |
} | |
}) | |
.get(); | |
logger.info("Start!"); | |
server.start(); | |
logger.info("Done!"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment