Skip to content

Instantly share code, notes, and snippets.

@SergejIsbrecht
Created January 25, 2021 11:09
Show Gist options
  • Save SergejIsbrecht/f3ad98dd1a07e15cd898acb73210d3bb to your computer and use it in GitHub Desktop.
Save SergejIsbrecht/f3ad98dd1a07e15cd898acb73210d3bb to your computer and use it in GitHub Desktop.
RxJava
package io.reactivex.rxjava3.core;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class SchedulersTest {
public void init() {
RxJavaPlugins.setInitSingleSchedulerHandler(
schedulerCallable -> Schedulers.from(Executors.newSingleThreadScheduledExecutor()));
RxJavaPlugins.lockdown();
// `ScheduledThreadPoolExecutor` -> uses `System.nanoTime()
// public long getDelay(TimeUnit unit) {
// return unit.convert(time - System.nanoTime(), NANOSECONDS);
// }
// Scheduler default impl uses `System.currentTimeMillis()`
// this could have different behavior with delayed tasks
// use-case: System.currentTimeMillis()
// update some state every minute. When suspended for 10 minutes and awaken, timer fires,
// because of jump
// some action is invoked to update a state. When time does not jump, the state might not be
// updated for
// delta (0 < x < 1) min.
// use-case: System.nanoTime()
// watchdog which checks some
// 1. define different single scheduler
// 2. tell every developer that `ScheduledExecutorService` should be used for periodic tasks
// Problem: Android specific time which is kind of monotonic but is suspension aware
// use-case: not depending on external time changes (System.currentTimeMillis) only forward
// monotonic with jumps due to adjustment
// ideal for periodic timers, which fire after suspension, when system has been asleep for set
// timer-period
// what would I do? Well,
// 1. use ScheduledExecutorService with `System.nanoTime()` in conjunction with
// `Schedulers.from` and have a not adjusted time on Linux_x64_86
// 2. implement own Scheduler (e.g. Single-Threaded-Scheduler) using time-source x
// 3. delegation + some copy-pasting + now-overwrite
// Problem
// different time-sources across: RxJava uses System.currentTimeMillis |
// ScheduledExecutorService uses System.nanoTime
// lets say I want to schedule tasks periodically with `Executors.newSingleThreadExecutor()` but
// instead of `System.currentTimeMillis()` use `SystemClock.elapsedRealtimeNanos()`
// Even bigger Problem:
// even `Single` uses two different times: either `System.nanoTime` by
// `ScheduledExecutorService` and `System.currentTimeMillis`, when using a `worker`. Which is
// probably very bad for 'suspension', because `Scheduler.schedulePeriodicallyDirect` will
// (probably) not fire after suspension and `Scheduler.Worker.schedulePeriodically` will,
// because based upon `System.currentTimeMillis()`. This could lead to very unfortunate bugs.
// Problem 2
// `Executors.newSingleThreadExecutor()` will use the default impl, when used in conjunction
// with `Schedulers.from`, which in turn uses `System.currentTimeMillis()`
RxJavaPlugins.setSingleSchedulerHandler(TimeSchedulerWrapper::new);
}
static final class TimeSchedulerWrapper extends Scheduler {
private final Scheduler scheduler;
TimeSchedulerWrapper(Scheduler scheduler) {
this.scheduler = Objects.requireNonNull(scheduler, "scheduler");
}
@Override
public long now(@NonNull TimeUnit unit) {
return System.nanoTime();
}
@Override
public @NonNull Worker createWorker() {
return new WorkerWrapper(scheduler.createWorker());
}
@Override
public void start() {
scheduler.start();
}
@Override
public void shutdown() {
scheduler.shutdown();
}
}
static final class WorkerWrapper extends Scheduler.Worker {
private final Scheduler.Worker worker;
WorkerWrapper(Scheduler.Worker worker) {
this.worker = Objects.requireNonNull(worker, "worker");
}
@Override
public long now(@NonNull TimeUnit unit) {
return System.nanoTime();
}
@Override
public @NonNull Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
return worker.schedule(run, delay, unit);
}
@Override
public void dispose() {
worker.dispose();
}
@Override
public boolean isDisposed() {
return worker.isDisposed();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment