Skip to content

Instantly share code, notes, and snippets.

@mkrajc
Created May 19, 2017 11:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mkrajc/4336016129711265217b74346dad7d9e to your computer and use it in GitHub Desktop.
Save mkrajc/4336016129711265217b74346dad7d9e to your computer and use it in GitHub Desktop.
package org.mech;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
/**
 * Small demo that drives an {@code Observable.generate} stream with a {@link TimeShifter}.
 *
 * <p>The stream is subscribed on a dedicated single-thread executor and every observer
 * callback prints the name of the thread it runs on. The main thread speeds the generator
 * up twice, five seconds apart, and stops it after a further five seconds.
 */
public class RxTest {

    /**
     * Runs the demo.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        final Callable<Integer> init = () -> 1000;
        final TimeShifter timeShifter = new TimeShifter();
        final Observable<Integer> dynamic = Observable.generate(init, timeShifter);

        // Keep a handle on the executor so it can be shut down at the end; its worker
        // thread is non-daemon and would otherwise keep the JVM alive indefinitely.
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            dynamic.subscribeOn(Schedulers.from(executor))
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(@NonNull final Disposable d) {
                            System.out.println(Thread.currentThread().getName() + " subscribed");
                        }

                        @Override
                        public void onNext(@NonNull final Integer s) {
                            System.out.println(Thread.currentThread().getName() + " onNext " + s);
                        }

                        @Override
                        public void onError(@NonNull final Throwable e) {
                            System.out.println(Thread.currentThread().getName() + " onError " + e);
                        }

                        @Override
                        public void onComplete() {
                            System.out.println(Thread.currentThread().getName() + " onComplete");
                        }
                    });

            Thread.sleep(5000);
            timeShifter.speedUp();
            Thread.sleep(5000);
            timeShifter.speedUp();
            Thread.sleep(5000);
        } finally {
            // Ask the generator to complete, then let the executor wind down.
            // shutdown() (not shutdownNow()) is deliberate: it does not interrupt the
            // task that is already running, so the generator can still emit onComplete.
            timeShifter.stop();
            executor.shutdown();
        }
    }
}
package org.mech;
import io.reactivex.Emitter;
import io.reactivex.annotations.NonNull;
import io.reactivex.functions.BiFunction;
/**
 * Generator callback that emits values at an adjustable pace.
 *
 * <p>Each call to {@link #apply} either completes the emitter (once {@link #stop()} has been
 * called) or sleeps for the current step, emits the state it was handed, and returns the
 * current step as the next state.
 *
 * <p>The control methods ({@link #stop()}, {@link #speedUp()}, {@link #slowDown()}) may be
 * called from a different thread than the one running {@link #apply}. Both fields are
 * therefore {@code volatile} so changes become visible to the generating thread, and the
 * step mutators are {@code synchronized} so their read-modify-write cannot interleave.
 */
public class TimeShifter implements BiFunction<Integer, Emitter<Integer>, Integer> {

    /** Delay in milliseconds before each emission; also the value returned as the next state. */
    private volatile int step = 1000;

    /** Set once by {@link #stop()}; checked at the start of every {@link #apply} call. */
    private volatile boolean stop = false;

    /**
     * Produces one signal on the emitter.
     *
     * @param curStep     the state handed in by the generator; emitted unchanged via {@code onNext}
     * @param stepEmitter the emitter to signal
     * @return the current step, to be used as the next state
     * @throws Exception if the sleep is interrupted
     */
    @Override
    public Integer apply(@NonNull final Integer curStep, @NonNull final Emitter<Integer> stepEmitter) throws Exception {
        if (stop) {
            stepEmitter.onComplete();
        } else {
            Thread.sleep(step);
            stepEmitter.onNext(curStep);
        }
        // Re-read after the sleep so a change made meanwhile is picked up as the next state.
        return step;
    }

    /** Requests completion; takes effect on the next {@link #apply} call. */
    public void stop() {
        this.stop = true;
    }

    /** Halves the step (integer division) and prints the new value. */
    public synchronized void speedUp() {
        step /= 2;
        System.out.println("Current step " + step + " ms");
    }

    /** Doubles the step and prints the new value. */
    public synchronized void slowDown() {
        step *= 2;
        System.out.println("Current step " + step + " ms");
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment