Created
April 6, 2019 19:10
-
-
Save BacLuc/84f4271241882379775a2d6e50c4784c to your computer and use it in GitHub Desktop.
rx based state machine
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
interface PaymentEvent { | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.reactivex.Observer; | |
import io.reactivex.disposables.Disposable; | |
public class PaymentStateMachine implements Observer<PaymentEvent>, Disposable { | |
private final RXStateMachine<PaymentEvent> stateMachine; | |
private StateChangeCallback stateChangeCallback; | |
interface StateChangeCallback { | |
void onInitialEntered(); | |
void onAcceptingEntered(); | |
void onCommittingEntered(); | |
} | |
PaymentStateMachine(StateChangeCallback stateChangeCallback) { | |
this.stateChangeCallback = stateChangeCallback; | |
stateMachine = new RXStateMachine<>(initial()); | |
} | |
static class Accept implements PaymentEvent { | |
private final int amount; | |
public Accept(int amount) { | |
this.amount = amount; | |
} | |
} | |
private RXStateMachine.TransitionBuilder<PaymentEvent> initial() { | |
stateChangeCallback.onInitialEntered(); | |
return defineTransitions().when(Accept.class).then(paymentEvent -> { | |
Accept casted = cast(paymentEvent); | |
return accepting(casted.amount); | |
}); | |
} | |
static class MoneyReceived implements PaymentEvent { | |
private final int amount; | |
MoneyReceived(int amount) { | |
this.amount = amount; | |
} | |
} | |
private RXStateMachine.TransitionBuilder<PaymentEvent> accepting(int amount) { | |
stateChangeCallback.onAcceptingEntered(); | |
return defineTransitions().when(MoneyReceived.class).then(paymentEvent -> { | |
MoneyReceived moneyReceived = cast(paymentEvent); | |
if (moneyReceived.amount == amount) { | |
return committing(moneyReceived.amount, 0); | |
} else { | |
return accepting(amount); | |
} | |
}); | |
} | |
static class Commit implements PaymentEvent { | |
private final int amount; | |
public Commit(int amount) { | |
this.amount = amount; | |
} | |
} | |
private RXStateMachine.TransitionBuilder<PaymentEvent> committing(int reservedAmount, int committedAmount) { | |
stateChangeCallback.onCommittingEntered(); | |
return defineTransitions().when(Commit.class).then(paymentEvent -> { | |
Commit commit = cast(paymentEvent); | |
int newAmount = committedAmount + commit.amount; | |
if (newAmount >= reservedAmount) { | |
return initial(); | |
} else { | |
return committing(reservedAmount, newAmount); | |
} | |
}); | |
} | |
private RXStateMachine.TransitionBuilder<PaymentEvent> defineTransitions() { | |
return RXStateMachine.TransitionBuilder.defineTransitions(); | |
} | |
private static <T extends PaymentEvent> T cast(PaymentEvent event) { | |
//noinspection unchecked | |
return (T) event; | |
} | |
@Override | |
public void onSubscribe(Disposable disposable) { | |
stateMachine.onSubscribe(disposable); | |
} | |
@Override | |
public void onNext(PaymentEvent paymentEvent) { | |
stateMachine.onNext(paymentEvent); | |
} | |
@Override | |
public void onError(Throwable throwable) { | |
stateMachine.onError(throwable); | |
} | |
@Override | |
public void onComplete() { | |
stateMachine.onComplete(); | |
} | |
@Override | |
public void dispose() { | |
stateMachine.dispose(); | |
} | |
@Override | |
public boolean isDisposed() { | |
return stateMachine.isDisposed(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.reactivex.Observer; | |
import io.reactivex.disposables.CompositeDisposable; | |
import io.reactivex.disposables.Disposable; | |
import io.reactivex.subjects.PublishSubject; | |
import io.vavr.collection.Map; | |
import java.util.List; | |
import java.util.Optional; | |
import java.util.function.Function; | |
import java.util.stream.Collectors; | |
public class RXStateMachine<EVENT> implements Observer<EVENT>, Disposable { | |
private final PublishSubject<EVENT> events = PublishSubject.create(); | |
private CompositeDisposable transitionSubscriptions = new CompositeDisposable(); | |
public RXStateMachine(TransitionBuilder<EVENT> initial) { | |
updateState(initial); | |
} | |
private void updateState(TransitionBuilder<EVENT> transitionBuilder) { | |
List<Disposable> transitionSubScriptions = transitionBuilder.transitions.map(classFunctionTuple2 -> events.filter(event -> event | |
.getClass() | |
.isInstance(classFunctionTuple2._1)) | |
.map((EVENT t) -> Optional | |
.ofNullable(classFunctionTuple2._2 | |
.apply(t))) | |
.filter(Optional::isPresent) | |
.map(Optional::get) | |
.subscribe(this::updateState)) | |
.collect(Collectors.toList()); | |
transitionSubscriptions.dispose(); | |
this.transitionSubscriptions = new CompositeDisposable(transitionSubScriptions); | |
} | |
@Override | |
public void onSubscribe(Disposable disposable) { | |
events.onSubscribe(disposable); | |
} | |
@Override | |
public void onNext(EVENT event) { | |
events.onNext(event); | |
} | |
@Override | |
public void onError(Throwable throwable) { | |
events.onError(throwable); | |
} | |
@Override | |
public void onComplete() { | |
events.onComplete(); | |
} | |
@Override | |
public void dispose() { | |
transitionSubscriptions.dispose(); | |
} | |
@Override | |
public boolean isDisposed() { | |
return transitionSubscriptions.isDisposed(); | |
} | |
public static class TransitionBuilder<E> { | |
private final Map<Class<? extends E>, Function<E, TransitionBuilder<E>>> transitions; | |
private final Class<? extends E> currentCondition; | |
private TransitionBuilder(Map<Class<? extends E>, Function<E, TransitionBuilder<E>>> map, Class<? extends E> currentCondition) { | |
transitions = map; | |
this.currentCondition = currentCondition; | |
} | |
public static <E> TransitionBuilder<E> defineTransitions() { | |
return new TransitionBuilder<>(io.vavr.collection.LinkedHashMap.empty(), null); | |
} | |
public TransitionBuilder<E> when(Class<? extends E> condition) { | |
if (currentCondition != null) { | |
throw new IllegalStateException("condition already defined, you need to use then first, before you can use another when"); | |
} | |
return new TransitionBuilder<>(transitions, condition); | |
} | |
public TransitionBuilder<E> then(Function<E, TransitionBuilder<E>> stateFactory) { | |
if (currentCondition == null) { | |
throw new IllegalStateException("no condition defined. you need to call when first"); | |
} | |
return new TransitionBuilder<>(transitions.put(currentCondition, stateFactory), null); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.reactivex.subjects.BehaviorSubject; | |
import org.junit.After; | |
import org.junit.Before; | |
import org.junit.Test; | |
import org.mockito.InOrder; | |
import static org.mockito.Mockito.*; | |
public class RXStateMachineShouldTest { | |
private BehaviorSubject<PaymentEvent> objectBehaviorSubject; | |
private PaymentStateMachine paymentStateMachine; | |
private PaymentStateMachine.StateChangeCallback stateChangeCallback; | |
private InOrder inOrder; | |
@Before | |
public void setupStateMachine() { | |
stateChangeCallback = mock(PaymentStateMachine.StateChangeCallback.class); | |
inOrder = inOrder(stateChangeCallback); | |
paymentStateMachine = spy(new PaymentStateMachine(stateChangeCallback)); | |
objectBehaviorSubject = BehaviorSubject.create(); | |
objectBehaviorSubject.subscribe(paymentStateMachine); | |
} | |
@After | |
public void tearDown() { | |
inOrder.verifyNoMoreInteractions(); | |
paymentStateMachine.dispose(); | |
} | |
@Test | |
public void be_in_initial_state_at_start() { | |
inOrder.verify(stateChangeCallback).onInitialEntered(); | |
} | |
@Test | |
public void switch_to_accepting_state_when_accept_event_received() { | |
objectBehaviorSubject.onNext(new PaymentStateMachine.Accept(1)); | |
inOrder.verify(stateChangeCallback).onInitialEntered(); | |
inOrder.verify(stateChangeCallback).onAcceptingEntered(); | |
} | |
@Test | |
public void cycle_through_state_machine() { | |
objectBehaviorSubject.onNext(new PaymentStateMachine.Accept(1)); | |
objectBehaviorSubject.onNext(new PaymentStateMachine.MoneyReceived(1)); | |
objectBehaviorSubject.onNext(new PaymentStateMachine.Commit(1)); | |
inOrder.verify(stateChangeCallback).onInitialEntered(); | |
inOrder.verify(stateChangeCallback).onAcceptingEntered(); | |
inOrder.verify(stateChangeCallback).onCommittingEntered(); | |
inOrder.verify(stateChangeCallback).onInitialEntered(); | |
} | |
@Test | |
public void handle_multiple_commits() { | |
objectBehaviorSubject.onNext(new PaymentStateMachine.Accept(2)); | |
objectBehaviorSubject.onNext(new PaymentStateMachine.MoneyReceived(2)); | |
objectBehaviorSubject.onNext(new PaymentStateMachine.Commit(1)); | |
objectBehaviorSubject.onNext(new PaymentStateMachine.Commit(1)); | |
inOrder.verify(stateChangeCallback).onInitialEntered(); | |
inOrder.verify(stateChangeCallback).onAcceptingEntered(); | |
inOrder.verify(stateChangeCallback, times(2)).onCommittingEntered(); | |
inOrder.verify(stateChangeCallback).onInitialEntered(); | |
} | |
@Test | |
public void not_change_state_when_no_transition_defined() { | |
objectBehaviorSubject.onNext(new PaymentStateMachine.MoneyReceived(1)); | |
inOrder.verify(stateChangeCallback).onInitialEntered(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment