Skip to content

Instantly share code, notes, and snippets.

@smaldini
Forked from jbrisbin/BroadcastStream.java
Last active June 10, 2016 16:44
Show Gist options
  • Save smaldini/a537f7a1f6b77d59b5d4 to your computer and use it in GitHub Desktop.
Save smaldini/a537f7a1f6b77d59b5d4 to your computer and use it in GitHub Desktop.
Scatter gather Async Flux example using #ProjectReactor
// Create an async message-passing Processor exposing a Flux API
TopicProcessor<String> sink = TopicProcessor.create();
// Scatter Gather the input sequence
sink
.map(String::toUpperCase)
.flatMap(s ->
Mono.fromCallable(() -> someRepository.findOneByCategory(s))
.timeout(Duration.ofSeconds(3), someRepository::fallback)
.subscribeOn(Schedulers.parallel())
)
.subscribe(System.out::println);
// Sink values asynchronously
sink.onNext("Rx");
sink.onNext("ReactiveStreams");
sink.onNext("ReactiveStreamsCommons");
sink.onNext("RingBuffer");
//Shutdown and clean async resources
sink.onComplete();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment