Skip to content

Instantly share code, notes, and snippets.

@akarnokd
Created March 5, 2021 10:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save akarnokd/b9177673e9c28418a119cf1215e93521 to your computer and use it in GitHub Desktop.
Save akarnokd/b9177673e9c28418a119cf1215e93521 to your computer and use it in GitHub Desktop.
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.schedulers.TestScheduler;
/**
 * Demonstrates polling with {@code repeatWhen} driven by a {@link TestScheduler}:
 * the source is re-subscribed after a 5 second virtual delay until it yields
 * something other than {@code "PENDING"}.
 */
public class RepeatWhenSO {

    /**
     * The first subscription produces {@code "PENDING"}; every later one produces
     * {@code "ELIGIBLE"}. Nothing must reach the observer before virtual time moves,
     * and after advancing 5 seconds the only result must be {@code "ELIGIBLE"}.
     */
    @Test
    public void test() {
        TestScheduler scheduler = new TestScheduler();
        AtomicInteger subscriptions = new AtomicInteger();

        // Counts invocations so that only the very first call reports "PENDING".
        Observable<String> status = Observable.<String>fromCallable(
                () -> subscriptions.getAndIncrement() == 0 ? "PENDING" : "ELIGIBLE");

        TestObserver<String> observer = status
                // Each completion of the source is delayed on the test scheduler
                // before it triggers the next subscription.
                .repeatWhen(completions -> completions.delay(5, TimeUnit.SECONDS, scheduler))
                // Stop repeating as soon as a non-"PENDING" item shows up.
                .takeUntil(value -> !value.equals("PENDING"))
                .doOnNext(value -> System.out.println("doOnNext called"))
                .lastElement()
                .toSingle()
                .test();

        // No virtual time has elapsed yet, so no terminal result is expected.
        observer.assertEmpty();

        scheduler.advanceTimeBy(5, TimeUnit.SECONDS);

        observer.assertResult("ELIGIBLE");
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment