Skip to content

Instantly share code, notes, and snippets.

@MikolajKakol
Last active August 29, 2015 13:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MikolajKakol/8985583 to your computer and use it in GitHub Desktop.
Save MikolajKakol/8985583 to your computer and use it in GitHub Desktop.
BehaviorSubject isue test
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import junit.framework.TestCase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.schedulers.Schedulers;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action1;
import rx.util.functions.Func1;
@RunWith(Parameterized.class)
public class RxTestGithubIssue extends TestCase {
@Parameterized.Parameters
public static List<Object[]> data() {
return Arrays.asList(new Object[100][0]);
}
Presenter p = new Presenter();
public static Set<String> logs = new HashSet<>();
@Test public void testBeh() throws Exception {
logs.clear();
p.getLongCookies().subscribe(new Dump(0));
p.getLongCookies().subscribe(new Dump(1));
Thread.sleep(100);
p.longiesChanger();
p.getLongCookies().subscribe(new Dump(2));
p.getLongCookies().subscribe(new Dump(3));
Thread.sleep(200);
assertTrue(logs.contains("Obs 0 next 1"));
assertTrue(logs.contains("Obs 1 next 1"));
assertTrue(logs.contains("Obs 2 next 1"));
assertTrue(logs.contains("Obs 3 next 1"));
}
private static class Presenter {
Service service = new Service();
CallMerger<Long> merger = new CallMerger<Long>();
public Presenter() {
merger.setFunc(makeCookiesObs());
}
public Observable<Long> getLongCookies() {
return merger.call();
}
private Observable<Long> makeCookiesObs() {
return service.getCookiesFromNetwork().map(new Func1<Integer, Long>() {
@Override
public Long call(Integer t1) {
return t1.longValue();
}
});
}
public void longiesChanger() {
merger.reset();
}
private static class CallMerger<T> {
Subject<T, T> subj = ReplaySubject.<T> create();
private Observable<T> func;
private Subscription subs;
public void setFunc(Observable<T> func) {
this.func = func;
}
public void reset() {
// subs.unsubscribe();
subs = func.subscribe(new Action1<T>() {
@Override
public void call(T t1) {
System.out.println("Next " + t1);
subj.onNext(t1);
}
});
}
public Observable<T> call() {
if (subs == null) {
subs = func.subscribe(new Action1<T>() {
@Override
public void call(T t1) {
System.out.println("Next " + t1);
subj.onNext(t1);
}
});
}
return subj.filter(new Func1<T, Boolean>() {
@Override
public Boolean call(T t1) {
return t1 != null;
}
});
}
}
}
private static class Service {
private int cookieCount;
public Observable<Integer> getCookiesFromNetwork() {
return Observable.<Integer> create(new OnSubscribeFunc<Integer>() {
@Override
public Subscription onSubscribe(Observer<? super Integer> t1) {
// try {
// Thread.sleep(150);
// } catch (InterruptedException e) {
// }
System.out.println("Service onSubscribe");
t1.onNext(cookieCount++);
t1.onCompleted();
return Subscriptions.empty();
}
}).subscribeOn(Schedulers.io());
}
}
class Dump implements Observer<Long> {
private String id;
public Dump(int id) {
this.id = String.valueOf(id);
}
@Override
public void onCompleted() {
System.out.println("Obs " + id + " completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Obs " + id + " error");
e.printStackTrace();
}
@Override
public void onNext(Long args) {
logs.add("Obs " + id + " next " + args);
System.out.println("Obs " + id + " next " + args);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment