Skip to content

Instantly share code, notes, and snippets.

@mcherb
Last active April 14, 2020 14:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mcherb/181f1dd3d3c94afec9bf2d62e906e87d to your computer and use it in GitHub Desktop.
Save mcherb/181f1dd3d3c94afec9bf2d62e906e87d to your computer and use it in GitHub Desktop.
Test nested onNext call
package events;
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Delegate;
import org.junit.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.collect.ImmutableSet.of;
public class NestedOnNextCallingTest {
@Test
public void simpleEvenService_should_success_when_direct_call() {
SimpleEventService ueService = new SimpleEventService();
Observable<String> obsA = ueService
.streaming()
.doOnNext(e -> System.out.println("Received on A : " + e + " : " + Thread.currentThread().getName()))
.map(e -> "obsA " + e.get());
Observable<String> obsB = ueService
.streaming()
.doOnNext(e -> System.out.println("==> Received on B : " + e + " : " + Thread.currentThread().getName()))
.map(e -> "obsB " + e.get());
TestObserver<String> events = Observable.merge(obsA, obsB).test();
ueService.push(new SomeEvent(1));
ueService.push(new SomeEvent(2));
ueService.push(new SomeEvent(3));
events.assertValues(
"obsA 1", "obsB 1",
"obsA 2", "obsB 2",
"obsA 3", "obsB 3"
);
}
@Test
public void simpleEvenService_should_fail_when_nested_call() {
SimpleEventService ueService = new SimpleEventService();
Observable<String> obsA = ueService
.streaming()
.takeWhile(e -> e.get() < 4)
.doOnNext(e -> System.out.println("Received on A : " + e + " : " + Thread.currentThread().getName()))
.doOnNext(e -> ueService.push(new SomeEvent(e.get() + 1)))
.map(e -> "obsA " + e.get());
Observable<String> obsB = ueService
.streaming()
.takeWhile(e -> e.get() < 4)
.doOnNext(e -> System.out.println("==> Received on B : " + e + " : " + Thread.currentThread().getName()))
.map(e -> "obsB " + e.get());
TestObserver<String> events = Observable.merge(obsA, obsB).test();
ueService.push(new SomeEvent(1));
events.assertNoValues();
}
@Test
public void simpleEventServiceOnNewThread_should_failOnOrder_when_direct_call() {
SimpleEventServiceOnNewThread ueService = new SimpleEventServiceOnNewThread();
Observable<String> obsA = ueService
.streaming()
.doOnNext(e -> System.out.println("Received on A : " + e + " : " + Thread.currentThread().getName()))
.map(e -> "obsA " + e.get());
Observable<String> obsB = ueService
.streaming()
.doOnNext(e -> System.out.println("==> Received on B : " + e + " : " + Thread.currentThread().getName()))
.map(e -> "obsB " + e.get());
TestObserver<String> events = Observable.merge(obsA, obsB).test();
ueService.push(new SomeEvent(1));
ueService.push(new SomeEvent(2));
ueService.push(new SomeEvent(3));
events
.awaitDone(1, TimeUnit.SECONDS)
.assertValueSet( // order not guaranteed
of("obsA 1", "obsB 1", "obsA 2", "obsB 2", "obsA 3", "obsB 3")
);
}
@Test
public void simpleEventServiceOnNewThread_failOnOrder_when_nested_call() {
SimpleEventServiceOnNewThread ueService = new SimpleEventServiceOnNewThread();
Observable<String> obsA = ueService
.streaming()
.takeWhile(e -> e.get() < 4)
.doOnNext(e -> System.out.println("Received on A : " + e + " : " + Thread.currentThread().getName()))
.doOnNext(e -> ueService.push(new SomeEvent(e.get() + 1)))
.map(e -> "obsA " + e.get());
Observable<String> obsB = ueService
.streaming()
.takeWhile(e -> e.get() < 4)
.doOnNext(e -> System.out.println("==> Received on B : " + e + " : " + Thread.currentThread().getName()))
.map(e -> "obsB " + e.get());
TestObserver<String> events = Observable.merge(obsA, obsB).test();
ueService.push(new SomeEvent(1));
events
.awaitDone(1, TimeUnit.SECONDS)
.assertValueSet( // order not guaranteed
of("obsA 1", "obsB 1", "obsA 2", "obsB 2", "obsA 3", "obsB 3")
);
}
@Test
public void eventService_should_success_when_direct_call() {
ExecutorService executor = Executors.newSingleThreadExecutor();
EventService ueService = new EventService(executor);
Observable<String> obsA = ueService
.streaming()
.doOnNext(e -> System.out.println("Received on A : " + e + " : " + Thread.currentThread().getName()))
.map(e -> "obsA " + e.get());
Observable<String> obsB = ueService
.streaming()
.doOnNext(e -> System.out.println("==> Received on B : " + e + " : " + Thread.currentThread().getName()))
.map(e -> "obsB " + e.get());
TestObserver<String> events = Observable.merge(obsA, obsB).test();
ueService.push(new SomeEvent(1));
ueService.push(new SomeEvent(2));
ueService.push(new SomeEvent(3));
events
.awaitDone(1, TimeUnit.SECONDS)
.assertValues(
"obsA 1", "obsB 1",
"obsA 2", "obsB 2",
"obsA 3", "obsB 3"
);
executor.shutdown();
}
@Test
public void eventService_should_success_when_nested_call() {
ExecutorService executor = Executors.newSingleThreadExecutor();
EventService ueService = new EventService(executor);
Observable<String> obsA = ueService
.streaming()
.takeWhile(e -> e.get() < 4)
.doOnNext(e -> System.out.println("Received on A : " + e + " : " + Thread.currentThread().getName()))
.doOnNext(e -> ueService.push(new SomeEvent(e.get() + 1)))
.map(e -> "obsA " + e.get());
Observable<String> obsB = ueService
.streaming()
.takeWhile(e -> e.get() < 4)
.doOnNext(e -> System.out.println("==> Received on B : " + e + " : " + Thread.currentThread().getName()))
.map(e -> "obsB " + e.get());
TestObserver<String> events = Observable.merge(obsA, obsB).test();
ueService.push(new SomeEvent(1));
events
.awaitDone(1, TimeUnit.SECONDS)
.assertValues(
"obsA 1", "obsB 1",
"obsA 2", "obsB 2",
"obsA 3", "obsB 3"
);
executor.shutdown();
}
public static class SimpleEventService {
private final Subject<SomeEvent> streaming = PublishSubject.create();
public Observable<SomeEvent> streaming() {
return streaming;
}
public void push(SomeEvent userEvent) {
streaming.onNext(userEvent);
}
}
public static class SimpleEventServiceOnNewThread {
private final Subject<SomeEvent> streaming = PublishSubject.create();
public Observable<SomeEvent> streaming() {
return streaming.observeOn(Schedulers.newThread());
}
public void push(SomeEvent userEvent) {
streaming.onNext(userEvent);
}
}
@RequiredArgsConstructor
public static class EventService {
private final Subject<SomeEvent> streaming = PublishSubject.create();
private final ExecutorService executor;
public Observable<SomeEvent> streaming() {
return streaming.toSerialized();
}
public void push(SomeEvent userEvent) {
executor.execute(() -> streaming.onNext(userEvent));
}
}
@RequiredArgsConstructor
static class SomeEvent {
@Delegate
AtomicInteger value;
SomeEvent(int value) {
this.value = new AtomicInteger(value);
}
@Override
public String toString() {
return "" + value.get();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment