Skip to content

Instantly share code, notes, and snippets.

@juanavelez
Created April 15, 2017 00:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save juanavelez/bf47ae84f842896350ca046a1d88ac5f to your computer and use it in GitHub Desktop.
Save juanavelez/bf47ae84f842896350ca046a1d88ac5f to your computer and use it in GitHub Desktop.
execute-blocking-code-multiple-queues
/**
* Similar to {@link #executeBlocking(Handler, Handler)} but when this method is called several times on the same
* context for the same {@code identifier}, executions associated to that value will be executed serially.
* However executions associated to different identifiers will be executed in parallel.
* @param identifier Object used to group and serialize executions
* @param blockingCodeHandler handler representing the blocking code to run
* @param resultHandler handler that will be called when the blocking code is complete
* @param <T> the type of the result
*/
<T> void executeBlocking(Object identifier, Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler);
protected final Map<Object, TaskQueue> orderedTaskQueues = new WeakHashMap<>();
@Override
public <T> void executeBlocking(Object identifier, Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler) {
TaskQueue queue = orderedTaskQueues.get(identifier);
if (queue == null) {
queue = new TaskQueue();
orderedTaskQueues.put(identifier, queue);
}
executeBlocking(blockingCodeHandler, queue, resultHandler);
}
@Test
public void testExecuteBlockingMultipleQueuesViaContextCall() throws Exception {
ContextInternal context = (ContextInternal) vertx.getOrCreateContext();
waitFor(4);
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(1);
CountDownLatch latch3 = new CountDownLatch(1);
CountDownLatch latch4 = new CountDownLatch(1);
String queue1 = "queue1";
String queue2 = "queue2";
context.executeBlocking(queue1, fut -> {
try {
awaitLatch(latch3);
latch1.countDown();
fut.complete();
} catch (Exception e) {
fut.fail(e);
}
}, ar -> {
assertTrue(ar.succeeded());
complete();
});
context.executeBlocking(queue1, fut -> {
try {
latch1.await();
latch2.countDown();
fut.complete();
} catch (Exception e) {
fut.fail(e);
}
}, ar -> {
assertTrue(ar.succeeded());
complete();
});
context.executeBlocking(queue2, fut -> {
latch3.countDown();
fut.complete();
}, ar -> {
assertTrue(ar.succeeded());
complete();
});
context.executeBlocking(queue2, fut -> {
try {
latch3.await();
latch4.countDown();
fut.complete();
} catch (Exception e) {
fut.fail(e);
}
}, ar -> {
assertTrue(ar.succeeded());
complete();
});
await();
}
@Test
public void testExecuteBlockingMultipleQueuesViaVertxCall() throws Exception {
waitFor(4);
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start() throws Exception {
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(1);
CountDownLatch latch3 = new CountDownLatch(1);
CountDownLatch latch4 = new CountDownLatch(1);
String queue1 = "queue1";
String queue2 = "queue2";
vertx.executeBlocking(queue1, fut -> {
try {
awaitLatch(latch3);
latch1.countDown();
fut.complete();
} catch (Exception e) {
fut.fail(e);
}
}, ar -> {
assertTrue(ar.succeeded());
complete();
});
vertx.executeBlocking(queue1, fut -> {
try {
latch1.await();
latch2.countDown();
fut.complete();
} catch (Exception e) {
fut.fail(e);
}
}, ar -> {
assertTrue(ar.succeeded());
complete();
});
vertx.executeBlocking(queue2, fut -> {
try {
latch3.countDown();
fut.complete();
} catch (Exception e) {
fut.fail(e);
}
}, ar -> {
assertTrue(ar.succeeded());
complete();
});
vertx.executeBlocking(queue2, fut -> {
try {
latch3.await();
latch4.countDown();
fut.complete();
} catch (Exception e) {
fut.fail(e);
}
}, ar -> {
assertTrue(ar.succeeded());
complete();
});
}
});
await();
}
/**
* Similar to {@link #executeBlocking(Handler, Handler)} but when this method is called several times on the same
* context for the same {@code identifier}, executions associated to that value will be executed serially.
* However executions associated to different identifiers will be executed in parallel.
* @param identifier Object used to group and serialize executions
* @param blockingCodeHandler handler representing the blocking code to run
* @param resultHandler handler that will be called when the blocking code is complete
* @param <T> the type of the result
*/
<T> void executeBlocking(Object identifier, Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler);
@Override
public <T> void executeBlocking(Object identifier,
Handler<Future<T>> blockingCodeHandler,
Handler<AsyncResult<T>> asyncResultHandler) {
ContextImpl context = getOrCreateContext();
context.executeBlocking(identifier, blockingCodeHandler, asyncResultHandler);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment