Skip to content

Instantly share code, notes, and snippets.

@kentyeh
Last active August 13, 2021 05:50
Show Gist options
  • Save kentyeh/6925e047bd49d31286aa8d70fcb130d5 to your computer and use it in GitHub Desktop.
Save kentyeh/6925e047bd49d31286aa8d70fcb130d5 to your computer and use it in GitHub Desktop.
Test Reactor Async threading
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class ReactorMain {
public static void main(String[] args) throws InterruptedException {
int cores = Runtime.getRuntime().availableProcessors();
Instant start, end;
String[] classes = {
"/java/lang/InterruptedException.class",
"/java/lang/Runtime.class",
"/java/lang/System.class",
"/java/time/Duration.class",
"/java/time/Instant.class",
"/java/util/concurrent/CountDownLatch.class",
"/java/util/concurrent/ExecutorService.class",
"/java/util/concurrent/Executors.class",
"/java/util/concurrent/atomic/AtomicInteger.class",
"/reactor/core/publisher/Flux.class",
"/reactor/core/publisher/Mono.class",
"/reactor/core/scheduler/Scheduler.class"};
ExecutorService es = Executors.newFixedThreadPool(cores > classes.length ? classes.length : cores);
CountDownLatch latchTraditional = new CountDownLatch(classes.length);
start = Instant.now();
for (String file : classes) {
es.execute(() -> {
blockingRead(file);
latchTraditional.countDown();
});
}
latchTraditional.await();
es.shutdown();
end = Instant.now();
System.out.println("\033[91m>>>>>>>>> async read by traditional cost mills:" + Duration.between(start, end).toMillis() + "\033[0m");
CountDownLatch latchParallelFlux = new CountDownLatch(classes.length);
AtomicInteger atoi = new AtomicInteger(classes.length);
start = Instant.now();
Flux.<Integer>generate(sink -> {
int idx = atoi.getAndDecrement();
if (idx > 0) {
sink.next(idx);
} else {
sink.complete();
}
})
.parallel().runOn(Schedulers.boundedElastic())
// .subscribeOn(schedulers)
.subscribe((idx) -> {
blockingRead(classes[classes.length - idx]);
latchParallelFlux.countDown();
});
latchParallelFlux.await();
end = Instant.now();
System.out.println("\033[91m>>>>>>>>> async read by parallel flux cost mills:" + Duration.between(start, end).toMillis() + "\033[0m");
start = Instant.now();
CountDownLatch latchMultiMonos = new CountDownLatch(classes.length);
for (String file : classes) {
Mono.fromRunnable(() -> blockingRead(file))
.subscribeOn(Schedulers.boundedElastic())
.doOnTerminate(() -> latchMultiMonos.countDown())
.subscribe();
}
latchMultiMonos.await();
end = Instant.now();
System.out.println("\033[91m>>>>>>>>> async read by multi Mono cost mills:" + Duration.between(start, end).toMillis() + "\033[0m");
CountDownLatch latchFluxRepeatParallel = new CountDownLatch(classes.length);
start = Instant.now();
Flux.just(1)
.repeat(classes.length - 1)
.parallel()
.runOn(Schedulers.boundedElastic())
.doOnNext(i -> blockingRead(classes[Math.toIntExact(latchFluxRepeatParallel.getCount()) - 1]))
.sequential()//insignificant
.subscribe(i -> latchFluxRepeatParallel.countDown());
latchFluxRepeatParallel.await();
end = Instant.now();
System.out.println("\033[91m>>>>>>>>> async read by Flux repeat parallel cost mills:" + Duration.between(start, end).toMillis() + "\033[0m");
CountDownLatch latchFlatMap = new CountDownLatch(classes.length);
start = Instant.now();
Flux.just(1)
.repeat(classes.length - 1)
.flatMap(i -> Mono.fromCallable(() -> {
blockingRead(classes[Math.toIntExact(latchFlatMap.getCount()) - 1]);
return latchFlatMap.getCount();
}).subscribeOn(Schedulers.boundedElastic()), classes.length > cores ? cores : classes.length)
.subscribe(i -> latchFlatMap.countDown());
latchFlatMap.await();
end = Instant.now();
System.out.println("\033[91m>>>>>>>>> async read by Flux FlatMap cost mills:" + Duration.between(start, end).toMillis() + "\033[0m");
}
private static boolean blockingRead(String className) {
try (InputStream in = Main2.class.getResourceAsStream(className)) {
int i = -1;
while ((i = in.read()) != -1) {
}
System.out.println("\033[93m[" + Thread.currentThread().getName() + "]\033[92m"
+ className.substring(1).replaceFirst("\\.class", "").replaceAll("\\/", ".") + " \033[0mreaded.");
Thread.sleep(2000);
return true;
} catch (IOException | InterruptedException e) {
return false;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment