Created
October 14, 2019 18:33
-
-
Save ti2ger92/18c9c4439192d7e88996dbe928e3c300 to your computer and use it in GitHub Desktop.
Reproduce issue with Netty 4.1.42
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
plugins { | |
id 'java' | |
} | |
group 'nettyBugOct142019' | |
version '1.0-SNAPSHOT' | |
sourceCompatibility = 1.8 | |
repositories { | |
mavenCentral() | |
} | |
dependencies { | |
testCompile group: 'junit', name: 'junit', version: '4.12' | |
implementation group: 'io.netty', name: 'netty-all', version: '4.1.42.Final' | |
implementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.12.1' | |
implementation group: 'org.reactivestreams', name: 'reactive-streams', version: '1.0.3' | |
implementation 'io.reactivex.rxjava2:rxjava:2.2.0' | |
implementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5', version: '5.0-beta5' | |
implementation group: 'org.apache.httpcomponents.core5', name: 'httpcore5-reactive', version: '5.0-beta9' | |
implementation group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: '5.5.2' | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.netty.bootstrap.ServerBootstrap; | |
import io.netty.buffer.ByteBuf; | |
import io.netty.channel.*; | |
import io.netty.channel.nio.NioEventLoopGroup; | |
import io.netty.channel.socket.nio.NioServerSocketChannel; | |
import io.netty.handler.codec.ProtocolDetectionState; | |
import io.netty.handler.codec.http.*; | |
import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeCodecFactory; | |
import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeEvent; | |
import io.netty.handler.codec.http2.*; | |
import io.netty.handler.codec.http2.CleartextHttp2ServerUpgradeHandler.PriorKnowledgeUpgradeEvent; | |
import io.netty.handler.flow.FlowControlHandler; | |
import io.netty.handler.logging.LogLevel; | |
import io.netty.handler.ssl.SslContext; | |
import io.netty.util.AttributeKey; | |
import io.netty.util.concurrent.Future; | |
import io.netty.util.concurrent.GlobalEventExecutor; | |
import io.netty.util.concurrent.ScheduledFuture; | |
import io.netty.util.internal.logging.InternalLoggerFactory; | |
import io.netty.util.internal.logging.Log4J2LoggerFactory; | |
import org.apache.logging.log4j.LogManager; | |
import org.apache.logging.log4j.Logger; | |
import org.reactivestreams.Subscriber; | |
import org.reactivestreams.Subscription; | |
import java.net.InetSocketAddress; | |
import java.net.SocketAddress; | |
import java.net.URI; | |
import java.time.Duration; | |
import java.util.*; | |
import java.util.concurrent.ConcurrentSkipListSet; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Semaphore; | |
import java.util.concurrent.ThreadFactory; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import static io.netty.channel.ChannelOption.SO_KEEPALIVE; | |
import static io.netty.channel.ChannelOption.TCP_NODELAY; | |
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME; | |
import static io.netty.util.AsciiString.contentEquals; | |
import static io.netty.util.AttributeKey.newInstance; | |
import static java.lang.String.format; | |
import static java.lang.management.ManagementFactory.getRuntimeMXBean; | |
import static java.util.concurrent.TimeUnit.SECONDS; | |
public final class NettyServerSimplified { | |
static final AttributeKey<String> CLIENT_IP = newInstance("clientIp"); | |
static final AttributeKey<ProtocolDetectionState> PROXY_PROTOCOL_STATE = newInstance("proxy-protocol-state"); | |
static final AttributeKey<String> VPC_ID = newInstance("vpc-id"); | |
static final AttributeKey<String> VPCE_ID = newInstance("vpce-id"); | |
static final AttributeKey<Integer> VPCE_CONFIG = newInstance("vpce-config"); | |
static final AttributeKey<String> VPCE_POLICY_URL = newInstance("vpce-policy-url"); | |
static final AttributeKey<HttpVersion> HTTP_VERSION = newInstance("http-version"); | |
static final AttributeKey<Long> TIME_OF_LAST_REQUEST_BYTE = newInstance("last-request-byte"); | |
static final AttributeKey<Long> TIME_OF_FIRST_RESPONSE_BYTE = newInstance("first-response-byte"); | |
private static final Logger LOG = LogManager.getLogger(NettyServerSimplified.class); | |
private static final int DEFAULT_MAX_HEADER_SIZE = 8192; | |
private static final int DEFAULT_MAX_KEEP_ALIVE_REQUESTS = 100; | |
private static final long DEFAULT_MAX_CONTENT_LENGTH = Long.MAX_VALUE; | |
private final List<URI> uris = Arrays.asList(URI.create("http://localhost:6001")); | |
private final int maxHeaderSize; | |
private final long maxContentLength; | |
private final OptionalInt maxKeepAliveRequests; | |
private final Optional<Duration> idleTimeout; | |
static { | |
InternalLoggerFactory.setDefaultFactory(Log4J2LoggerFactory.INSTANCE); | |
} | |
private final EventLoopGroup serverEventLoop; | |
private final EventLoopGroup workerEventLoop; | |
// This set tracks all open HTTP/1.1 and HTTP/2 connections, but not individual HTTP/2 streams. | |
private final ConcurrentSkipListSet<Channel> tcpChannels = new ConcurrentSkipListSet<>( | |
Comparator.comparingInt(System::identityHashCode)); | |
private final ScheduledFuture<?> tcpMonitor; | |
private final AtomicBoolean shuttingDown = new AtomicBoolean(false); | |
public NettyServerSimplified() throws Throwable { | |
LOG.info("Starting NettyServer method"); | |
// Enable only AwsCertificateManagerReporter in SwageServerCertificateMonitor | |
System.setProperty("com.amazon.swage.monitor.tls.CertificateMonitor.reporters", | |
"com.amazon.swage.monitor.tls.AwsCertificateManagerReporter"); | |
this.serverEventLoop = new NioEventLoopGroup(1, serverThreads()); | |
this.workerEventLoop = new NioEventLoopGroup(1, workerThreads()); | |
this.maxHeaderSize = DEFAULT_MAX_HEADER_SIZE; | |
this.maxContentLength = DEFAULT_MAX_CONTENT_LENGTH; | |
this.maxKeepAliveRequests = OptionalInt.of(DEFAULT_MAX_KEEP_ALIVE_REQUESTS); | |
idleTimeout = Optional.of(Duration.ofSeconds(60)); | |
SocketAddress localAddress; | |
LOG.info("starting to get bootstrap"); | |
for (URI u : uris) { | |
ServerBootstrap bootstrap; | |
bootstrap = getInsecureBootstrap(); | |
localAddress = new InetSocketAddress(u.getHost(), u.getPort()); | |
LOG.info("starting to bind bootstrap"); | |
bootstrap.group(serverEventLoop, workerEventLoop).localAddress(localAddress).bind().sync(); | |
String message = format("%s %s listening on %s", getClass().getSimpleName(), getRuntimeMXBean().getName(), | |
u); | |
LOG.info(message); | |
System.out.println("INFO: " + message); | |
} | |
LOG.info("starting to schedule tcpMonitor"); | |
tcpMonitor = GlobalEventExecutor.INSTANCE.scheduleAtFixedRate( | |
() -> LOG.info("{} open TCP connection(s)", tcpChannels.size()), 0, 15, SECONDS); | |
LOG.info("starting to check consistency"); | |
LOG.info("done with method"); | |
} | |
private static ThreadFactory serverThreads() { | |
ThreadFactory serverThreadFactory = new ThreadFactory() { | |
@Override | |
public Thread newThread(Runnable r) { | |
Thread serverThread = Executors.defaultThreadFactory().newThread(r); | |
serverThread.setName("netty-server-0"); | |
return serverThread; | |
} | |
}; | |
return serverThreadFactory; | |
} | |
private static ThreadFactory workerThreads() { | |
ThreadFactory workerThreadFactory = new ThreadFactory() { | |
@Override | |
public Thread newThread(Runnable r) { | |
Thread workerThread = Executors.defaultThreadFactory().newThread(r); | |
workerThread.setName("netty-worker-0"); | |
return workerThread; | |
} | |
}; | |
return workerThreadFactory; | |
} | |
public void dispose() { | |
try { | |
shuttingDown.set(true); | |
// First, we shut down the server's EventLoopGroup, which closes the server sockets and prevents any new TCP | |
// connections from being opened. | |
shutdownServer(); | |
// Finally, we shut down the worker EventLoopGroup. Note that this is a "graceful" shutdown of the event loops, | |
// *not* of any outstanding connections, which would be closed as soon as we call #shutdownGracefully. | |
workerEventLoop.shutdownGracefully().sync(); | |
LOG.info("Server shutdown complete"); | |
} catch (InterruptedException e) { | |
LOG.warn("Interrupted while disposing of " + this, e); | |
} | |
} | |
private void shutdownServer() throws InterruptedException { | |
LOG.info("Shutting down server event loop group"); | |
Future<?> serverShutdownFuture = serverEventLoop.shutdownGracefully(); | |
serverShutdownFuture.sync(); | |
for (URI u : uris) { | |
String message = getClass().getSimpleName() + " canceled " + u; | |
if (LOG.isInfoEnabled()) { | |
LOG.info(message); | |
} | |
System.out.println("INFO: " + message); | |
} | |
} | |
private ServerBootstrap getInsecureBootstrap() { | |
return newBootstrap(false, false); | |
} | |
private ServerBootstrap newBootstrap(boolean useUDS, boolean useSSL) { | |
ServerBootstrap bootstrap = new ServerBootstrap(); | |
SslContext sslContext = null; | |
bootstrap.channelFactory(NioServerSocketChannel::new); | |
// Create the Netty Handler factory that'll be used for each Channel | |
bootstrap.childHandler(new BugChannelInitializer(useSSL, sslContext)); | |
if (!useUDS) { | |
bootstrap.childOption(SO_KEEPALIVE, true) | |
.childOption(TCP_NODELAY, true); | |
} | |
return bootstrap; | |
} | |
private final class BugChannelInitializer extends ChannelInitializer<Channel> { | |
private final boolean useSSL; | |
private final SslContext sslContext; | |
private BugChannelInitializer(boolean useSSL, SslContext sslContext) { | |
this.useSSL = useSSL; | |
this.sslContext = sslContext; | |
} | |
@Override | |
protected void initChannel(Channel ch) { | |
ChannelPipeline pipeline = ch.pipeline(); | |
ch.config().setAutoRead(false); | |
ch.attr(AttributeKey.newInstance("open-streams")).set(new Semaphore(1)); | |
// We limit the maxInitialLineLength and maxHeaderSize so we can minimize resource usage for processing | |
// invalid requests. maxChunkSize just configures the buffer size; it does not limit requests. | |
int maxInitialLineLength = maxHeaderSize; | |
int maxChunkSize = 32_768; | |
HttpServerCodec httpServerCodec = new HttpServerCodec(maxInitialLineLength, maxHeaderSize, maxChunkSize); | |
ChannelHandler http2RequestHandler = new ChannelInitializer<Channel>() { | |
@Override | |
protected void initChannel(Channel channel) { | |
// HTTP/2, being a multiplexed protocol, allocates a new stream for every transaction, and Netty | |
// allocates a new channel for each new stream. This initializer sets up the per-stream request | |
// processing pipeline in the child channel. Http2StreamFrameToHttpObjectCodec is conceptually | |
// similar to HttpServerCodec, except that it also translates between HTTP/2 frames and HTTP/1.1 | |
// types such as HttpRequest, so that the same pipeline can support both protocols. | |
channel.config().setAutoRead(false); | |
channel.attr(CLIENT_IP).set(channel.parent().attr(CLIENT_IP).get()); | |
channel.attr(VPC_ID).set(channel.parent().attr(VPC_ID).get()); | |
channel.attr(VPCE_ID).set(channel.parent().attr(VPCE_ID).get()); | |
channel.attr(VPCE_CONFIG).set(channel.parent().attr(VPCE_CONFIG).get()); | |
channel.attr(VPCE_POLICY_URL).set(channel.parent().attr(VPCE_POLICY_URL).get()); | |
channel.attr(HTTP_VERSION).set(HttpVersion.valueOf("HTTP/2.0")); | |
channel.pipeline().addLast("http2Codec", new Http2StreamFrameToHttpObjectCodec(true)); | |
installHandlers(channel.pipeline()); | |
channel.read(); | |
} | |
}; | |
Http2FrameCodec http2Codec = Http2FrameCodecBuilder.forServer() | |
.initialSettings(Http2Settings.defaultSettings().maxConcurrentStreams(1l)) | |
.frameLogger(new Http2FrameLogger(LogLevel.TRACE, "WIRE")) | |
.build(); | |
Http2MultiplexHandler http2Handler = new Http2MultiplexHandler(http2RequestHandler); | |
pipeline.addLast("upgradeHandler", getUpgradeHandler(httpServerCodec, http2Codec)); | |
installUpgradeListener(pipeline, http2Handler); | |
installHandlers(pipeline); | |
pipeline.read(); | |
} | |
private ChannelHandler getUpgradeHandler( | |
HttpServerCodec httpServerCodec, | |
Http2FrameCodec http2Codec | |
) { | |
UpgradeCodecFactory upgradeCodecFactory = protocol -> contentEquals(HTTP_UPGRADE_PROTOCOL_NAME, protocol) ? | |
new Http2ServerUpgradeCodec("http2Codec", http2Codec) : null; | |
HttpServerUpgradeHandler upgradeHandler = new HttpServerUpgradeHandler(httpServerCodec, | |
upgradeCodecFactory) { | |
@Override | |
protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception { | |
// no-op, remove this if/when we want to support h2c http upgrades | |
} | |
}; | |
return new CleartextHttp2ServerUpgradeHandler(httpServerCodec, upgradeHandler, http2Codec); | |
} | |
private void installUpgradeListener(ChannelPipeline pipeline, Http2MultiplexHandler http2Handler) { | |
// This handler listens for HTTP/2 upgrade events, resulting from either HTTP Upgrade or Prior Knowledge. | |
// When an upgrade occurs, the original channel becomes multiplexed (meaning that requests get routed to | |
// child channels), so we remove the handlers from the channel. | |
pipeline.addLast("upgradeListener", new ChannelInboundHandlerAdapter() { | |
@Override | |
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { | |
if (evt instanceof UpgradeEvent || evt instanceof PriorKnowledgeUpgradeEvent) { | |
// Remove this handler and everything after it | |
while (pipeline.removeLast() != this) { | |
// do nothing | |
} | |
pipeline.addLast("http2Handler", http2Handler); | |
ctx.channel().read(); | |
} else { | |
super.userEventTriggered(ctx, evt); | |
} | |
} | |
}); | |
} | |
private void installHandlers(ChannelPipeline pipeline) { | |
pipeline.addLast("flowControl", new FlowControlHandler()); | |
pipeline.addLast("generic", new ChannelDuplexHandler() { | |
private Channel channel; | |
private ResponseBodySubscriber responseBodySubscriber; | |
private boolean readComplete = false; | |
private boolean writeComplete = false; | |
@Override | |
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { | |
this.channel = ctx.channel(); | |
super.handlerAdded(ctx); | |
} | |
@Override | |
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { | |
this.channel = null; | |
super.handlerRemoved(ctx); | |
} | |
@Override | |
public void channelRead(ChannelHandlerContext ctx, Object msg) { | |
if (msg instanceof HttpRequest) { | |
channel.attr(TIME_OF_LAST_REQUEST_BYTE).set(null); | |
channel.attr(TIME_OF_FIRST_RESPONSE_BYTE).set(null); | |
ctx.write(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)); | |
channel.read(); | |
} else if (msg instanceof HttpContent) { | |
HttpContent content = (HttpContent) msg; | |
ByteBuf buf = content.content(); | |
if (content instanceof LastHttpContent) { | |
channel.attr(TIME_OF_LAST_REQUEST_BYTE).set(System.nanoTime()); | |
buf.release(); | |
readComplete(); | |
} else { | |
buf.release(); | |
channel.read(); | |
} | |
} | |
} | |
@Override | |
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { | |
writeComplete = true; | |
responseBodySubscriber = null; | |
readNextRequest(); | |
} | |
private void readComplete() { | |
readComplete = true; | |
readNextRequest(); | |
} | |
private void readNextRequest() { | |
if (readComplete && writeComplete) { | |
readComplete = false; | |
writeComplete = false; | |
} else if (channel != null && channel.isOpen() /*&& isHttp2(channel)*/) { | |
// Since an HTTP/2 stream is created to serve only a single request, we know that there are no more | |
// requests to read on this stream. However, the client may send WINDOW_UPDATE frames or other | |
// HTTP/2-specific frame types, which we will read (and discard) as they arrive. | |
channel.read(); | |
} | |
} | |
final class ResponseBodySubscriber implements Subscriber<ByteBuf> { | |
private final ChannelHandlerContext ctx; | |
private final ChannelPromise responsePromise; | |
private Subscription subscription; | |
private ResponseBodySubscriber(ChannelHandlerContext ctx, ChannelPromise responsePromise) { | |
this.ctx = ctx; | |
this.responsePromise = responsePromise; | |
} | |
public void onSubscribe(Subscription subscription) { | |
this.subscription = subscription; | |
subscription.request(10); | |
} | |
public void onNext(ByteBuf buf) { | |
ctx.channel().attr(TIME_OF_FIRST_RESPONSE_BYTE).setIfAbsent(System.nanoTime()); | |
ctx.writeAndFlush(new DefaultHttpContent(buf)); | |
} | |
public void onError(Throwable t) { | |
} | |
public void onComplete() { | |
ctx.channel().attr(TIME_OF_FIRST_RESPONSE_BYTE).setIfAbsent(System.nanoTime()); | |
ctx.writeAndFlush(new DefaultLastHttpContent(), responsePromise); | |
subscription = null; | |
} | |
} | |
}); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.reactivex.Flowable; | |
import org.apache.hc.client5.http.async.methods.AsyncRequestBuilder; | |
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; | |
import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer; | |
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; | |
import org.apache.hc.client5.http.impl.async.HttpAsyncClients; | |
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; | |
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; | |
import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy; | |
import org.apache.hc.client5.http.ssl.TrustAllStrategy; | |
import org.apache.hc.core5.http.nio.AsyncRequestProducer; | |
import org.apache.hc.core5.http2.HttpVersionPolicy; | |
import org.apache.hc.core5.reactive.ReactiveEntityProducer; | |
import org.apache.hc.core5.ssl.SSLContexts; | |
import org.junit.Test; | |
import org.reactivestreams.Subscriber; | |
import org.reactivestreams.Subscription; | |
import javax.net.ssl.SSLContext; | |
import java.net.URI; | |
import java.nio.ByteBuffer; | |
import java.security.MessageDigest; | |
import java.time.Duration; | |
import java.util.List; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.function.Function; | |
import static org.apache.hc.core5.http.ContentType.APPLICATION_OCTET_STREAM; | |
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; | |
public class StreamingIntegrationTest { | |
private static final AtomicInteger port = new AtomicInteger(6001); | |
private NettyServerSimplified server; | |
private CloseableHttpAsyncClient client; | |
private URI endpoint; | |
@Test | |
public void toServer() throws Throwable { | |
HttpVersionPolicy versionPolicy = HttpVersionPolicy.FORCE_HTTP_2; | |
String protocol = "http"; | |
startClient(versionPolicy, protocol); | |
endpoint = URI.create(protocol + "://localhost:" + port.getAndIncrement()); | |
long shortLength = 1024 * 7; | |
Function <Long, SimpleHttpResponse> testClient = aLength -> { | |
try { | |
Flowable<ByteBuffer> input = Flowable.rangeLong(1, aLength) | |
.buffer(1024) | |
.doOnNext(list -> System.out.println("list ends: " + list.get(list.size() - 1))) | |
.map(StreamingIntegrationTest::toBuffer); | |
Sha256Subscriber clientDigest = new Sha256Subscriber(); | |
input.subscribe(clientDigest); | |
server = new NettyServerSimplified(); | |
ReactiveEntityProducer body = new ReactiveEntityProducer(input, -1, APPLICATION_OCTET_STREAM, null); | |
AsyncRequestProducer request = AsyncRequestBuilder.post(endpoint) | |
.setEntity(body) | |
.build(); | |
return assertTimeoutPreemptively(Duration.ofSeconds(15), | |
() -> client.execute(request, SimpleResponseConsumer.create(), null).get()); | |
} catch (Throwable e) { | |
throw new RuntimeException("threw error", e); | |
} | |
}; | |
SimpleHttpResponse shortTestResult = testClient.apply(shortLength); | |
server.dispose(); | |
} | |
private void startClient(HttpVersionPolicy versionPolicy, String protocol) throws Exception { | |
PoolingAsyncClientConnectionManager cm = null; | |
if ("https".equals(protocol)) { | |
SSLContext sslContext = SSLContexts.custom() | |
.loadTrustMaterial(new TrustAllStrategy()) | |
.build(); | |
cm = PoolingAsyncClientConnectionManagerBuilder.create() | |
.setTlsStrategy(new DefaultClientTlsStrategy(sslContext, null, null, null, (host, ctx) -> true)) | |
.build(); | |
} | |
client = HttpAsyncClients.custom() | |
.setVersionPolicy(versionPolicy) | |
.setConnectionManager(cm) | |
.build(); | |
client.start(); | |
} | |
private static ByteBuffer toBuffer(List<Long> values) { | |
ByteBuffer buf = ByteBuffer.allocate(values.size() * Long.BYTES); | |
values.forEach(buf::putLong); | |
return (ByteBuffer) buf.flip(); | |
} | |
private static class Sha256Subscriber implements Subscriber<ByteBuffer> { | |
private final MessageDigest md; | |
private byte[] digest; | |
private Sha256Subscriber() throws Exception { | |
md = MessageDigest.getInstance("SHA-256"); | |
} | |
@Override | |
public void onSubscribe(Subscription s) { | |
s.request(Long.MAX_VALUE); | |
} | |
@Override | |
public void onNext(ByteBuffer byteBuffer) { | |
md.update(byteBuffer.duplicate()); | |
} | |
@Override | |
public void onError(Throwable t) { | |
digest = md.digest(); | |
} | |
@Override | |
public void onComplete() { | |
digest = md.digest(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment