Skip to content

Instantly share code, notes, and snippets.

@mr-archano
Created September 10, 2016 11:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mr-archano/4e801d25b71d689fd6fb724bcd17c429 to your computer and use it in GitHub Desktop.
Save mr-archano/4e801d25b71d689fd6fb724bcd17c429 to your computer and use it in GitHub Desktop.
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Func0;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Transformer that lets overlapping subscribers share one replayed subscription to the source.
 *
 * <p>On each subscription the transformer either hands out the shared
 * {@code source.replay().refCount()} stream that is currently pending, or creates and publishes
 * a new one when none is pending. The pending stream is forgotten once the replay connection
 * unsubscribes from the source, so the next subscriber after that triggers a fresh subscription
 * to the source.
 *
 * <p>The pending stream is held per transformer instance, not per source: composing several
 * different sources with the same instance makes them share a single pending stream.
 *
 * @param <T> type of the items emitted by the source
 */
class SubscribedObservableReplayer<T> implements Observable.Transformer<T, T> {

    /** Shared stream currently handed out to subscribers, or {@code null} when there is none. */
    private final AtomicReference<Observable<T>> pendingObservable;

    SubscribedObservableReplayer() {
        this.pendingObservable = new AtomicReference<>(null);
    }

    /**
     * Wraps {@code source} so that the shared stream is resolved lazily at subscription time.
     *
     * @param source the observable whose subscription should be shared
     * @return an observable that resolves to the pending shared stream on every subscription
     */
    @Override
    public Observable<T> call(final Observable<T> source) {
        return Observable.defer(new Func0<Observable<T>>() {
            @Override
            public Observable<T> call() {
                // Retry until we either observe a pending stream or publish our own. A single
                // compareAndSet followed by get() could return null if the pending stream was
                // cleared in between the two calls.
                for (;;) {
                    Observable<T> shared = pendingObservable.get();
                    if (shared != null) {
                        return shared;
                    }
                    Observable<T> candidate = createSharable(source);
                    if (pendingObservable.compareAndSet(null, candidate)) {
                        return candidate;
                    }
                }
            }
        });
    }

    /**
     * Builds a replaying, ref-counted view of {@code source} that removes itself from
     * {@link #pendingObservable} when its connection to the source is unsubscribed.
     */
    private Observable<T> createSharable(Observable<T> source) {
        // Holder so the unsubscribe hook can refer to the stream it belongs to.
        final AtomicReference<Observable<T>> self = new AtomicReference<>();
        Observable<T> sharable = source
                // Placed upstream of replay(): the hook is attached to the single subscription
                // made by the replay connection, not to each downstream subscriber. A hook
                // placed after refCount() would run on every individual unsubscription.
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        // Clear only our own entry so a stale stream cannot wipe a newer one.
                        pendingObservable.compareAndSet(self.get(), null);
                    }
                })
                .replay()
                .refCount();
        self.set(sharable);
        return sharable;
    }
}
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Func0;
import java.util.Random;
/**
 * Tests for {@link SubscribedObservableReplayer}: overlapping subscribers share one subscription
 * to the source, and a subscriber arriving after everyone unsubscribed gets a fresh one.
 */
@RunWith(MockitoJUnitRunner.class)
public class SubscribedObservableReplayerTest {

    @Mock private Action0 sourceSubscribed;
    @Mock private Observer<Integer> observer1;
    @Mock private Observer<Integer> observer2;
    @Captor private ArgumentCaptor<Integer> captor1;
    @Captor private ArgumentCaptor<Integer> captor2;

    private Random random;
    private SubscribedObservableReplayer<Integer> replayer;

    @Before
    public void setUp() {
        random = new Random();
        replayer = new SubscribedObservableReplayer<>();
    }

    @Test
    public void shouldSubscribeToOriginalSourceOnlyOnceWhenSubscribedTwiceBeforeAllUnusbscribed() {
        // never() keeps the stream open so neither subscriber is released by a terminal event.
        Observable<Integer> source = random()
                .concatWith(Observable.<Integer>never())
                .doOnSubscribe(sourceSubscribed);
        Observable<Integer> observable = source.compose(replayer);

        observable.subscribe();
        observable.subscribe();

        verify(sourceSubscribed).call();
    }

    @Test
    public void shouldReceiveSameEmissionsFromOriginalSourceWhenSubscribedTwiceBeforeAllUnsubscribed() {
        Observable<Integer> source = random()
                .concatWith(random())
                .concatWith(Observable.<Integer>never());
        Observable<Integer> observable = source.compose(replayer);

        Subscription subscription1 = observable.subscribe(observer1);
        observable.subscribe(observer2);
        subscription1.unsubscribe();

        verify(observer1, times(2)).onNext(captor1.capture());
        verify(observer2, times(2)).onNext(captor2.capture());
        assertThat(captor1.getAllValues()).isEqualTo(captor2.getAllValues());
    }

    @Test
    public void shouldReceiveDifferentEmissionsFromOriginalSourceWhenSubscribedAgainAfterAllUnsubscribed() {
        Observable<Integer> source = random()
                .concatWith(random())
                .concatWith(Observable.<Integer>never());
        Observable<Integer> observable = source.compose(replayer);

        observable.subscribe(observer1).unsubscribe();
        observable.subscribe(observer2);

        // Capture the first observer's values as well; without this captor1 stays empty and
        // the inequality below holds regardless of what observer2 received.
        verify(observer1, times(2)).onNext(captor1.capture());
        verify(observer2, times(2)).onNext(captor2.capture());
        assertThat(captor1.getAllValues()).isNotEqualTo(captor2.getAllValues());
    }

    /** Returns an observable that emits one new random value in [0, 1000) per subscription. */
    private Observable<Integer> random() {
        return Observable.defer(new Func0<Observable<Integer>>() {
            @Override
            public Observable<Integer> call() {
                return Observable.just(random.nextInt(1000));
            }
        });
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment