Skip to content

Instantly share code, notes, and snippets.

@subinkrishna
Created September 9, 2016 19:23
Show Gist options
  • Save subinkrishna/dec800a88775921ffc3f83727574363c to your computer and use it in GitHub Desktop.
Save subinkrishna/dec800a88775921ffc3f83727574363c to your computer and use it in GitHub Desktop.
Observable creation with unsubscription
package com.subinkrishna.rx;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;
public class Create {
public static void main(String[] args) throws Exception {
System.out.println("Start" + " (" + Thread.currentThread().getId() + ")");
Observable<String> observe = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
System.out.println("Call" + " (" + Thread.currentThread().getId() + ")");
subscriber.onNext("Hello!");
// Unsubscribes all the subscriptions!
// subscriber.onCompleted();
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
System.out.println("World!" + " (" + Thread.currentThread().getId() + ")");
}
}));
}
});
Subscription sub = observe
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.doOnNext(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("doOnNext: " + s + " (" + Thread.currentThread().getId() + ")");
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted" + " (" + Thread.currentThread().getId() + ")");
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(String s) {
System.out.println("onNext: " + s + " (" + Thread.currentThread().getId() + ")");
}
});
Thread.sleep(2000);
System.out.println("End" + " (" + Thread.currentThread().getId() + ")");
sub.unsubscribe();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment