Skip to content

Instantly share code, notes, and snippets.

@dazraf
Last active December 21, 2015 18:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dazraf/60001a1a12bcbc367f18 to your computer and use it in GitHub Desktop.
Save dazraf/60001a1a12bcbc367f18 to your computer and use it in GitHub Desktop.
composing async calls
package io.dazraf.vertx.async;
import io.vertx.core.AsyncResult;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
import static io.dazraf.vertx.async.promise.Promise.all;
import static io.dazraf.vertx.async.promise.Promise.create;
public class PromiseTest {
private static final String ADDRESS = "address";
private static final Logger LOGGER = LoggerFactory.getLogger(PromiseTest.class);
private Vertx vertx = Vertx.vertx();
@Test
public void test() throws InterruptedException {
// let's register for some message handling off the event bus
vertx.eventBus().consumer(ADDRESS, (Message<JsonObject> m) -> {
LOGGER.info("consumer received: {}", m.body().encodePrettily());
m.reply(m.body());
});
// a latch to pause the test whilst the async executions are in progress
final CountDownLatch latch = new CountDownLatch(1);
// This is using a new API I wrote encapsulate in the Promise interface
// the interface has static factory methods for creation: create, all
// a promise can be chained with member factory methods: then, thenAll
// it can also be observed: peek
// and can be evaluated with: eval
// e.g.
// This examples runs submissions to the eventbus (althought other types of async ops are possible also)
// We execute EventBus#send of JsonObject with 'id' field set to an ordinal
// first "tasks" '1' and '2' in parallel, followed by task '3', then tasks '4' and '5' in parallel
// here we go ..
// lets run taks '1', '2' in parallel
all(
// the general form is: async return type, function to call, parameters
create(JsonObject.class, vertx.eventBus()::send, ADDRESS, createJsonObjectWithId(1)).peek(this::writeResponse),
// ... note that we also can chain a 'peek' function to intercept the callbacks - similar to Java8 Streams
create(JsonObject.class, vertx.eventBus()::send, ADDRESS, createJsonObjectWithId(2)).peek(this::writeResponse)
)
// here we wait for '1' and '2' to finish before running '3'
.then(JsonObject.class, vertx.eventBus()::send, ADDRESS, createJsonObjectWithId(3))
.thenAll(
// here we execute in parallel again, this time for '4' and '5'
create(JsonObject.class, vertx.eventBus()::send, ADDRESS, createJsonObjectWithId(4)).peek(this::writeResponse),
create(JsonObject.class, vertx.eventBus()::send, ADDRESS, createJsonObjectWithId(5)).peek(this::writeResponse)
)
// we now execute the entire built pipeline, capturing the state of the last task
.eval(ar -> {
latch.countDown();
LOGGER.info("all done");
});
// let's wait for everythign to finish
latch.await();
}
private void writeResponse(AsyncResult<Message<JsonObject>> asyncResult) {
LOGGER.info("success: {} value: {}", asyncResult.succeeded(), asyncResult.succeeded() ? asyncResult.result().body().encodePrettily() : "null");
}
private JsonObject createJsonObjectWithId(int id) {
return new JsonObject().put("id", id);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment