Skip to content

Instantly share code, notes, and snippets.

@mwmitchell
Last active June 17, 2020 21:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mwmitchell/ac14f8ebaf708f0869e0dff9265b1521 to your computer and use it in GitHub Desktop.
Save mwmitchell/ac14f8ebaf708f0869e0dff9265b1521 to your computer and use it in GitHub Desktop.
package com.test;
import com.google.common.base.Stopwatch;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class ReactorMonoToFluxWaitExample {
public static void main(String[] args) throws InterruptedException {
// A function that accepts an initial input, and returns a Flux<Object>
Function<String, Flux<Object>> replies = (input) -> Flux.create(sink -> {
Thread t = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
// Simulate work...
int sleepTime = ThreadLocalRandom.current().nextInt(100, 1000);
Thread.sleep(sleepTime);
// Finally, emit a value
sink.next(input + " -> " + sleepTime);
} catch (InterruptedException interruptedException) {
// This thread has been interrupted... which means the subscription to the Flux has completed.
System.out.println("Generator thread interrupted...");
Thread.currentThread().interrupt();
break;
}
}
});
sink.onRequest(new LongConsumer() {
@Override
public void accept(long value) {
System.out.println("onRequest... request is for " + value + " items, but we'll generate until onCancel / onDispose");
}
});
sink.onCancel(new Disposable() {
@Override
public void dispose() {
System.out.println("sink.onCancel called");
// When the subscription is cancelled, interrupt the generator thread above
t.interrupt();
}
});
sink.onDispose(new Disposable() {
@Override
public void dispose() {
System.out.println("sink.onDispose called");
// When the subscription is completed, interrupt the generator thread above
t.interrupt();
}
});
// Start the generator thread
t.start();
});
Stopwatch stopwatch = Stopwatch.createUnstarted();
// Single input:
List<Object> results = Mono.just("hi")
// We want many replies:
.flatMapMany(replies)
// Only request n items, this is arbitrary
.limitRequest(6)
// and wait only for n seconds
.take(Duration.ofSeconds(5))
// Log some output for each item received
.doOnNext(value -> System.out.println("doOnNext: " + value + ", stopwatch.elapsed -> " + stopwatch.elapsed(TimeUnit.MILLISECONDS)))
// Map to a list of results (should be <= what was passed into limitRequest(...))
.collectList()
// Start the stopwatch when subscribed to
.doOnSubscribe(s -> stopwatch.start())
// Log everything (reactor debugging utility)
.log()
// Now wait for n items to be produced, or n seconds have passed
.block();
System.err.println("Done in " + stopwatch.stop().elapsed(TimeUnit.MILLISECONDS) + " ms. Got " + results.size() + " results: " + results);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment