Skip to content

Instantly share code, notes, and snippets.

@tuxdna
Created December 8, 2020 13:27
Show Gist options
  • Save tuxdna/9e5901e1e3b309494f70821c95e22bbf to your computer and use it in GitHub Desktop.
Save tuxdna/9e5901e1e3b309494f70821c95e22bbf to your computer and use it in GitHub Desktop.
RxJava Flowable: wait for concurrent processing to finish
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.parallel.ParallelFlowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.apache.commons.lang3.tuple.ImmutablePair;
import java.time.LocalTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class ReactiveTest {
public static <T> T intenseCalculation(T value) throws InterruptedException {
Thread.sleep(ThreadLocalRandom.current().nextInt(50));
return value;
}
public static void flow1() throws InterruptedException {
Flowable<Integer> vals = Flowable.range(1, 50);
ParallelFlowable<ImmutablePair<Integer, Double>> parallelFlowable = vals.parallel(6) //.parallel()
.runOn(Schedulers.io())
.map(i -> {
intenseCalculation(i);
out.println(Thread.currentThread().getName() + " Processing: " + i);
return new ImmutablePair<>(i, i * Math.random());
});
Flowable<ImmutablePair<Integer, Double>> seqFlow = parallelFlowable.sequential();
seqFlow
.observeOn(Schedulers.computation())
.blockingStream()
.forEach(out::println);
}
public static void main(String[] args) throws InterruptedException {
flow1();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment