@koji-k
Created July 24, 2018 13:56
How to use just together with an interval in RxJava
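import io.reactivex.Flowable;
import java.util.concurrent.TimeUnit;

public class Sample1 {
    public static void main(String... args) throws Exception {
        // Source values: 2, 4, 6
        Flowable<Integer> flowable1 = Flowable.just(1, 2, 3).map(s -> s * 2);
        // Emits a tick every 1000 ms on a background scheduler
        Flowable<Long> flowable2 = Flowable.interval(1000, TimeUnit.MILLISECONDS);
        // Pair each value with a tick so values arrive one per second; the tick itself is discarded
        Flowable<Integer> ziped = Flowable.zip(flowable1, flowable2, (f1, f2) -> f1);
        ziped.subscribe(System.out::println);
        // Keep the main thread alive while the interval runs on another thread
        Thread.sleep(5000L);
    }
}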
koji-k commented Jul 24, 2018

Could it be that the interval in flowable2 never ends? That worried me, so I implemented a proper FlowableSubscriber in subscribe to check.
Conclusion: no problem. END is printed as expected.

import io.reactivex.Flowable;
import io.reactivex.FlowableSubscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.TimeUnit;

public class Sample1 {

    public static void main (String ... args ) throws  Exception {
        Flowable<Integer> flowable1 = Flowable.just(1, 2, 3).map(s -> s * 2);
        Flowable<Long> flowable2 = Flowable.interval(1000, TimeUnit.MILLISECONDS);
        Flowable<Integer> ziped = Flowable.zip(flowable1, flowable2, (f1, f2) ->f1);
        ziped.subscribe(new FlowableSubscriber<Integer>() {
            private Subscription subscription;
            @Override
            public void onSubscribe(Subscription s) {
                subscription = s;
                s.request(1L);
            }

            @Override
            public void onNext(Integer o) {
                System.out.println(o);
                subscription.request(1L);
            }

            @Override
            public void onError(Throwable t) {
                // Report failures instead of silently swallowing them
                t.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("END");
            }
        });
        Thread.sleep(5000L);
    }
}
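
One way to see why END shows up even though the interval by itself never finishes: zip pairs items one by one, so the combined stream can end once the shorter source has nothing left. Below is a minimal plain-Java sketch of that pairing rule. It is only a model and does not use RxJava; the class name ZipCompletionSketch and the helper zip are made up for illustration, and there are no real timers or threads involved.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.LongStream;

// Plain-Java model of zip's pairing rule; this is NOT RxJava code.
class ZipCompletionSketch {

    // Pairs items from both sources one by one and stops as soon as either source runs out.
    static <A, B, R> List<R> zip(Iterator<A> left, Iterator<B> right, BiFunction<A, B, R> combiner) {
        List<R> out = new ArrayList<>();
        while (left.hasNext() && right.hasNext()) {
            out.add(combiner.apply(left.next(), right.next()));
        }
        return out;
    }

    public static void main(String[] args) {
        // Finite source, standing in for Flowable.just(1, 2, 3).map(s -> s * 2).
        Iterator<Integer> values = List.of(2, 4, 6).iterator();
        // Endless counter 0, 1, 2, ... standing in for the ticks of Flowable.interval (no delay here).
        Iterator<Long> ticks = LongStream.iterate(0L, n -> n + 1).iterator();

        // Keep only the value from each pair, like (f1, f2) -> f1 above.
        List<Integer> zipped = zip(values, ticks, (value, tick) -> value);
        zipped.forEach(System.out::println);
        // Reached even though the tick source never ends.
        System.out.println("END");
    }
}
```

The loop condition is the whole point: the endless tick source is only pulled while the finite source still has items, which matches the END observed in the RxJava run above.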
