Skip to content

Instantly share code, notes, and snippets.

@ti2ger92
Created October 14, 2019 18:33
Show Gist options
  • Save ti2ger92/18c9c4439192d7e88996dbe928e3c300 to your computer and use it in GitHub Desktop.
Save ti2ger92/18c9c4439192d7e88996dbe928e3c300 to your computer and use it in GitHub Desktop.
Reproduce issue with Netty 4.1.42
plugins {
id 'java'
}
group 'nettyBugOct142019'
version '1.0-SNAPSHOT'
sourceCompatibility = 1.8
repositories {
mavenCentral()
}
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.12'
implementation group: 'io.netty', name: 'netty-all', version: '4.1.42.Final'
implementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.12.1'
implementation group: 'org.reactivestreams', name: 'reactive-streams', version: '1.0.3'
implementation 'io.reactivex.rxjava2:rxjava:2.2.0'
implementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5', version: '5.0-beta5'
implementation group: 'org.apache.httpcomponents.core5', name: 'httpcore5-reactive', version: '5.0-beta9'
implementation group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: '5.5.2'
}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.ProtocolDetectionState;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeCodecFactory;
import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeEvent;
import io.netty.handler.codec.http2.*;
import io.netty.handler.codec.http2.CleartextHttp2ServerUpgradeHandler.PriorKnowledgeUpgradeEvent;
import io.netty.handler.flow.FlowControlHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.SslContext;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Log4J2LoggerFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
import static io.netty.channel.ChannelOption.TCP_NODELAY;
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME;
import static io.netty.util.AsciiString.contentEquals;
import static io.netty.util.AttributeKey.newInstance;
import static java.lang.String.format;
import static java.lang.management.ManagementFactory.getRuntimeMXBean;
import static java.util.concurrent.TimeUnit.SECONDS;
public final class NettyServerSimplified {
static final AttributeKey<String> CLIENT_IP = newInstance("clientIp");
static final AttributeKey<ProtocolDetectionState> PROXY_PROTOCOL_STATE = newInstance("proxy-protocol-state");
static final AttributeKey<String> VPC_ID = newInstance("vpc-id");
static final AttributeKey<String> VPCE_ID = newInstance("vpce-id");
static final AttributeKey<Integer> VPCE_CONFIG = newInstance("vpce-config");
static final AttributeKey<String> VPCE_POLICY_URL = newInstance("vpce-policy-url");
static final AttributeKey<HttpVersion> HTTP_VERSION = newInstance("http-version");
static final AttributeKey<Long> TIME_OF_LAST_REQUEST_BYTE = newInstance("last-request-byte");
static final AttributeKey<Long> TIME_OF_FIRST_RESPONSE_BYTE = newInstance("first-response-byte");
private static final Logger LOG = LogManager.getLogger(NettyServerSimplified.class);
private static final int DEFAULT_MAX_HEADER_SIZE = 8192;
private static final int DEFAULT_MAX_KEEP_ALIVE_REQUESTS = 100;
private static final long DEFAULT_MAX_CONTENT_LENGTH = Long.MAX_VALUE;
private final List<URI> uris = Arrays.asList(URI.create("http://localhost:6001"));
private final int maxHeaderSize;
private final long maxContentLength;
private final OptionalInt maxKeepAliveRequests;
private final Optional<Duration> idleTimeout;
static {
InternalLoggerFactory.setDefaultFactory(Log4J2LoggerFactory.INSTANCE);
}
private final EventLoopGroup serverEventLoop;
private final EventLoopGroup workerEventLoop;
// This set tracks all open HTTP/1.1 and HTTP/2 connections, but not individual HTTP/2 streams.
private final ConcurrentSkipListSet<Channel> tcpChannels = new ConcurrentSkipListSet<>(
Comparator.comparingInt(System::identityHashCode));
private final ScheduledFuture<?> tcpMonitor;
private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
public NettyServerSimplified() throws Throwable {
LOG.info("Starting NettyServer method");
// Enable only AwsCertificateManagerReporter in SwageServerCertificateMonitor
System.setProperty("com.amazon.swage.monitor.tls.CertificateMonitor.reporters",
"com.amazon.swage.monitor.tls.AwsCertificateManagerReporter");
this.serverEventLoop = new NioEventLoopGroup(1, serverThreads());
this.workerEventLoop = new NioEventLoopGroup(1, workerThreads());
this.maxHeaderSize = DEFAULT_MAX_HEADER_SIZE;
this.maxContentLength = DEFAULT_MAX_CONTENT_LENGTH;
this.maxKeepAliveRequests = OptionalInt.of(DEFAULT_MAX_KEEP_ALIVE_REQUESTS);
idleTimeout = Optional.of(Duration.ofSeconds(60));
SocketAddress localAddress;
LOG.info("starting to get bootstrap");
for (URI u : uris) {
ServerBootstrap bootstrap;
bootstrap = getInsecureBootstrap();
localAddress = new InetSocketAddress(u.getHost(), u.getPort());
LOG.info("starting to bind bootstrap");
bootstrap.group(serverEventLoop, workerEventLoop).localAddress(localAddress).bind().sync();
String message = format("%s %s listening on %s", getClass().getSimpleName(), getRuntimeMXBean().getName(),
u);
LOG.info(message);
System.out.println("INFO: " + message);
}
LOG.info("starting to schedule tcpMonitor");
tcpMonitor = GlobalEventExecutor.INSTANCE.scheduleAtFixedRate(
() -> LOG.info("{} open TCP connection(s)", tcpChannels.size()), 0, 15, SECONDS);
LOG.info("starting to check consistency");
LOG.info("done with method");
}
private static ThreadFactory serverThreads() {
ThreadFactory serverThreadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread serverThread = Executors.defaultThreadFactory().newThread(r);
serverThread.setName("netty-server-0");
return serverThread;
}
};
return serverThreadFactory;
}
private static ThreadFactory workerThreads() {
ThreadFactory workerThreadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread workerThread = Executors.defaultThreadFactory().newThread(r);
workerThread.setName("netty-worker-0");
return workerThread;
}
};
return workerThreadFactory;
}
public void dispose() {
try {
shuttingDown.set(true);
// First, we shut down the server's EventLoopGroup, which closes the server sockets and prevents any new TCP
// connections from being opened.
shutdownServer();
// Finally, we shut down the worker EventLoopGroup. Note that this is a "graceful" shutdown of the event loops,
// *not* of any outstanding connections, which would be closed as soon as we call #shutdownGracefully.
workerEventLoop.shutdownGracefully().sync();
LOG.info("Server shutdown complete");
} catch (InterruptedException e) {
LOG.warn("Interrupted while disposing of " + this, e);
}
}
private void shutdownServer() throws InterruptedException {
LOG.info("Shutting down server event loop group");
Future<?> serverShutdownFuture = serverEventLoop.shutdownGracefully();
serverShutdownFuture.sync();
for (URI u : uris) {
String message = getClass().getSimpleName() + " canceled " + u;
if (LOG.isInfoEnabled()) {
LOG.info(message);
}
System.out.println("INFO: " + message);
}
}
private ServerBootstrap getInsecureBootstrap() {
return newBootstrap(false, false);
}
private ServerBootstrap newBootstrap(boolean useUDS, boolean useSSL) {
ServerBootstrap bootstrap = new ServerBootstrap();
SslContext sslContext = null;
bootstrap.channelFactory(NioServerSocketChannel::new);
// Create the Netty Handler factory that'll be used for each Channel
bootstrap.childHandler(new BugChannelInitializer(useSSL, sslContext));
if (!useUDS) {
bootstrap.childOption(SO_KEEPALIVE, true)
.childOption(TCP_NODELAY, true);
}
return bootstrap;
}
private final class BugChannelInitializer extends ChannelInitializer<Channel> {
private final boolean useSSL;
private final SslContext sslContext;
private BugChannelInitializer(boolean useSSL, SslContext sslContext) {
this.useSSL = useSSL;
this.sslContext = sslContext;
}
@Override
protected void initChannel(Channel ch) {
ChannelPipeline pipeline = ch.pipeline();
ch.config().setAutoRead(false);
ch.attr(AttributeKey.newInstance("open-streams")).set(new Semaphore(1));
// We limit the maxInitialLineLength and maxHeaderSize so we can minimize resource usage for processing
// invalid requests. maxChunkSize just configures the buffer size; it does not limit requests.
int maxInitialLineLength = maxHeaderSize;
int maxChunkSize = 32_768;
HttpServerCodec httpServerCodec = new HttpServerCodec(maxInitialLineLength, maxHeaderSize, maxChunkSize);
ChannelHandler http2RequestHandler = new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) {
// HTTP/2, being a multiplexed protocol, allocates a new stream for every transaction, and Netty
// allocates a new channel for each new stream. This initializer sets up the per-stream request
// processing pipeline in the child channel. Http2StreamFrameToHttpObjectCodec is conceptually
// similar to HttpServerCodec, except that it also translates between HTTP/2 frames and HTTP/1.1
// types such as HttpRequest, so that the same pipeline can support both protocols.
channel.config().setAutoRead(false);
channel.attr(CLIENT_IP).set(channel.parent().attr(CLIENT_IP).get());
channel.attr(VPC_ID).set(channel.parent().attr(VPC_ID).get());
channel.attr(VPCE_ID).set(channel.parent().attr(VPCE_ID).get());
channel.attr(VPCE_CONFIG).set(channel.parent().attr(VPCE_CONFIG).get());
channel.attr(VPCE_POLICY_URL).set(channel.parent().attr(VPCE_POLICY_URL).get());
channel.attr(HTTP_VERSION).set(HttpVersion.valueOf("HTTP/2.0"));
channel.pipeline().addLast("http2Codec", new Http2StreamFrameToHttpObjectCodec(true));
installHandlers(channel.pipeline());
channel.read();
}
};
Http2FrameCodec http2Codec = Http2FrameCodecBuilder.forServer()
.initialSettings(Http2Settings.defaultSettings().maxConcurrentStreams(1l))
.frameLogger(new Http2FrameLogger(LogLevel.TRACE, "WIRE"))
.build();
Http2MultiplexHandler http2Handler = new Http2MultiplexHandler(http2RequestHandler);
pipeline.addLast("upgradeHandler", getUpgradeHandler(httpServerCodec, http2Codec));
installUpgradeListener(pipeline, http2Handler);
installHandlers(pipeline);
pipeline.read();
}
private ChannelHandler getUpgradeHandler(
HttpServerCodec httpServerCodec,
Http2FrameCodec http2Codec
) {
UpgradeCodecFactory upgradeCodecFactory = protocol -> contentEquals(HTTP_UPGRADE_PROTOCOL_NAME, protocol) ?
new Http2ServerUpgradeCodec("http2Codec", http2Codec) : null;
HttpServerUpgradeHandler upgradeHandler = new HttpServerUpgradeHandler(httpServerCodec,
upgradeCodecFactory) {
@Override
protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception {
// no-op, remove this if/when we want to support h2c http upgrades
}
};
return new CleartextHttp2ServerUpgradeHandler(httpServerCodec, upgradeHandler, http2Codec);
}
private void installUpgradeListener(ChannelPipeline pipeline, Http2MultiplexHandler http2Handler) {
// This handler listens for HTTP/2 upgrade events, resulting from either HTTP Upgrade or Prior Knowledge.
// When an upgrade occurs, the original channel becomes multiplexed (meaning that requests get routed to
// child channels), so we remove the handlers from the channel.
pipeline.addLast("upgradeListener", new ChannelInboundHandlerAdapter() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof UpgradeEvent || evt instanceof PriorKnowledgeUpgradeEvent) {
// Remove this handler and everything after it
while (pipeline.removeLast() != this) {
// do nothing
}
pipeline.addLast("http2Handler", http2Handler);
ctx.channel().read();
} else {
super.userEventTriggered(ctx, evt);
}
}
});
}
private void installHandlers(ChannelPipeline pipeline) {
pipeline.addLast("flowControl", new FlowControlHandler());
pipeline.addLast("generic", new ChannelDuplexHandler() {
private Channel channel;
private ResponseBodySubscriber responseBodySubscriber;
private boolean readComplete = false;
private boolean writeComplete = false;
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.channel = ctx.channel();
super.handlerAdded(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
this.channel = null;
super.handlerRemoved(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
channel.attr(TIME_OF_LAST_REQUEST_BYTE).set(null);
channel.attr(TIME_OF_FIRST_RESPONSE_BYTE).set(null);
ctx.write(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK));
channel.read();
} else if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg;
ByteBuf buf = content.content();
if (content instanceof LastHttpContent) {
channel.attr(TIME_OF_LAST_REQUEST_BYTE).set(System.nanoTime());
buf.release();
readComplete();
} else {
buf.release();
channel.read();
}
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
writeComplete = true;
responseBodySubscriber = null;
readNextRequest();
}
private void readComplete() {
readComplete = true;
readNextRequest();
}
private void readNextRequest() {
if (readComplete && writeComplete) {
readComplete = false;
writeComplete = false;
} else if (channel != null && channel.isOpen() /*&& isHttp2(channel)*/) {
// Since an HTTP/2 stream is created to serve only a single request, we know that there are no more
// requests to read on this stream. However, the client may send WINDOW_UPDATE frames or other
// HTTP/2-specific frame types, which we will read (and discard) as they arrive.
channel.read();
}
}
final class ResponseBodySubscriber implements Subscriber<ByteBuf> {
private final ChannelHandlerContext ctx;
private final ChannelPromise responsePromise;
private Subscription subscription;
private ResponseBodySubscriber(ChannelHandlerContext ctx, ChannelPromise responsePromise) {
this.ctx = ctx;
this.responsePromise = responsePromise;
}
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(10);
}
public void onNext(ByteBuf buf) {
ctx.channel().attr(TIME_OF_FIRST_RESPONSE_BYTE).setIfAbsent(System.nanoTime());
ctx.writeAndFlush(new DefaultHttpContent(buf));
}
public void onError(Throwable t) {
}
public void onComplete() {
ctx.channel().attr(TIME_OF_FIRST_RESPONSE_BYTE).setIfAbsent(System.nanoTime());
ctx.writeAndFlush(new DefaultLastHttpContent(), responsePromise);
subscription = null;
}
}
});
}
}
}
import io.reactivex.Flowable;
import org.apache.hc.client5.http.async.methods.AsyncRequestBuilder;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
import org.apache.hc.client5.http.ssl.TrustAllStrategy;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.reactive.ReactiveEntityProducer;
import org.apache.hc.core5.ssl.SSLContexts;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import javax.net.ssl.SSLContext;
import java.net.URI;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import static org.apache.hc.core5.http.ContentType.APPLICATION_OCTET_STREAM;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
public class StreamingIntegrationTest {
private static final AtomicInteger port = new AtomicInteger(6001);
private NettyServerSimplified server;
private CloseableHttpAsyncClient client;
private URI endpoint;
@Test
public void toServer() throws Throwable {
HttpVersionPolicy versionPolicy = HttpVersionPolicy.FORCE_HTTP_2;
String protocol = "http";
startClient(versionPolicy, protocol);
endpoint = URI.create(protocol + "://localhost:" + port.getAndIncrement());
long shortLength = 1024 * 7;
Function <Long, SimpleHttpResponse> testClient = aLength -> {
try {
Flowable<ByteBuffer> input = Flowable.rangeLong(1, aLength)
.buffer(1024)
.doOnNext(list -> System.out.println("list ends: " + list.get(list.size() - 1)))
.map(StreamingIntegrationTest::toBuffer);
Sha256Subscriber clientDigest = new Sha256Subscriber();
input.subscribe(clientDigest);
server = new NettyServerSimplified();
ReactiveEntityProducer body = new ReactiveEntityProducer(input, -1, APPLICATION_OCTET_STREAM, null);
AsyncRequestProducer request = AsyncRequestBuilder.post(endpoint)
.setEntity(body)
.build();
return assertTimeoutPreemptively(Duration.ofSeconds(15),
() -> client.execute(request, SimpleResponseConsumer.create(), null).get());
} catch (Throwable e) {
throw new RuntimeException("threw error", e);
}
};
SimpleHttpResponse shortTestResult = testClient.apply(shortLength);
server.dispose();
}
private void startClient(HttpVersionPolicy versionPolicy, String protocol) throws Exception {
PoolingAsyncClientConnectionManager cm = null;
if ("https".equals(protocol)) {
SSLContext sslContext = SSLContexts.custom()
.loadTrustMaterial(new TrustAllStrategy())
.build();
cm = PoolingAsyncClientConnectionManagerBuilder.create()
.setTlsStrategy(new DefaultClientTlsStrategy(sslContext, null, null, null, (host, ctx) -> true))
.build();
}
client = HttpAsyncClients.custom()
.setVersionPolicy(versionPolicy)
.setConnectionManager(cm)
.build();
client.start();
}
private static ByteBuffer toBuffer(List<Long> values) {
ByteBuffer buf = ByteBuffer.allocate(values.size() * Long.BYTES);
values.forEach(buf::putLong);
return (ByteBuffer) buf.flip();
}
private static class Sha256Subscriber implements Subscriber<ByteBuffer> {
private final MessageDigest md;
private byte[] digest;
private Sha256Subscriber() throws Exception {
md = MessageDigest.getInstance("SHA-256");
}
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(ByteBuffer byteBuffer) {
md.update(byteBuffer.duplicate());
}
@Override
public void onError(Throwable t) {
digest = md.digest();
}
@Override
public void onComplete() {
digest = md.digest();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment