Skip to content

Instantly share code, notes, and snippets.

@mr-archano
Created October 6, 2016 17:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mr-archano/27d550618605031c287e46a7b9d0bf2f to your computer and use it in GitHub Desktop.
Save mr-archano/27d550618605031c287e46a7b9d0bf2f to your computer and use it in GitHub Desktop.
import java.util.concurrent.atomic.AtomicReference;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func0;
/**
 * {@link Observable.Transformer} that makes every subscriber share one replayed
 * subscription to the upstream source.
 *
 * <p>The first subscription wraps the source in {@code replay().autoConnect()} and
 * caches the result; later subscriptions made through the same instance receive the
 * cached observable, regardless of which source they were composed with. The cache
 * is cleared only when the source signals an error, so the next subscription after
 * a failure builds a fresh chain. Completion does not clear the cache.
 *
 * @param <T> the type of items emitted by the source
 */
class CachingObservableReplayer<T> implements Observable.Transformer<T, T> {

    /** The currently shared replaying observable, or {@code null} when nothing is cached. */
    private final AtomicReference<Observable<T>> pendingObservable;

    CachingObservableReplayer() {
        this.pendingObservable = new AtomicReference<>();
    }

    /**
     * Defers the cache lookup to subscription time so that each subscriber picks up
     * whatever observable is cached at the moment it subscribes.
     *
     * @param source the upstream observable to share
     * @return an observable that resolves to the cached (or newly cached) replaying observable
     */
    @Override
    public Observable<T> call(final Observable<T> source) {
        return Observable.defer(new Func0<Observable<T>>() {
            @Override
            public Observable<T> call() {
                return cachedOrNew(source);
            }
        });
    }

    /**
     * Returns the cached observable, installing a new replaying wrapper around
     * {@code source} when the cache is empty. Never returns {@code null}.
     */
    private Observable<T> cachedOrNew(Observable<T> source) {
        while (true) {
            Observable<T> cached = pendingObservable.get();
            if (cached != null) {
                return cached;
            }
            Observable<T> sharable = source
                    .doOnError(clearPending())
                    .replay()
                    .autoConnect();
            if (pendingObservable.compareAndSet(null, sharable)) {
                return sharable;
            }
            // Another subscriber installed its observable first. Loop and re-read
            // instead of returning get() directly: the cache may have been cleared
            // again in the meantime, and get() would then yield null.
        }
    }

    /** Creates the error hook that empties the cache so a later subscription starts over. */
    private Action1<Throwable> clearPending() {
        return new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                pendingObservable.set(null);
            }
        };
    }
}
import java.util.Random;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import rx.Observable;
import rx.Observer;
import rx.functions.Action0;
import rx.functions.Func0;
import rx.subjects.PublishSubject;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
/**
 * Exercises {@link CachingObservableReplayer} with a source that emits two random
 * integers followed by whatever a test-controlled {@link PublishSubject} delivers.
 * Each test subscribes two mock observers through the same replayer instance and
 * checks how many times the source was subscribed or which values each observer saw.
 */
@RunWith(MockitoJUnitRunner.class)
public class CachingObservableReplayerTest {

    @Mock private Action0 sourceSubscribed;
    @Mock private Observer<Integer> observer1;
    @Mock private Observer<Integer> observer2;

    @Captor private ArgumentCaptor<Integer> captor1;
    @Captor private ArgumentCaptor<Integer> captor2;

    private Random random;
    private PublishSubject<Integer> subject;
    private CachingObservableReplayer<Integer> replayer;

    @Before
    public void setUp() {
        replayer = new CachingObservableReplayer<>();
        subject = PublishSubject.create();
        random = new Random();
    }

    // ---- how many times the source gets subscribed ----

    @Test
    public void shouldSubscribeToSameSequenceWhenSubscribedAgainBeforeCompletion() {
        subscribeThroughReplayer(observer1);
        subscribeThroughReplayer(observer2);

        verify(sourceSubscribed, times(1)).call();
    }

    @Test
    public void shouldSubscribeToDifferentSequenceWhenSubscribedAgainAfterError() {
        subscribeThroughReplayer(observer1);
        subject.onError(new RuntimeException());
        subscribeThroughReplayer(observer2);

        verify(sourceSubscribed, times(2)).call();
    }

    @Test
    public void shouldSubscribeToSameSequenceWhenSubscribedAgainAfterCompletion() {
        subscribeThroughReplayer(observer1);
        subject.onCompleted();
        subscribeThroughReplayer(observer2);

        verify(sourceSubscribed, times(1)).call();
    }

    // ---- which values each observer receives ----

    @Test
    public void shouldReceiveSameEmissionsWhenSubscribedAgainBeforeCompletion() {
        subscribeThroughReplayer(observer1);
        subscribeThroughReplayer(observer2);

        captureTwoEmissions(observer1, captor1);
        captureTwoEmissions(observer2, captor2);
        assertThat(captor1.getAllValues()).isEqualTo(captor2.getAllValues());
    }

    @Test
    public void shouldNotReceiveSameEmissionsWhenSubscribedAgainAfterError() {
        subscribeThroughReplayer(observer1);
        subject.onError(new RuntimeException());
        subscribeThroughReplayer(observer2);

        captureTwoEmissions(observer1, captor1);
        captureTwoEmissions(observer2, captor2);
        assertThat(captor1.getAllValues()).isNotEqualTo(captor2.getAllValues());
    }

    @Test
    public void shouldReceiveSameEmissionsWhenSubscribedAgainAfterTermination() {
        subscribeThroughReplayer(observer1);
        subject.onCompleted();
        subscribeThroughReplayer(observer2);

        captureTwoEmissions(observer1, captor1);
        captureTwoEmissions(observer2, captor2);
        assertThat(captor1.getAllValues()).isEqualTo(captor2.getAllValues());
    }

    /** Builds a fresh source, routes it through the shared replayer and subscribes the observer. */
    private void subscribeThroughReplayer(Observer<Integer> observer) {
        Observable<Integer> replayed = newSource().compose(replayer);
        replayed.subscribe(observer);
    }

    /** Verifies the observer received exactly two {@code onNext} calls and records their arguments. */
    private void captureTwoEmissions(Observer<Integer> observer, ArgumentCaptor<Integer> captor) {
        verify(observer, times(2)).onNext(captor.capture());
    }

    /**
     * Two lazily generated random values, then the subject's signals; every
     * subscription to the result invokes the {@code sourceSubscribed} mock.
     */
    private Observable<Integer> newSource() {
        Observable<Integer> twoRandoms = randomValue().concatWith(randomValue());
        return twoRandoms
                .concatWith(subject)
                .doOnSubscribe(sourceSubscribed);
    }

    /** Emits a single non-negative random integer, drawn anew for each subscription. */
    private Observable<Integer> randomValue() {
        Func0<Observable<Integer>> drawOnSubscribe = new Func0<Observable<Integer>>() {
            @Override
            public Observable<Integer> call() {
                return Observable.just(random.nextInt(Integer.MAX_VALUE));
            }
        };
        return Observable.defer(drawOnSubscribe);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment