Skip to content

Instantly share code, notes, and snippets.

@0ximDigital
Created March 13, 2017 15:21

Revisions

  1. 0ximDigital created this gist Mar 13, 2017.
    49 changes: 49 additions & 0 deletions observeOn vs subscribeOn
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,49 @@
    public static void main(String[] args) throws InterruptedException {

    Observable.zip(stringObservable().subscribeOn(Schedulers.io()),
    integerObservable().subscribeOn(Schedulers.io()),
    (s, integer) -> {
    timedLog("Got items, zipping");
    return s + " " + String.valueOf(integer);
    })
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation())
    .subscribe(zipped -> timedLog("Got zipped value + " + zipped));

    Thread.sleep(5000);
    }

    public static Observable<String> stringObservable() {
    return Observable.defer(() -> Observable.fromCallable(Main::getStringItem));
    }

    public static Observable<Integer> integerObservable() {
    return Observable.defer(() -> Observable.fromCallable(Main::getIntegerItem));
    }

    public static String getStringItem() {
    timedLog("Start of getString item");
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    timedLog("Returning string item");
    return "String item";
    }

    public static Integer getIntegerItem() {
    timedLog("Start of getInteger item");
    try {
    Thread.sleep(1500);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    timedLog("Returning integer item");
    return 100;
    }

    private static void timedLog(final String message) {
    final long now = System.currentTimeMillis();
    System.out.println(now + " - " + message);
    }