Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
package de.cronos.mad.messages.backend;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Mono;
public class RepeatTest {
private static class TestSubject {
EmitterProcessor<Boolean> shutdown = EmitterProcessor.create();
StringBuffer sb = new StringBuffer();
public void logMonoValues(Supplier<Mono<String>> monoSupplier) {
Mono.defer(monoSupplier).repeat().takeUntilOther(shutdown).subscribe(sb::append);
}
}
@Test
public void repeatTest() {
LinkedBlockingQueue<String> data = new LinkedBlockingQueue<>(List.of("Hello", "World"));
TestSubject testSubject = new TestSubject();
testSubject.logMonoValues(() -> {
if (data.isEmpty()) {
testSubject.shutdown.onNext(true);
return Mono.empty();
}
return Mono.just(data.remove());
});
Assertions.assertEquals("HelloWorld", testSubject.sb.toString());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment