Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
/**
 * Small runnable experiment with a hand-written "subscribe on a scheduler"
 * operator. It prints the name of the thread on which the subscription,
 * each emitted value, and the unsubscription are processed.
 */
public class SubscribeOnTest {

    /**
     * Wraps {@code source} so that the call to {@code source.subscribe(...)}
     * is issued from inside an action scheduled on {@code scheduler}.
     *
     * <p>When the downstream subscriber is unsubscribed after that action has
     * run, the upstream unsubscription is itself scheduled on the same inner
     * scheduler instance that performed the subscription.
     *
     * @param <T> element type of the sequence
     * @param source the sequence to subscribe to from the scheduled action
     * @param scheduler the scheduler on which the subscription action is scheduled
     * @return an observable that forwards all events of {@code source}
     */
    static <T> Observable<T> subscribeOn1(Observable<T> source, Scheduler scheduler) {
        return Observable.create((Subscriber<? super T> downstream) -> {
            // A separate subscriber object is handed to the source; it is only
            // unsubscribed from the scheduled "Unsub" action below.
            Subscriber<T> relay = relayTo(downstream);

            // Holds the scheduled task and is registered with the downstream.
            MultipleAssignmentSubscription taskHolder = new MultipleAssignmentSubscription();
            downstream.add(taskHolder);

            taskHolder.write(scheduler.schedule(inner -> {
                // NOTE(review): terminate(false)/write(..., false) semantics are
                // not visible in this file - confirm against the library version in use.
                taskHolder.terminate(false);

                // Register the unsubscription hook before subscribing upstream.
                downstream.add(Subscriptions.create(() -> {
                    inner.schedule(unused -> {
                        System.out.println("Unsub: " + Thread.currentThread().getName());
                        relay.unsubscribe();
                    });
                }));

                System.out.println("Sub : " + Thread.currentThread().getName());
                source.subscribe(relay);
            }), false);
        });
    }

    /**
     * Creates a subscriber that passes every event straight to {@code downstream}.
     *
     * @param <T> element type accepted by the returned subscriber
     * @param downstream the receiver of the forwarded events
     * @return a new forwarding subscriber
     */
    private static <T> Subscriber<T> relayTo(Subscriber<? super T> downstream) {
        return new Subscriber<T>() {
            @Override
            public void onNext(T t) {
                downstream.onNext(t);
            }

            @Override
            public void onError(Throwable e) {
                downstream.onError(e);
            }

            @Override
            public void onCompleted() {
                downstream.onCompleted();
            }
        };
    }

    /**
     * Subscribes to a 10 ms interval through {@link #subscribeOn1}, prints
     * every value together with the current thread name, unsubscribes after
     * 105 ms and then waits a further 100 ms before returning.
     *
     * @param args unused
     * @throws Exception if one of the sleeps is interrupted
     */
    public static void main(String[] args) throws Exception {
        Subscription handle = subscribeOn1(
                Observable.interval(10, TimeUnit.MILLISECONDS),
                Schedulers.newThread())
            .subscribe(value -> {
                System.out.printf("%s: %s%n", Thread.currentThread().getName(), value);
            });

        // Let some values through before cancelling.
        Thread.sleep(105);
        handle.unsubscribe();

        // Give the scheduled unsubscription time to run and print.
        Thread.sleep(100);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment