Skip to content

Instantly share code, notes, and snippets.

@sigbjod
Last active February 1, 2018 08:46
Show Gist options
  • Save sigbjod/4706349 to your computer and use it in GitHub Desktop.
Save sigbjod/4706349 to your computer and use it in GitHub Desktop.
My implementation of a Netty server handling HTTP requests
/**
 * Builds the channel pipeline used for every connection of the HTTP server.
 *
 * <p>Each pipeline consists of, in order: an HTTP request decoder, an HTTP
 * response encoder, an {@link ExecutionHandler} wrapping the executor passed
 * to the constructor, and a fresh {@link HttpRequestHandler}. The same
 * {@link ExecutionHandler} instance is added to every pipeline this factory
 * creates.
 */
public class HttpPipelineFactory implements ChannelPipelineFactory {

    private final ExecutionHandler executionHandler;
    private final ProductDaoImpl productDao;

    /**
     * @param productDao    DAO handed to every {@link HttpRequestHandler} created by this factory
     * @param eventExecutor executor wrapped in the {@link ExecutionHandler} that precedes the
     *                      request handler in the pipeline
     */
    public HttpPipelineFactory(ProductDaoImpl productDao,
            OrderedMemoryAwareThreadPoolExecutor eventExecutor) {
        this.executionHandler = new ExecutionHandler(eventExecutor);
        this.productDao = productDao;
    }

    /**
     * Creates a new pipeline: decoder, encoder, executor, handler.
     *
     * @return the newly assembled pipeline
     */
    @Override
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline channelPipeline = Channels.pipeline();

        // HTTP codec: decode inbound requests, encode outbound responses.
        channelPipeline.addLast("decoder", new HttpRequestDecoder());
        channelPipeline.addLast("encoder", new HttpResponseEncoder());

        // Everything after this point is dispatched through the
        // OrderedMemoryAwareThreadPoolExecutor, so the blocking handler
        // below is placed behind it.
        channelPipeline.addLast("executor", executionHandler);

        // The blocking request handler; one instance per pipeline.
        channelPipeline.addLast("handler", new HttpRequestHandler(productDao));

        return channelPipeline;
    }
}
/**
 * Upstream handler that answers every incoming HTTP request.
 *
 * <p>For each request it logs the protocol version, method, URI and remote
 * host, then writes a response. The connection is kept open only when the
 * request asked for keep-alive; otherwise it is closed once the response has
 * been written.
 */
public class HttpRequestHandler extends SimpleChannelUpstreamHandler {

    private static final Logger logger = Logger.getLogger(HttpRequestHandler.class);

    private final ProductDaoImpl productDao;

    /**
     * @param productDao DAO available to the (omitted) business logic
     */
    public HttpRequestHandler(ProductDaoImpl productDao) {
        this.productDao = productDao;
    }

    /**
     * Logs the request and writes the response for it.
     *
     * @param context handler context of the channel the message arrived on
     * @param event   event whose message is cast to {@link HttpRequest}
     */
    @Override
    public void messageReceived(ChannelHandlerContext context, MessageEvent event)
            throws Exception {
        HttpRequest request = (HttpRequest) event.getMessage();
        boolean keepAlive = false;
        if (request != null) {
            logger.info(request.getProtocolVersion() + " " + request.getMethod()
                    + " " + request.getUri() + " "
                    + remoteHostOf(context.getChannel()));
            keepAlive = HttpHeaders.isKeepAlive(request);
        }

        // Extract information from request and process it (omitted here)
        HttpResponse response = startResponse(HttpResponseStatus.OK);
        String content = "Content provided by very intelligent business logic";
        finish(event.getChannel(), response, content, keepAlive);
    }

    /**
     * Logs the failure and, if the channel is still connected, answers with a
     * 500 response and closes the connection.
     *
     * @param context handler context of the failing channel
     * @param event   event carrying the cause and the channel
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext context,
            ExceptionEvent event) {
        // Never drop the cause silently; without this the failure is invisible.
        logger.error("Unhandled exception while handling HTTP request", event.getCause());

        Channel channel = event.getChannel();
        if (channel.isConnected()) {
            finish(channel, startResponse(INTERNAL_SERVER_ERROR), null, false);
        } else {
            // Nothing can be written any more; writing would only raise
            // another exception. Just make sure the channel is released.
            channel.close();
        }
    }

    /**
     * Creates an HTTP/1.1 response with the given status and the
     * cache-suppressing headers used for every response of this server.
     */
    private HttpResponse startResponse(HttpResponseStatus status) {
        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
                status);
        response.setHeader(CACHE_CONTROL, "no-store");
        response.setHeader(PRAGMA, "no-cache");
        return response;
    }

    /**
     * Attaches the optional body, sets the framing headers, writes the
     * response and closes the channel afterwards unless keep-alive is on.
     *
     * @param channel        channel to write to
     * @param response       response to complete and send
     * @param responseString body text, encoded as UTF-8; {@code null} for no body
     * @param keepAlive      whether the connection stays open after the write
     */
    private void finish(Channel channel, HttpResponse response,
            String responseString, boolean keepAlive) {
        if (responseString != null) {
            response.setContent(ChannelBuffers.copiedBuffer(responseString,
                    CharsetUtil.UTF_8));
        }
        // Content-Length must be the number of encoded BYTES, not the number
        // of chars in the string (they differ for non-ASCII text). It is set
        // for body-less responses too (0), so keep-alive clients know where
        // the message ends.
        response.setHeader(CONTENT_LENGTH, response.getContent().readableBytes());
        response.setHeader(CONNECTION, keepAlive
                ? HttpHeaders.Values.KEEP_ALIVE
                : HttpHeaders.Values.CLOSE);

        ChannelFuture future = channel.write(response);
        if (!keepAlive) {
            future.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * Returns the remote host address of the channel for logging, or
     * {@code "unknown"} when the channel has no (resolved) remote address.
     */
    private static String remoteHostOf(Channel channel) {
        InetSocketAddress remoteAddress = (InetSocketAddress) channel.getRemoteAddress();
        if (remoteAddress == null || remoteAddress.getAddress() == null) {
            return "unknown";
        }
        return remoteAddress.getAddress().getHostAddress();
    }
}
/**
 * Entry point: configures and starts the Netty HTTP server on port 6600 and
 * registers a shutdown hook that closes the server channel and releases the
 * resources created here.
 */
public class Server {

    private static final ChannelGroup allChannels = new DefaultChannelGroup("server");

    /**
     * Boots the server.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        // Setting Netty logger
        InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());

        // Configuring business logic (omitted here)

        // Bootstrapping server
        Runtime runtime = Runtime.getRuntime();
        int cores = runtime.availableProcessors();
        int threads = 250;

        // Thread pool for blocking calls, 250 threads.
        // Memory limit: 1 MiB per channel, 1 GiB in total.
        // The trailing "1, SECONDS" is the keep-alive time of idle pool
        // threads (see the Netty 3 constructor documentation), not a
        // request timeout.
        final OrderedMemoryAwareThreadPoolExecutor eventExecutor = new OrderedMemoryAwareThreadPoolExecutor(
                threads, 1048576L, 1073741824L, 1, TimeUnit.SECONDS);

        final ChannelFactory factory = new NioServerSocketChannelFactory(
                Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), cores);
        ServerBootstrap bootstrap = new ServerBootstrap(factory);

        // Setting backlog which increases the incoming connection queue
        bootstrap.setOption("backlog", 8024);

        // Setting socket channel properties as described here
        // http://static.netty.io/3.6/api/index.html?org/jboss/netty/channel/socket/SocketChannelConfig.html
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", false);

        bootstrap.setPipelineFactory(new HttpPipelineFactory(productDao,
                eventExecutor));

        // Setting host configuration
        int port = 6600;
        Channel channel = bootstrap.bind(new InetSocketAddress(port));
        allChannels.add(channel);

        runtime.addShutdownHook(new Thread("server-shutdown") {
            @Override
            public void run() {
                // Close everything registered in the group (the bound server
                // channel) and wait until that has completed.
                ChannelGroupFuture future = allChannels.close();
                future.awaitUninterruptibly();
                factory.releaseExternalResources();
                // The blocking-handler pool is not owned by the channel
                // factory, so it has to be stopped separately.
                eventExecutor.shutdown();
                // Deliberately no join on the main thread: it has normally
                // finished already, and if it were the thread that called
                // System.exit it would be waiting for this hook, so joining
                // it here would deadlock.
            }
        });
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment