Last active
December 21, 2015 18:16
-
-
Save dazraf/60001a1a12bcbc367f18 to your computer and use it in GitHub Desktop.
composing async calls
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.dazraf.vertx.async; | |
import io.vertx.core.AsyncResult;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static io.dazraf.vertx.async.promise.Promise.all;
import static io.dazraf.vertx.async.promise.Promise.create;
import static org.junit.Assert.assertTrue;
public class PromiseTest { | |
private static final String ADDRESS = "address"; | |
private static final Logger LOGGER = LoggerFactory.getLogger(PromiseTest.class); | |
private Vertx vertx = Vertx.vertx(); | |
@Test | |
public void test() throws InterruptedException { | |
// let's register for some message handling off the event bus | |
vertx.eventBus().consumer(ADDRESS, (Message<JsonObject> m) -> { | |
LOGGER.info("consumer received: {}", m.body().encodePrettily()); | |
m.reply(m.body()); | |
}); | |
// a latch to pause the test whilst the async executions are in progress | |
final CountDownLatch latch = new CountDownLatch(1); | |
// This is using a new API I wrote encapsulate in the Promise interface | |
// the interface has static factory methods for creation: create, all | |
// a promise can be chained with member factory methods: then, thenAll | |
// it can also be observed: peek | |
// and can be evaluated with: eval | |
// e.g. | |
// This examples runs submissions to the eventbus (althought other types of async ops are possible also) | |
// We execute EventBus#send of JsonObject with 'id' field set to an ordinal | |
// first "tasks" '1' and '2' in parallel, followed by task '3', then tasks '4' and '5' in parallel | |
// here we go .. | |
// lets run taks '1', '2' in parallel | |
all( | |
// the general form is: async return type, function to call, parameters | |
create(JsonObject.class, vertx.eventBus()::send, ADDRESS, createJsonObjectWithId(1)).peek(this::writeResponse), | |
// ... note that we also can chain a 'peek' function to intercept the callbacks - similar to Java8 Streams | |
create(JsonObject.class, vertx.eventBus()::send, ADDRESS, createJsonObjectWithId(2)).peek(this::writeResponse) | |
) | |
// here we wait for '1' and '2' to finish before running '3' | |
.then(JsonObject.class, vertx.eventBus()::send, ADDRESS, createJsonObjectWithId(3)) | |
.thenAll( | |
// here we execute in parallel again, this time for '4' and '5' | |
create(JsonObject.class, vertx.eventBus()::send, ADDRESS, createJsonObjectWithId(4)).peek(this::writeResponse), | |
create(JsonObject.class, vertx.eventBus()::send, ADDRESS, createJsonObjectWithId(5)).peek(this::writeResponse) | |
) | |
// we now execute the entire built pipeline, capturing the state of the last task | |
.eval(ar -> { | |
latch.countDown(); | |
LOGGER.info("all done"); | |
}); | |
// let's wait for everythign to finish | |
latch.await(); | |
} | |
private void writeResponse(AsyncResult<Message<JsonObject>> asyncResult) { | |
LOGGER.info("success: {} value: {}", asyncResult.succeeded(), asyncResult.succeeded() ? asyncResult.result().body().encodePrettily() : "null"); | |
} | |
private JsonObject createJsonObjectWithId(int id) { | |
return new JsonObject().put("id", id); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment