Skip to content

Instantly share code, notes, and snippets.

@weefbellington
Created August 2, 2016 02:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save weefbellington/b0c4cfa004eedaba4203add887c1bbba to your computer and use it in GitHub Desktop.
Save weefbellington/b0c4cfa004eedaba4203add887c1bbba to your computer and use it in GitHub Desktop.
replay(1).refcount() test
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observers.TestSubscriber;
public class ReplayTest {
@Test
public void testReplay() {
final Observable<Long> infinite = Observable.interval(1, TimeUnit.SECONDS).doOnNext(new Action1<Long>() {
@Override
public void call(Long count) {
System.out.println(String.format("count is %d", count));
}
});
final Observable<Long> shared = infinite.replay(1).refCount();
shared.doOnSubscribe(new Action0() {
@Override
public void call() {
System.out.println("subscribing stream 1");
}
}).takeUntil(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long count) {
return count > 5;
}
}).subscribe(new Action1<Long>() {
@Override
public void call(Long count) {
System.out.println(String.format("stream 1 saw: %d", count));
}
});
shared.doOnSubscribe(new Action0() {
@Override public void call() {
System.out.println("subscribing stream 2");
}
}).delaySubscription(3500, TimeUnit.MILLISECONDS).takeUntil(new Func1<Long, Boolean>() {
@Override public Boolean call(Long count) {
return count > 10;
}
}).subscribe(new Action1<Long>() {
@Override public void call(Long count) {
System.out.println(String.format("stream 2 saw: %d", count));
}
});
shared.doOnSubscribe(new Action0() {
@Override public void call() {
System.out.println("subscribing stream 3");
}
}).delaySubscription(4500, TimeUnit.MILLISECONDS).takeUntil(new Func1<Long, Boolean>() {
@Override public Boolean call(Long count) {
return count > 10;
}
}).subscribe(new Action1<Long>() {
@Override public void call(Long count) {
System.out.println(String.format("stream 3 saw: %d", count));
}
});
TestSubscriber testSubscriber = new TestSubscriber();
testSubscriber.awaitTerminalEvent();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment