Skip to content

Instantly share code, notes, and snippets.

@akaliacius
Last active March 16, 2022 14:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save akaliacius/1da6638f5df21fe83c13d055dd9ee679 to your computer and use it in GitHub Desktop.
Save akaliacius/1da6638f5df21fe83c13d055dd9ee679 to your computer and use it in GitHub Desktop.
///usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS io.vertx:vertx-rx-java3:4.2.5
//DEPS io.vertx:vertx-web-client:4.2.5
import io.reactivex.rxjava3.core.Single;
import io.vertx.rxjava3.core.Vertx;
import io.vertx.rxjava3.ext.web.client.HttpResponse;
import io.vertx.rxjava3.ext.web.client.WebClient;
import java.util.AbstractMap;
import java.util.concurrent.CountDownLatch;
import static java.lang.String.format;
import static java.lang.System.out;
public class Services {
public static void main(String... args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
// Vertx instance and web client
Vertx vertx = Vertx.vertx();
WebClient webClient = WebClient.create(vertx);
// single sources. Lazy evaluation, no invocation at this point
Single<Integer> service1Code = service1(webClient);
Single<Integer> service2Code = service2(webClient);
// combine results together and create tuple
Single<AbstractMap.SimpleEntry<Integer, Integer>> tupleSource =
service1Code.zipWith(service2Code, (s1, s2) -> new AbstractMap.SimpleEntry<>(s1, s2));
// subscribe and invoke services
tupleSource
.doFinally(countDownLatch::countDown)
.subscribe(Services::printResult);
vertx.setPeriodic(1000, l -> out.println("[" + Thread.currentThread().getName() + "] is released"));
countDownLatch.await();
vertx.rxClose().subscribe();
}
private static Single<Integer> service1(WebClient webClient) {
return webClient.getAbs("http://httpstat.us/200?sleep=5000")
.rxSend()
.doOnSuccess(response -> out.println("[" + Thread.currentThread().getName() + "] service 1: response received"))
.map(HttpResponse::statusCode);
}
private static Single<Integer> service2(WebClient webClient) {
return webClient.getAbs("http://httpstat.us/200?sleep=2000")
.rxSend()
.doOnSuccess(response -> out.println("[" + Thread.currentThread().getName() + "] service 2 response received"))
.map(HttpResponse::statusCode);
}
private static void printResult(AbstractMap.SimpleEntry<Integer, Integer> tuple){
out.println(format(
"[%s] Result: service1:%d service2:%d",
Thread.currentThread().getName(),
tuple.getKey(),
tuple.getValue()
));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment