Vertx vertx = Vertx.vertx();
HttpClient client = vertx.createHttpClient();
HttpServer server = vertx.createHttpServer();
server.requestHandler(req -> {
  Trace trace = Trace.create(req);
  HttpClientRequest clientReq = client.get("some-uri", ar -> {
    trace.close();
  });
  trace.propagateTo(clientReq);
Found one Java-level deadlock:
=============================
"vert.x-eventloop-thread-3":
SUREFIRE-859: waiting to lock monitor 0x00007f32040062c8 (object 0x00000000e7e52ea0, a io.mewbase.server.impl.log.LogImpl),
which is held by "vert.x-eventloop-thread-2"
"vert.x-eventloop-thread-2":
SUREFIRE-859: waiting to lock monitor 0x00007f3204006218 (object 0x00000000ebacde70, a io.mewbase.server.impl.log.LogReadStreamImpl),
which is held by "vert.x-eventloop-thread-3"
Java stack information for the threads listed above:
===================================================
@Test
@Repeat(times=1000)
public void testAsyncFileConcurrency() throws Exception {
  String fileName = "some-file.dat";
  AtomicReference<AsyncFile> arFile = new AtomicReference<>();
  CountDownLatch latch = new CountDownLatch(1);
  vertx.fileSystem().open(testDir + pathSep + fileName, new OpenOptions(), ar -> {
    if (ar.succeeded()) {
      AsyncFile af = ar.result();
diff --git a/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java b/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java
index c9f8992..301bde9 100644
--- a/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java
+++ b/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java
@@ -43,6 +43,7 @@ import java.util.HashSet;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
public void testFoo() {
  Vertx vertx = Vertx.vertx();
  HttpClient client = vertx.createHttpClient();
  getWithFuture(client, "url1")
    .thenCompose(resp1 -> getWithFuture(client, "url2/" + resp1.statusMessage()))
    .thenCompose(resp2 -> getWithFuture(client, "url3/" + resp2.statusMessage()))
    .whenComplete(((resp3, t) -> {
      if (t != null) {
vertx.executeBlocking(fut -> {
  Thread worker = Thread.currentThread();
  AtomicBoolean complete = new AtomicBoolean();
  long id = vertx.setTimer(TIMEOUT, tid -> {
    if (!complete.get()) {
      worker.interrupt();
    }
  });
private void doIt() {
  CompletableFuture<String> cf = execute(this::findBook);
  // Now that you have a CompletableFuture, you can use it like any other CompletableFuture
  cf.whenComplete((s, t) -> {
    // CF callbacks will always be called on the correct Vert.x thread, so there is no need to worry about
    // fork-join pools or anything like that
  });
// Some arbitrary blocking method which returns a String
AsyncResCF<String> ar = new AsyncResCF<>();
vertx.executeBlocking(fut -> {
  // Your blocking code goes here
}, ar);
ar.whenComplete((s, t) -> {
  // CF callbacks will always be called on the correct Vert.x thread, so there is no need to worry about
  // fork-join pools or anything like that
});
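
The snippet above passes an `AsyncResCF` both as a result handler and as a `CompletableFuture`, but its definition is not shown. The following is only a minimal sketch of one way such a class could look, not the author's implementation. To keep it runnable without Vert.x on the classpath, `AsyncResult` and `Handler` here are hypothetical stand-ins for the `io.vertx.core` interfaces of the same names, and `success`/`failure` are illustrative helpers.

```java
import java.util.concurrent.CompletableFuture;

public class AsyncResCFSketch {

  // Hypothetical stand-in for io.vertx.core.AsyncResult (assumption, for self-containment)
  interface AsyncResult<T> {
    boolean succeeded();
    T result();
    Throwable cause();
  }

  // Hypothetical stand-in for io.vertx.core.Handler (assumption, for self-containment)
  interface Handler<E> {
    void handle(E event);
  }

  // Assumed shape of AsyncResCF: a CompletableFuture that can also be passed as a result handler
  static class AsyncResCF<T> extends CompletableFuture<T> implements Handler<AsyncResult<T>> {
    @Override
    public void handle(AsyncResult<T> ar) {
      if (ar.succeeded()) {
        complete(ar.result());
      } else {
        completeExceptionally(ar.cause());
      }
    }
  }

  // Illustrative helper: builds a successful result
  static <T> AsyncResult<T> success(T value) {
    return new AsyncResult<T>() {
      public boolean succeeded() { return true; }
      public T result() { return value; }
      public Throwable cause() { return null; }
    };
  }

  // Illustrative helper: builds a failed result
  static <T> AsyncResult<T> failure(Throwable cause) {
    return new AsyncResult<T>() {
      public boolean succeeded() { return false; }
      public T result() { return null; }
      public Throwable cause() { return cause; }
    };
  }

  public static void main(String[] args) {
    AsyncResCF<String> ar = new AsyncResCF<>();
    // Registered before completion, so this runs on the thread that later calls handle()
    ar.whenComplete((s, t) -> System.out.println(t == null ? "result: " + s : "failed: " + t));
    ar.handle(success("some book"));
  }
}
```

In this sketch the non-async `whenComplete` callback runs on whichever thread calls `handle`, provided the callback was registered before completion. A callback registered after the future has already completed runs on the registering thread instead, so the thread guarantee depends on where the handler is invoked and when callbacks are attached.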
void someFunction(RoutingContext context) {
  dbConnection.doSomething(blah, result -> {
    // context is available in this lambda even though it's executed async
    callSomeOtherfunction(context);
  });