Skip to content

Instantly share code, notes, and snippets.

@vietj
Created February 21, 2024 15:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vietj/5038949402865df678464040335408b8 to your computer and use it in GitHub Desktop.
Save vietj/5038949402865df678464040335408b8 to your computer and use it in GitHub Desktop.
/**
 * Replaces the client with a keep-alive client whose max pool size is 1, starts the server and
 * drives {@code requests} PUT exchanges through {@link #doRequest(CountDownLatch)}.
 *
 * <p>The server asserts that the "count" header of each incoming request matches the number of
 * requests it has already seen, checks the body, and answers after a short random delay by echoing
 * the body together with the same "count" header. The test blocks until the latch handed to
 * {@code doRequest} has been counted down {@code requests} times.
 *
 * <p>NOTE(review): the client options built here do not call {@code setPipelining(true)}, and
 * {@code doRequest} only issues the next request after the previous response body has been read -
 * confirm whether that is intended for a test with this name.
 *
 * @throws Exception if waiting on the latch fails; "Timeout" is logged before the failure is rethrown
 */
@Test
public void testPipeliningOrder() throws Exception {
  // Drop the existing client and build one restricted to a single pooled, keep-alive connection.
  client.close();
  client = vertx.createHttpClient(
    createBaseClientOptions().setKeepAlive(true).setMaxPoolSize(1));

  // Number of requests the server has accepted so far; doubles as the next expected "count" value.
  AtomicInteger received = new AtomicInteger(0);

  // Trace connection-level failures and closes to the console.
  server.connectionHandler(connection -> {
    connection.exceptionHandler(failure -> log.accept("CONNECTION ERROR " + failure.getMessage()));
    connection.closeHandler(ignored -> log.accept("CONNECTION CLOSED"));
  });

  server.requestHandler(request -> {
    assertSame(Vertx.currentContext(), ((HttpServerRequestInternal) request).context());
    int index = received.get();
    assertEquals(index, Integer.parseInt(request.headers().get("count")));
    received.incrementAndGet();
    // Chunked encoding is left disabled: request.response().setChunked(true);
    log.accept("Got request " + index);
    request.body().onComplete(onSuccess(body -> {
      log.accept("Got body " + index);
      assertEquals("This is content " + index, body.toString());
      // Reply after a random 1-10 ms pause so that out-of-order responses are more likely to
      // surface if ordering is not handled correctly.
      long delay = 1 + (long) (10 * Math.random());
      vertx.setTimer(delay, timerId -> {
        log.accept("Send response " + index);
        request.response().headers().set("count", Integer.toString(index));
        request.response().end(body).onComplete(onSuccess(ignored -> log.accept("Sent response " + index)));
      });
    }));
  });

  // One count per completed exchange; the first request is sent once the server is listening.
  CountDownLatch latch = new CountDownLatch(requests);
  server.listen(testAddress, onSuccess(started -> doRequest(latch)));

  try {
    awaitLatch(latch);
  } catch (Throwable failure) {
    // Leave a marker in the console trace, then let the failure propagate unchanged.
    log.accept("Timeout");
    throw failure;
  }
}
// Number of request/response exchanges to perform; sizes the latch and bounds doRequest.
int requests = 100;
// Reference timestamp (ms) taken when this instance is initialized; log output is relative to it.
long now = System.currentTimeMillis();
// Prints a message to stdout followed by the elapsed time since `now`, expressed in seconds
// after truncating the elapsed milliseconds to hundredths of a second.
Consumer<String> log = text -> {
  long hundredths = (System.currentTimeMillis() - now) / 10;
  float seconds = hundredths / 100F;
  System.out.println(text + " (" + seconds + ")");
};
// Index handed to the next request issued by doRequest.
AtomicInteger count = new AtomicInteger();
/**
 * Issues the next PUT request in the sequence, or does nothing once {@code requests} indices have
 * been handed out.
 *
 * <p>Each request carries its index in the "count" header and the body
 * {@code "This is content " + index}. The response must echo both. After the response body has
 * been verified, the latch is counted down and this method calls itself to send the following
 * request, so at most one request started from here is in flight at a time.
 *
 * @param latch counted down once for every fully verified response
 */
private void doRequest(CountDownLatch latch) {
  int index = count.getAndIncrement();
  if (index < requests) {
    log.accept("Want request " + index);
    // The same text is sent as the request body and expected back in the response body.
    String payload = "This is content " + index;
    client.request(new RequestOptions(requestOptions).setMethod(PUT))
      .onComplete(onSuccess(request -> {
        request.putHeader("count", Integer.toString(index));
        request.send(Buffer.buffer(payload), onSuccess(response -> {
          log.accept("Got response " + index);
          assertEquals(index, Integer.parseInt(response.headers().get("count")));
          response.body().onComplete(onSuccess(body -> {
            log.accept("Got response body " + index);
            assertEquals(payload, body.toString());
            latch.countDown();
            // Chain the next exchange only after this one has been checked end to end.
            doRequest(latch);
          }));
        }));
      }));
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment