Skip to content

Instantly share code, notes, and snippets.

@jenzz
Last active October 13, 2023 10:33
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save jenzz/ae8fe8643e47f431f91712499ef8c4f9 to your computer and use it in GitHub Desktop.
Save jenzz/ae8fe8643e47f431f91712499ef8c4f9 to your computer and use it in GitHub Desktop.
Proof of concept for RxJava Tidbits #4: Combining multiple data sources (https://medium.com/rxjava-tidbits/rxjava-tidbits-4-combining-multiple-data-sources-f921c1ad5358)
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Scheduler;
import rx.functions.Func1;
import rx.observers.AssertableSubscriber;
import rx.schedulers.TestScheduler;
public class MultipleDataSourcesTest {
enum Source {
LOCAL, REMOTE
}
interface Repository {
Observable<Source> getData();
}
static class DelayedRepository implements Repository {
private final Source source;
private final long delay;
private final Scheduler scheduler;
DelayedRepository(Source source, long delay, Scheduler scheduler) {
this.source = source;
this.delay = delay;
this.scheduler = scheduler;
}
@Override
public Observable<Source> getData() {
return Observable.just(source).delay(delay, TimeUnit.SECONDS, scheduler);
}
}
static class CompositeRepository implements Repository {
private final Repository localRepository;
private final Repository remoteRepository;
CompositeRepository(Repository localRepository, Repository remoteRepository) {
this.localRepository = localRepository;
this.remoteRepository = remoteRepository;
}
@Override
public Observable<Source> getData() {
return remoteRepository.getData()
.publish(new Func1<Observable<Source>, Observable<Source>>() {
@Override
public Observable<Source> call(Observable<Source> remoteObservable) {
return remoteObservable.mergeWith(
localRepository.getData().takeUntil(remoteObservable)
);
}
});
}
}
@Test
public void emitsFastLocalDataSourceThenSlowRemoteDataSource() {
TestScheduler scheduler = new TestScheduler();
Repository localRepository = new DelayedRepository(Source.LOCAL, 1, scheduler);
Repository remoteRepository = new DelayedRepository(Source.REMOTE, 3, scheduler);
Repository sut = new CompositeRepository(localRepository, remoteRepository);
AssertableSubscriber<Source> subscriber = sut.getData().test();
// scheduler at 0, i.e. no emitted values and subscription still alive
subscriber.assertNoValues().assertNoTerminalEvent();
scheduler.advanceTimeTo(1, TimeUnit.SECONDS);
subscriber.assertValuesAndClear(Source.LOCAL) // fast local data source emits first
.assertNotCompleted(); // subscription is still alive
scheduler.advanceTimeTo(3, TimeUnit.SECONDS);
subscriber.assertValue(Source.REMOTE) // slow remote data source
.assertCompleted(); // subscription completed
}
@Test
public void fastRemoteDataSourceSupersedesSlowLocalDataSource() {
TestScheduler scheduler = new TestScheduler();
Repository localRepository = new DelayedRepository(Source.LOCAL, 3, scheduler);
Repository remoteRepository = new DelayedRepository(Source.REMOTE, 1, scheduler);
Repository sut = new CompositeRepository(localRepository, remoteRepository);
AssertableSubscriber<Source> subscriber = sut.getData().test();
// scheduler at 0, i.e. no emitted values and subscription still alive
subscriber.assertNoValues().assertNoTerminalEvent();
scheduler.advanceTimeTo(1, TimeUnit.SECONDS);
subscriber.assertValuesAndClear(Source.REMOTE) // fast remote data source emits first
.assertCompleted(); // subscription completed as it supersedes slow local data source
scheduler.advanceTimeTo(3, TimeUnit.SECONDS);
subscriber.assertNoValues(); // redundant assertion as subscription has already completed
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment