Skip to content

Instantly share code, notes, and snippets.

@thomsonimjd
Last active March 29, 2018 05:27
Show Gist options
  • Save thomsonimjd/13f29ccf04ea02224e476a876a199185 to your computer and use it in GitHub Desktop.
Save thomsonimjd/13f29ccf04ea02224e476a876a199185 to your computer and use it in GitHub Desktop.
To reproduce vert.x HttpClient connection pool issue.
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import org.junit.Rule;
import org.junit.Test;
import org.mockserver.client.server.MockServerClient;
import org.mockserver.junit.MockServerRule;
import org.mockserver.model.Delay;
import org.mockserver.model.HttpRequest;
import org.mockserver.model.HttpResponse;
import org.mockserver.verify.VerificationTimes;
import java.util.Date;
import java.util.concurrent.TimeUnit;
public class HttpClientPoolTest extends VertxNubesTestBase {
protected final static String HC_RESPONCSE_STRING = "Server is running";
private static final Logger logger = LoggerFactory.getLogger(HttpClientPoolTest.class);
@Rule
public MockServerRule mockServerRule = new MockServerRule(this);
private MockServerClient mockServerClient;
// test with delay of 2s, 10s or any sleep will get error
// test with no delay will get successful response
@Test
public void testWithDelay(TestContext context) throws Exception {
int idleTimeoutSecs = 2; // TimeUnit.SECONDS
int connectTimeoutMillis = 2000; // TimeUnit.MILLISECONDS
// expected Request
HttpRequest hcRequest = HttpRequest.request()
.withPath("/hc")
.withMethod("GET");
// mock response
HttpResponse hcResponse = HttpResponse.response()
.withBody(HC_RESPONCSE_STRING)
.withStatusCode(200)
.withDelay(new Delay(TimeUnit.SECONDS, 3));
// setting behaviour for test case
mockServerClient.when(hcRequest).respond(hcResponse);
HttpClient client = vertx.createHttpClient(new HttpClientOptions().setDefaultHost("localhost")
.setDefaultPort(mockServerRule.getPort())
.setIdleTimeout(idleTimeoutSecs)
.setMaxPoolSize(2)
.setMaxWaitQueueSize(0)
.setConnectTimeout(connectTimeoutMillis)
);
// establish the connection 4 times.
for (int i = 0; i < 4; i++) {
final int t = i;
try {
Async async = context.async();
client.get("/hc", response -> {
System.out.println("RESPONSE : " + response);
async.complete();
mockServerClient.verify(hcRequest, VerificationTimes.once());
}).exceptionHandler(ex -> {
System.out.println("GOT_EXCEPTION_" + t + " => " + ex.getMessage());
ex.printStackTrace();
}).end();
if (i == 1) {
Thread.sleep(2000);
}
} catch (Throwable ex) {
System.out.println("GOT_AN_EXCEPTION");
ex.printStackTrace();
}
}
}
}
import com.github.aesteve.vertx.nubes.VertxNubes;
import io.mobitech.search.services.LoggingService;
import io.mobitech.search.services.PublishersService;
import io.mobitech.search.services.SemanticService;
import io.mobitech.search_trends.services.SearchService;
import io.mobitech.search_trends.services.TrendsService;
import io.mobitech.search_trends.services.UClassifyService;
import io.mobitech.search_trends.services.schedulers.TrendsPeriodicUpdaterService;
import io.mobitech.search_trends.services.tasks.GetMasterTrendsTask;
import io.mobitech.search_trends.services.tasks.GrepwordsSimilarKeywordsTask;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import static com.github.aesteve.vertx.nubes.utils.async.AsyncUtils.*;
public class NubesVerticle extends AbstractVerticle {
private static final Logger LOG = LoggerFactory.getLogger(NubesVerticle.class);
protected HttpServer server;
protected HttpServerOptions options;
protected VertxNubes nubes;
@Override
public void init(Vertx vertx, Context context) {
super.init(vertx, context);
JsonObject config = context.config();
options = new HttpServerOptions();
options.setHost(config.getString("host", "localhost"));
options.setPort(config.getInteger("port", 9000));
nubes = new VertxNubes(vertx, config);
}
@Override
public void start(Future<Void> future) {
server = vertx.createHttpServer(options);
nubes.bootstrap(onSuccessOnly(future, router -> {
server.requestHandler(router::accept);
server.listen(ignoreResult(future));
LOG.info("Server listening on port : " + options.getPort());
}));
}
@Override
public void stop(Future<Void> future) {
nubes.stop(nubesRes -> closeServer(future));
}
private void closeServer(Future<Void> future) {
if (server != null) {
LOG.info("Closing HTTP server");
server.close(completeOrFail(future));
} else {
future.complete();
}
}
}
import com.github.aesteve.vertx.nubes.VertxNubes;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import java.util.concurrent.CountDownLatch;
@RunWith(VertxUnitRunner.class)
public abstract class VertxNubesTestBase {
protected static JsonObject config = new JsonObject();
protected static Vertx vertx;
@BeforeClass
public static void setUp(TestContext context) throws Exception {
if (vertx==null){
vertx = Vertx.vertx();
DeploymentOptions options = new DeploymentOptions();
config.put("host", "localhost");
config.put("port", 9002);
options.setConfig(config);
vertx.deployVerticle(NubesVerticle.class.getName(),options,
context.asyncAssertSuccess());
}
}
}
@thomsonimjd
Copy link
Author

thomsonimjd commented Nov 30, 2016

This code will produce the below output.

+--------+-------------------------------------------------+----------------+
19:28:38.539 [nioEventLoopGroup-3-1] INFO o.m.mockserver.MockServerHandler - creating expectation:

{
"httpRequest" : {
"method" : "GET",
"path" : "/hc"
},
"times" : {
"remainingTimes" : 0,
"unlimited" : true
},
"timeToLive" : {
"unlimited" : true
},
"httpResponse" : {
"statusCode" : 200,
"body" : "Server is running",
"delay" : {
"timeUnit" : "SECONDS",
"value" : 3
}
}
}

19:28:38.643 [nioEventLoopGroup-3-2] INFO o.m.matchers.HttpRequestMatcher - request:
matched expectation:
{
"method" : "GET",
"path" : "/hc"
}

GOT_EXCEPTION_2 => Connection pool reached max wait queue size of 0
GOT_EXCEPTION_3 => Connection pool reached max wait queue size of 0
GOT_EXCEPTION_1 => Connection was closed
GOT_EXCEPTION_0 => Connection was closed

java.util.concurrent.TimeoutException
at io.vertx.ext.unit.impl.TestContextImpl$Step.lambda$run$0(TestContextImpl.java:112)
at java.lang.Thread.run(Thread.java:745)

Process finished with exit code 255

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment