Skip to content

Instantly share code, notes, and snippets.

@nipunarora
Created September 6, 2021 03:59
Show Gist options
  • Save nipunarora/a9bd3a3867fea61a6b227144067e045c to your computer and use it in GitHub Desktop.
Save nipunarora/a9bd3a3867fea61a6b227144067e045c to your computer and use it in GitHub Desktop.
An example of how to do a fire and forget pattern in java react
public Flux<Integer> testCacheAsync() {
// flux of 1-10 integers with a delay of 100millis between each element
Flux<Integer> integerFlux = Flux.range(1, 10).delayElements(Duration.ofMillis(200));
/**
* A forget and fire pattern event here. which is executed after collectList() i.e. after the
* flux stream has it's last event collected. After which you can run a blocking task on
* collected list. This is essentially another subscriber on integerFlux
*/
integerFlux
.collectList()
.doOnNext(
data ->
Mono.fromRunnable(() -> blockingTask(data)).subscribeOn(TEST_SCHEDULER).subscribe())
.subscribe();
/**
* Showcasing buffer which is similar to collectList(), but collect with max size
*/
integerFlux
.buffer(2)
.doOnNext(
data ->
Mono.fromRunnable(() -> blockingBufferTask(data))
.subscribeOn(TEST_SCHEDULER)
.subscribe())
.subscribe();
// return integer flux which can have other subscribers in the response path
return integerFlux;
}
@martin-tarjanyi
Copy link

martin-tarjanyi commented Sep 6, 2021

Replace this:
Flux<Integer> integerFlux = Flux.range(1, 10).delayElements(Duration.ofMillis(200));
with this:
Flux<Integer> integerFlux = Flux.range(1, 10).doOnNext(System.out::println).delayElements(Duration.ofMillis(200));

You'll see that each number is printed twice (or three times if the returned Flux is also subscribed) as you have two subscribe calls.

That's why you might need caching.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment