Skip to content

Instantly share code, notes, and snippets.

@mkrajc
Created May 22, 2017 09:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mkrajc/de8bd66e3528af774e4e6b7deb0c691e to your computer and use it in GitHub Desktop.
Save mkrajc/de8bd66e3528af774e4e6b7deb0c691e to your computer and use it in GitHub Desktop.
package org.mech;
import java.util.concurrent.TimeUnit;
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.disposables.Disposable;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.schedulers.Schedulers;
/**
 * Timer whose tick interval can be changed while it is running.
 *
 * <p>The timer emits the value {@code 1} over and over. Before each emission it waits for the
 * number of milliseconds currently reported by {@link Speed#getStep()}. The stream is published
 * as a {@link ConnectableObservable}, so every observer shares one underlying timer and nothing
 * is emitted until {@link #start()} connects it.
 *
 * <p>{@link #start()} and {@link #stop()} perform no synchronization of their own.
 */
public class RxDynamicTimer {
    private final Speed speed;
    private Disposable subscription;
    private final ConnectableObservable<Integer> dynamicTimer;
    private final Scheduler scheduler = Schedulers.single();

    /**
     * Creates a timer that is not yet running.
     *
     * @param milis initial delay between two emissions, in milliseconds
     */
    public RxDynamicTimer(Integer milis) {
        this.speed = new Speed(milis);
        this.dynamicTimer = setupAndPublish();
    }

    /**
     * Returns the shared timer stream. Observers may subscribe before or after {@link #start()};
     * they only receive items emitted while the timer is connected.
     *
     * @return the published timer observable
     */
    public Observable<Integer> getTimer() {
        return dynamicTimer;
    }

    /**
     * Returns the speed control of this timer.
     *
     * @return the object used to read and change the delay between emissions
     */
    public Speed getSpeed() {
        return speed;
    }

    /**
     * Connects the published timer so that it starts emitting. Calling this more than once has no
     * further effect; in particular, a timer that was stopped is not reconnected.
     */
    public void start() {
        if (subscription == null) {
            // start the dynamic speed
            this.subscription = dynamicTimer.connect();
        }
    }

    /**
     * Disconnects the timer so that it stops emitting. Safe to call when the timer was never
     * started or was already stopped.
     */
    public void stop() {
        // Both conditions must hold: with "||" a stop() before start() dereferenced null.
        if (subscription != null && !subscription.isDisposed()) {
            subscription.dispose();
            // The scheduler comes from Schedulers.single(), which is shared, so it is
            // deliberately not shut down here.
        }
    }

    /**
     * Builds the timer chain and publishes it without connecting.
     *
     * @return the connectable timer stream
     */
    private ConnectableObservable<Integer> setupAndPublish() {
        // Defer building the chain until connect(), so that the very first delay uses the step
        // that is current at start time rather than the one captured in the constructor.
        return Observable.defer(this::createTimer)
                // some logging
                .doOnNext(i -> RxDynamicTimerTest.log("Got: " + i))
                // publish it, so only one subscription is done to createTimer internally
                // but it will not start until connect is invoked
                .publish();
    }

    /**
     * Creates one delayed emission followed, lazily, by the next one.
     *
     * @return an observable that emits {@code 1} after the current step and then continues with
     *     a freshly created timer
     */
    private Observable<Integer> createTimer() {
        // recursively delay emission of item based on speed
        return Observable.just(1).delay(speed.getStep(), TimeUnit.MILLISECONDS, scheduler)
                // lazy concat with itself; defer re-reads the step for every emission
                .concatWith(Observable.defer(this::createTimer));
    }

    /** Speed provides the current delay between two item emissions. */
    public static class Speed {
        // volatile: written by the caller's thread, read on the timer's scheduler thread.
        private volatile Integer step;

        Speed(Integer milis) {
            this.step = milis;
        }

        /**
         * Returns the current delay.
         *
         * @return delay between two emissions, in milliseconds
         */
        public Integer getStep() {
            return step;
        }

        /** Halves the delay, but never below 1 ms, and prints the new value. */
        public void speedUp() {
            // A step of 0 could never be raised again by slowDown() (0 * 2 == 0).
            step = Math.max(1, step / 2);
            System.out.println("Current step " + step + " ms");
        }

        /** Doubles the delay and prints the new value. */
        public void slowDown() {
            step *= 2;
            System.out.println("Current step " + step + " ms");
        }
    }
}
package org.mech;
import io.reactivex.annotations.NonNull;
import io.reactivex.observers.DisposableObserver;
/**
 * Console demo for {@link RxDynamicTimer}: subscribes two observers, starts the timer, speeds it
 * up, disposes the observers one after the other and finally stops the timer. Every step is
 * logged with the time elapsed since the reference timestamp and the current thread name.
 */
public class RxDynamicTimerTest {
    // Reference timestamp for the elapsed-time column. Initialised at class load so that log()
    // also works when it is called before (or without) main(), e.g. from RxDynamicTimer.
    private static long start = System.currentTimeMillis();

    /**
     * Runs the demo scenario.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        start = System.currentTimeMillis();
        log("Starting");

        final RxDynamicTimer rxDynamicTimer = new RxDynamicTimer(1000);
        final DynamicTimerSubscriber subA = new DynamicTimerSubscriber("SubA");
        final DynamicTimerSubscriber subB = new DynamicTimerSubscriber("SubB");
        log("Observers created");

        rxDynamicTimer.getTimer().subscribe(subA);
        rxDynamicTimer.getTimer().subscribe(subB);
        log("Observers subscribed.");
        waitFor(1);

        log("Start dynamic timer");
        rxDynamicTimer.start();
        waitFor(3);

        log("Increase speed");
        rxDynamicTimer.getSpeed().speedUp();
        waitFor(3);

        log("Unsubscribe A");
        subA.dispose();
        waitFor(3);

        log("Unsubscribe B");
        subB.dispose();
        waitFor(3);

        log("Stop timer.");
        rxDynamicTimer.stop();
        log("End");
    }

    /**
     * Logs a notice and blocks the calling thread.
     *
     * @param seconds number of seconds to sleep
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void waitFor(Integer seconds) throws InterruptedException {
        log("Waiting " + seconds + " second" + ((seconds > 1) ? "s" : "") + " ...");
        // Multiply as long so a large argument cannot overflow int.
        Thread.sleep(seconds * 1000L);
    }

    /**
     * Logs a message on behalf of the application itself.
     *
     * @param s message text
     */
    public static void log(String s) {
        log("APP ", s);
    }

    /**
     * Prints one log line: elapsed milliseconds, source label, current thread name and message.
     *
     * @param src label of the component that logs
     * @param s message text
     */
    private static void log(String src, String s) {
        System.out
                .println("[" + (System.currentTimeMillis() - start) + "]\t" + src + "\t[" + Thread.currentThread().getName() + "]\t" + s);
    }

    /** Observer that logs every callback it receives under its own name. */
    private static class DynamicTimerSubscriber extends DisposableObserver<Integer> {
        private final String name;

        DynamicTimerSubscriber(String n) {
            this.name = n;
        }

        @Override
        protected void onStart() {
            log("onStart");
        }

        @Override
        public void onNext(@NonNull final Integer i) {
            log("onNext: " + i);
        }

        @Override
        public void onError(@NonNull final Throwable e) {
            e.printStackTrace();
            log("onError: " + e.getClass());
        }

        @Override
        public void onComplete() {
            log("onComplete");
        }

        // Prefixes the message with this observer's name instead of the application label.
        private void log(String s) {
            RxDynamicTimerTest.log(name, s);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment