Example of Hot/Cold producer when you toggle refCount/autoConnect
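import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.util.Logger;
import reactor.util.Loggers;

public class HotColdRefCountExample {

    // Assumption: the original `log` was an SLF4J-style logger (e.g. Lombok's @Slf4j).
    // Reactor's own Logger is used here so the snippet needs no extra dependency.
    private static final Logger log = Loggers.getLogger(HotColdRefCountExample.class);

    public static void main(String[] args) throws InterruptedException {
        final Flux<Long> longFlux = Flux.interval(Duration.ofSeconds(1))
                .doOnSubscribe(subscription -> log.info("SUBSCRIBE"))
                .doOnTerminate(() -> log.info("TERMINATE"))
                .doOnComplete(() -> log.info("COMPLETE"))
                .doOnCancel(() -> log.info("CANCEL"))
                .doOnNext(s -> log.info("NEXT {}", s))
                .replay()
                // Connects upstream once 2 subscribers are present and disconnects
                // 1 second after the last one leaves. Toggle to .autoConnect(2) to
                // keep the upstream running after the subscribers are gone.
                .refCount(2, Duration.ofSeconds(1));

        // First subscriber: nothing is emitted yet, refCount is still waiting for a second one.
        longFlux
                .take(5)
                .subscribe(
                        s -> log.info("onNext1 {}", s),
                        e -> log.info("onError1 {}", e),
                        () -> log.info("onComplete1")
                );

        // Second subscriber: reaching 2 subscribers triggers the upstream subscription.
        longFlux
                .take(10)
                .subscribe(
                        s -> log.info("onNext2 {}", s),
                        e -> log.info("onError2 {}", e),
                        () -> log.info("onComplete2")
                );

        // Keep the JVM alive long enough to see both subscribers finish and the upstream cancel.
        Thread.sleep(1000 * 20);
    }
}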
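
The snippet above only exercises the refCount side of the toggle. A minimal sketch of the comparison, assuming reactor-core is on the classpath: the class name `RefCountVsAutoConnect` and the helper `run` are hypothetical, and `Flux.never()` is swapped in for `Flux.interval` so the run involves no timers and stays deterministic. It counts how often the upstream is subscribed and cancelled when two subscribers come and go and two more arrive.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.Disposable;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;

class RefCountVsAutoConnect {

    // Hypothetical helper: returns {upstream subscriptions, upstream cancellations}.
    static int[] run(boolean useRefCount) {
        AtomicInteger subscribes = new AtomicInteger();
        AtomicInteger cancels = new AtomicInteger();

        // Flux.never() emits nothing and never completes, so only the
        // connect/disconnect behaviour is observed.
        ConnectableFlux<Long> connectable = Flux.<Long>never()
                .doOnSubscribe(s -> subscribes.incrementAndGet())
                .doOnCancel(cancels::incrementAndGet)
                .replay();

        // The toggle: both variants wait for 2 subscribers before connecting.
        Flux<Long> shared = useRefCount
                ? connectable.refCount(2)
                : connectable.autoConnect(2);

        // First pair of subscribers arrives, then leaves.
        Disposable first = shared.subscribe();
        Disposable second = shared.subscribe();
        first.dispose();
        second.dispose();

        // Second pair arrives after the first pair is gone.
        shared.subscribe();
        shared.subscribe();

        return new int[] {subscribes.get(), cancels.get()};
    }

    public static void main(String[] args) {
        int[] refCount = run(true);
        int[] autoConnect = run(false);
        System.out.println("refCount:    subscribes=" + refCount[0] + " cancels=" + refCount[1]);
        System.out.println("autoConnect: subscribes=" + autoConnect[0] + " cancels=" + autoConnect[1]);
    }
}
```

With refCount the upstream should be cancelled once the first pair leaves and subscribed again for the second pair, so the source behaves like a cold producer that restarts. With autoConnect the single upstream subscription should stay alive and be reused, which is the hot behaviour.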