Created
May 19, 2017 11:08
-
-
Save mkrajc/4336016129711265217b74346dad7d9e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.mech; | |
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
public class RxTest { | |
public static void main(String[] args) throws InterruptedException { | |
final Callable<Integer> init = () -> 1000; | |
final TimeShifter timeShifter = new TimeShifter(); | |
final Observable<Integer> dynamic = Observable.generate(init, timeShifter); | |
dynamic.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor())) | |
.subscribe(new Observer<Integer>() { | |
@Override | |
public void onSubscribe(@NonNull final Disposable d) { | |
System.out.println(Thread.currentThread().getName() + " subscribed"); | |
} | |
@Override | |
public void onNext(@NonNull final Integer s) { | |
System.out.println(Thread.currentThread().getName() + " onNext " + s); | |
} | |
@Override | |
public void onError(@NonNull final Throwable e) { | |
System.out.println(Thread.currentThread().getName() + " onError " + e); | |
} | |
@Override | |
public void onComplete() { | |
System.out.println(Thread.currentThread().getName() + " onComplete"); | |
} | |
}); | |
Thread.sleep(5000); | |
timeShifter.speedUp(); | |
Thread.sleep(5000); | |
timeShifter.speedUp(); | |
Thread.sleep(5000); | |
timeShifter.stop(); | |
} | |
} | |
package org.mech; | |
import io.reactivex.Emitter; | |
import io.reactivex.annotations.NonNull; | |
import io.reactivex.functions.BiFunction; | |
public class TimeShifter implements BiFunction<Integer, Emitter<Integer>, Integer> { | |
private Integer step = 1000; | |
private boolean stop = false; | |
@Override | |
public Integer apply(@NonNull final Integer curStep, @NonNull final Emitter<Integer> stepEmitter) throws Exception { | |
if(stop){ | |
stepEmitter.onComplete(); | |
} else { | |
Thread.sleep(step); | |
stepEmitter.onNext(curStep); | |
} | |
return step; | |
} | |
public void stop(){ | |
this.stop = true; | |
} | |
public void speedUp(){ | |
step /= 2; | |
System.out.println("Current step " + step + " ms"); | |
} | |
public void slowDown(){ | |
step *= 2; | |
System.out.println("Current step " + step + " ms"); | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment