Skip to content

Instantly share code, notes, and snippets.

@edudobay
Created January 15, 2018 20:22
Show Gist options
  • Save edudobay/05192f043ec3f1576ccaf933cdb9c429 to your computer and use it in GitHub Desktop.
Save edudobay/05192f043ec3f1576ccaf933cdb9c429 to your computer and use it in GitHub Desktop.
RxJava state machine
package example
import io.reactivex.Observable
import io.reactivex.disposables.Disposable
import io.reactivex.functions.BiFunction
import io.reactivex.subjects.BehaviorSubject
import java.util.concurrent.TimeUnit
import kotlin.concurrent.thread
enum class InternalState { ON, OFF, GOING_ON, GOING_OFF }
enum class State { ON, OFF }
class StateMachine {
private val state = BehaviorSubject.createDefault(InternalState.OFF).toSerialized()
private val desiredState = BehaviorSubject.create<State>().toSerialized()
fun moveToState(targetState: State) {
desiredState.onNext(targetState)
}
fun turnOn() = moveToState(State.ON)
fun turnOff() = moveToState(State.OFF)
fun transition(fromState: InternalState, toState: State): Observable<InternalState> =
if (fromState == InternalState.ON && toState == State.OFF)
goingOff()
else if (fromState == InternalState.OFF && toState == State.ON)
goingOn()
else
Observable.just(fromState)
fun goingOff(): Observable<InternalState> =
Observable.just(InternalState.OFF)
.delay(100, TimeUnit.MILLISECONDS)
.startWith(InternalState.GOING_OFF)
fun goingOn(): Observable<InternalState> =
Observable.just(InternalState.ON)
.delay(500, TimeUnit.MILLISECONDS)
.startWith(InternalState.GOING_ON)
fun state() = state.hide()
fun subscribe(): Disposable {
return desiredState
.withLatestFrom(state, BiFunction(this::combineToAndFromState))
.flatMap { (fromState, toState) ->
transition(fromState, toState)
}
.subscribe(state::onNext)
}
private fun combineToAndFromState(toState: State, fromState: InternalState) =
Pair(fromState, toState)
}
class Clock {
private val startedAt = System.currentTimeMillis()
fun millis(): Long = System.currentTimeMillis() - startedAt
fun seconds(): Double = millis() * 1e-3
}
class StateMachineTest {
fun testTurnItOn() {
val clock = Clock()
val machine = StateMachine()
machine.state().forEach { state ->
System.out.format("[%10.3f] Machine is %s\n", clock.seconds(), state.name)
}
val d = machine.subscribe()
thread {
System.out.println("Starting test commands")
machine.turnOn()
Thread.sleep(2000)
machine.turnOff()
Thread.sleep(1000)
}.join()
d.dispose()
System.out.println("Finishing test")
}
companion object {
@JvmStatic
fun main(args: Array<String>) {
val suite = StateMachineTest()
suite.testTurnItOn()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment