Skip to content

Instantly share code, notes, and snippets.

@simbo1905
Last active November 5, 2019 12:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save simbo1905/21640998cd27369e44be8a282dc9f13f to your computer and use it in GitHub Desktop.
Save simbo1905/21640998cd27369e44be8a282dc9f13f to your computer and use it in GitHub Desktop.
out of order processing of a Flux of input using Reactor 0.9.1.RELEASE
import reactor.core.publisher.Mono;
public class BlockingJournal {
private static String blockingWrite(String in){
try {
// fakes blocking for disk write
Thread.sleep(5L);
System.out.println("journal wrote: "+in+" on "+Thread.currentThread().getName());
} catch (Exception e){
throw new RuntimeException(e);
}
return in;
}
public static Mono<String> blockingMethodSingleThread(final String in) {
return Mono.fromSupplier(() -> blockingWrite(in));
}
}
import reactor.core.publisher.Mono;
import java.util.Random;
public class BlockingRemoteCall {
private final static Random r = new Random();
static private String blockingWebService(final String in) {
try {
// fakes blocking for up to a second
Thread.sleep((long) (1000 * r.nextFloat()));
System.out.println("webserver returned: "+in+" on "+Thread.currentThread().getName());
} catch (Exception e) {
throw new RuntimeException(e);
}
return in;
}
public static Mono<String> blockingMethodParallelThread(final String in) {
return Mono.fromSupplier(() -> blockingWebService(in));
}
}
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ParallelFlux;
import reactor.core.scheduler.Schedulers;
public class TestFluxes {
@Test
public void testFluxParallelProcess() throws Exception {
// Solution by Michael Berry https://stackoverflow.com/a/58709188/329496 Thanks!
ParallelFlux<String> flux = Flux.range(1, 10).map(i -> i.toString()).parallel().runOn(Schedulers.elastic());
ParallelFlux<String> pipeline = flux.flatMap(s -> {
Mono<String> async = BlockingRemoteCall.blockingMethodParallelThread(s);
String r1 = async.block();
return BlockingJournal.blockingMethodSingleThread(r1);
});
pipeline.sequential().doOnNext(System.out::println).blockLast();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment