Created
January 15, 2018 20:22
-
-
Save edudobay/05192f043ec3f1576ccaf933cdb9c429 to your computer and use it in GitHub Desktop.
RxJava state machine
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example | |
import io.reactivex.Observable | |
import io.reactivex.disposables.Disposable | |
import io.reactivex.functions.BiFunction | |
import io.reactivex.subjects.BehaviorSubject | |
import java.util.concurrent.TimeUnit | |
import kotlin.concurrent.thread | |
/**
 * Fine-grained state tracked inside [StateMachine].
 *
 * Besides the two settled states it includes the intermediate states that
 * are emitted while a transition is still in progress.
 */
enum class InternalState {
    ON,
    OFF,
    GOING_ON,
    GOING_OFF
}
/**
 * Target state a caller can request from [StateMachine].
 */
enum class State {
    ON,
    OFF
}
/**
 * Two-state (on/off) machine whose transitions take time.
 *
 * Callers request a target [State]; once [subscribe] has been called, each
 * request is paired with the most recent [InternalState] and turned into a
 * stream of internal states that is fed back into the machine's own state.
 */
class StateMachine {
    // Current internal state; starts as OFF.
    private val state = BehaviorSubject.createDefault(InternalState.OFF).toSerialized()

    // Requested target states; has no initial value.
    private val desiredState = BehaviorSubject.create<State>().toSerialized()

    /** Requests that the machine move to [targetState]. */
    fun moveToState(targetState: State) {
        desiredState.onNext(targetState)
    }

    /** Shorthand for requesting [State.ON]. */
    fun turnOn() {
        moveToState(State.ON)
    }

    /** Shorthand for requesting [State.OFF]. */
    fun turnOff() {
        moveToState(State.OFF)
    }

    /**
     * Returns the sequence of internal states produced when [toState] is
     * requested while the machine is in [fromState].
     *
     * Only ON -> OFF and OFF -> ON start a real transition; every other
     * combination simply re-emits [fromState].
     */
    fun transition(fromState: InternalState, toState: State): Observable<InternalState> =
        when {
            fromState == InternalState.ON && toState == State.OFF -> goingOff()
            fromState == InternalState.OFF && toState == State.ON -> goingOn()
            else -> Observable.just(fromState)
        }

    /** Emits GOING_OFF first, then OFF after a 100 ms delay. */
    fun goingOff(): Observable<InternalState> =
        timedTransition(InternalState.GOING_OFF, InternalState.OFF, 100)

    /** Emits GOING_ON first, then ON after a 500 ms delay. */
    fun goingOn(): Observable<InternalState> =
        timedTransition(InternalState.GOING_ON, InternalState.ON, 500)

    /** Exposes the internal state as an observable with the subject hidden. */
    fun state() = state.hide()

    /**
     * Wires requested states to transitions and pushes the resulting
     * internal states back into the machine.
     *
     * @return the [Disposable] for the wiring; dispose it to stop processing requests.
     */
    fun subscribe(): Disposable {
        // Pair every request with the internal state that is current when it arrives.
        val requests = desiredState
            .withLatestFrom(state, BiFunction(this::combineToAndFromState))
        val transitions = requests.flatMap { request ->
            transition(request.first, request.second)
        }
        return transitions.subscribe { next -> state.onNext(next) }
    }

    // Argument order follows withLatestFrom (request first); the pair is (from, to).
    private fun combineToAndFromState(toState: State, fromState: InternalState) =
        fromState to toState

    // Emits `interim` first, then `settled` once `delayMillis` has elapsed.
    private fun timedTransition(
        interim: InternalState,
        settled: InternalState,
        delayMillis: Long
    ): Observable<InternalState> =
        Observable.just(settled)
            .delay(delayMillis, TimeUnit.MILLISECONDS)
            .startWith(interim)
}
/**
 * Stopwatch that reports the time elapsed since the instance was created.
 *
 * Elapsed time is derived from [System.nanoTime], which is intended for
 * measuring intervals. Unlike [System.currentTimeMillis], it is not affected
 * by adjustments to the system wall clock, so readings never jump or go
 * backwards.
 */
class Clock {
    // Reference point on the nanoTime timeline; only differences against it are meaningful.
    private val startedAtNanos = System.nanoTime()

    /** Whole milliseconds elapsed since this clock was created. */
    fun millis(): Long = (System.nanoTime() - startedAtNanos) / 1_000_000L

    /** Elapsed time in seconds, at millisecond resolution. */
    fun seconds(): Double = millis() * 1e-3
}
/**
 * Manual smoke test for [StateMachine]: drives it on and off and prints
 * every state it reports, prefixed with the elapsed time.
 */
class StateMachineTest {
    /**
     * Turns the machine on, waits 2 s, turns it off, waits 1 s, and logs
     * each state change to standard output along the way.
     */
    fun testTurnItOn() {
        val clock = Clock()
        val machine = StateMachine()

        // forEach returns a Disposable; keep it so the logging subscription
        // can be released once the test is over.
        val logging = machine.state().forEach { state ->
            System.out.format("[%10.3f] Machine is %s\n", clock.seconds(), state.name)
        }
        val d = machine.subscribe()

        try {
            // Issue the commands from a separate thread and wait for it to finish.
            thread {
                println("Starting test commands")
                machine.turnOn()
                Thread.sleep(2000)
                machine.turnOff()
                Thread.sleep(1000)
            }.join()
        } finally {
            // Release both subscriptions even if the wait is interrupted.
            d.dispose()
            logging.dispose()
        }

        println("Finishing test")
    }

    companion object {
        /** Entry point: runs [testTurnItOn] once. */
        @JvmStatic
        fun main(args: Array<String>) {
            val suite = StateMachineTest()
            suite.testTurnItOn()
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment