Skip to content

Instantly share code, notes, and snippets.

@BacLuc
Created April 6, 2019 19:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save BacLuc/84f4271241882379775a2d6e50c4784c to your computer and use it in GitHub Desktop.
Save BacLuc/84f4271241882379775a2d6e50c4784c to your computer and use it in GitHub Desktop.
Rx-based state machine
/**
 * Marker interface for the events consumed by {@code PaymentStateMachine}.
 *
 * <p>It declares no members; concrete events carry their own data.
 */
interface PaymentEvent {
}
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
/**
 * Payment workflow expressed as states of an {@link RXStateMachine}.
 *
 * <p>There are three states, each represented by a private factory method that notifies the
 * {@link StateChangeCallback} and then returns the transitions valid in that state:
 * <ul>
 *   <li><b>initial</b> – reacts to {@link Accept} and moves to accepting;</li>
 *   <li><b>accepting</b> – reacts to {@link MoneyReceived}; moves to committing when the
 *       received amount equals the accepted amount, otherwise re-enters accepting;</li>
 *   <li><b>committing</b> – reacts to {@link Commit}; accumulates committed amounts and returns
 *       to initial once the reserved amount is reached, otherwise re-enters committing.</li>
 * </ul>
 *
 * <p>All {@link Observer} and {@link Disposable} calls are delegated to the wrapped
 * {@link RXStateMachine}.
 */
public class PaymentStateMachine implements Observer<PaymentEvent>, Disposable {

    /** Notified every time a state is entered (including re-entering the same state). */
    interface StateChangeCallback {
        void onInitialEntered();

        void onAcceptingEntered();

        void onCommittingEntered();
    }

    /** Event handled in the initial state; carries the amount to be accepted. */
    static class Accept implements PaymentEvent {
        private final int amount;

        public Accept(int amount) {
            this.amount = amount;
        }
    }

    /** Event handled in the accepting state; carries the amount that was received. */
    static class MoneyReceived implements PaymentEvent {
        private final int amount;

        MoneyReceived(int amount) {
            this.amount = amount;
        }
    }

    /** Event handled in the committing state; carries the amount to commit. */
    static class Commit implements PaymentEvent {
        private final int amount;

        public Commit(int amount) {
            this.amount = amount;
        }
    }

    private final StateChangeCallback stateChangeCallback;
    private final RXStateMachine<PaymentEvent> stateMachine;

    /**
     * Creates the machine and immediately enters the initial state, which calls
     * {@link StateChangeCallback#onInitialEntered()} from within this constructor.
     *
     * @param stateChangeCallback receiver of state-entered notifications
     */
    PaymentStateMachine(StateChangeCallback stateChangeCallback) {
        // The callback must be assigned before initial() runs, because initial() invokes it.
        this.stateChangeCallback = stateChangeCallback;
        this.stateMachine = new RXStateMachine<>(initial());
    }

    /** Enters the initial state: waits for an {@link Accept} event. */
    private RXStateMachine.TransitionBuilder<PaymentEvent> initial() {
        stateChangeCallback.onInitialEntered();
        return newTransitions()
                .when(Accept.class)
                .then(event -> accepting(((Accept) event).amount));
    }

    /**
     * Enters the accepting state: waits for a {@link MoneyReceived} event.
     *
     * @param amount the amount that has to be received to proceed to committing
     */
    private RXStateMachine.TransitionBuilder<PaymentEvent> accepting(int amount) {
        stateChangeCallback.onAcceptingEntered();
        return newTransitions()
                .when(MoneyReceived.class)
                .then(event -> {
                    int received = ((MoneyReceived) event).amount;
                    return received == amount ? committing(received, 0) : accepting(amount);
                });
    }

    /**
     * Enters the committing state: waits for {@link Commit} events.
     *
     * @param reservedAmount  total that must be committed before returning to initial
     * @param committedAmount sum committed so far
     */
    private RXStateMachine.TransitionBuilder<PaymentEvent> committing(int reservedAmount, int committedAmount) {
        stateChangeCallback.onCommittingEntered();
        return newTransitions()
                .when(Commit.class)
                .then(event -> {
                    int committedSoFar = committedAmount + ((Commit) event).amount;
                    if (committedSoFar < reservedAmount) {
                        return committing(reservedAmount, committedSoFar);
                    }
                    return initial();
                });
    }

    /** Starts an empty transition definition typed for {@link PaymentEvent}. */
    private static RXStateMachine.TransitionBuilder<PaymentEvent> newTransitions() {
        return RXStateMachine.TransitionBuilder.defineTransitions();
    }

    @Override
    public void onSubscribe(Disposable disposable) {
        stateMachine.onSubscribe(disposable);
    }

    @Override
    public void onNext(PaymentEvent paymentEvent) {
        stateMachine.onNext(paymentEvent);
    }

    @Override
    public void onError(Throwable throwable) {
        stateMachine.onError(throwable);
    }

    @Override
    public void onComplete() {
        stateMachine.onComplete();
    }

    @Override
    public void dispose() {
        stateMachine.dispose();
    }

    @Override
    public boolean isDisposed() {
        return stateMachine.isDisposed();
    }
}
import io.reactivex.Observer;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.PublishSubject;
import io.vavr.collection.Map;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * Generic state machine driven by an RxJava event stream.
 *
 * <p>A state is described by a {@link TransitionBuilder}: a map from event class to a function
 * that receives the matching event and returns the {@link TransitionBuilder} of the next state.
 * When a state is entered, one subscription per transition is created on an internal
 * {@link PublishSubject}; the subscriptions of the previous state are disposed afterwards.
 * A transition function that returns {@code null} leaves the current state in place.
 *
 * <p>Events are fed in through the {@link Observer} methods, which forward to the internal
 * subject. {@link #dispose()} disposes the transition subscriptions of the current state.
 *
 * @param <EVENT> common supertype of all events handled by the machine
 */
public class RXStateMachine<EVENT> implements Observer<EVENT>, Disposable {

    private final PublishSubject<EVENT> events = PublishSubject.create();
    private CompositeDisposable transitionSubscriptions = new CompositeDisposable();

    /**
     * Creates the machine and activates the transitions of the initial state.
     *
     * @param initial transitions of the initial state
     */
    public RXStateMachine(TransitionBuilder<EVENT> initial) {
        updateState(initial);
    }

    /**
     * Switches to the state described by {@code transitionBuilder}: subscribes one filtered
     * pipeline per transition, then disposes the subscriptions of the previous state.
     */
    private void updateState(TransitionBuilder<EVENT> transitionBuilder) {
        List<Disposable> newSubscriptions = transitionBuilder.transitions
                .map(transition -> events
                        // Ask the registered class whether the event is one of its instances.
                        // The reverse check (event class against the Class object) never
                        // matches for ordinary event types, so no transition would fire.
                        .filter(event -> transition._1.isInstance(event))
                        // RxJava's map must not return null, so a "no state change" result
                        // is carried as an empty Optional and dropped by the next filter.
                        .map((EVENT event) -> Optional.ofNullable(transition._2.apply(event)))
                        .filter(Optional::isPresent)
                        .map(Optional::get)
                        .subscribe(this::updateState))
                .collect(Collectors.toList());
        transitionSubscriptions.dispose();
        this.transitionSubscriptions = new CompositeDisposable(newSubscriptions);
    }

    @Override
    public void onSubscribe(Disposable disposable) {
        events.onSubscribe(disposable);
    }

    @Override
    public void onNext(EVENT event) {
        events.onNext(event);
    }

    @Override
    public void onError(Throwable throwable) {
        events.onError(throwable);
    }

    @Override
    public void onComplete() {
        events.onComplete();
    }

    /** Disposes the transition subscriptions of the current state. */
    @Override
    public void dispose() {
        transitionSubscriptions.dispose();
    }

    /** @return whether the current set of transition subscriptions has been disposed */
    @Override
    public boolean isDisposed() {
        return transitionSubscriptions.isDisposed();
    }

    /**
     * Immutable, fluent description of the transitions available in one state.
     *
     * <p>Usage alternates {@link #when(Class)} and {@link #then(Function)}:
     * {@code defineTransitions().when(A.class).then(a -> nextState())}. Each call returns a new
     * builder instance.
     *
     * @param <E> common supertype of the events
     */
    public static class TransitionBuilder<E> {

        private final Map<Class<? extends E>, Function<E, TransitionBuilder<E>>> transitions;
        /** Event class given to the last {@code when} that has no {@code then} yet, or null. */
        private final Class<? extends E> currentCondition;

        private TransitionBuilder(Map<Class<? extends E>, Function<E, TransitionBuilder<E>>> map, Class<? extends E> currentCondition) {
            transitions = map;
            this.currentCondition = currentCondition;
        }

        /**
         * @param <E> common supertype of the events
         * @return a builder with no transitions and no pending condition
         */
        public static <E> TransitionBuilder<E> defineTransitions() {
            return new TransitionBuilder<>(io.vavr.collection.LinkedHashMap.empty(), null);
        }

        /**
         * Selects the event class the next {@link #then(Function)} applies to.
         *
         * @param condition event class that triggers the transition
         * @return a new builder with {@code condition} pending
         * @throws IllegalStateException if a previous {@code when} has not been completed by
         *         {@code then}
         */
        public TransitionBuilder<E> when(Class<? extends E> condition) {
            if (currentCondition != null) {
                throw new IllegalStateException("condition already defined, you need to use then first, before you can use another when");
            }
            return new TransitionBuilder<>(transitions, condition);
        }

        /**
         * Registers the function that produces the next state for the pending event class.
         *
         * @param stateFactory receives the matching event and returns the next state's
         *        transitions, or {@code null} to stay in the current state
         * @return a new builder containing the added transition and no pending condition
         * @throws IllegalStateException if {@link #when(Class)} was not called first
         */
        public TransitionBuilder<E> then(Function<E, TransitionBuilder<E>> stateFactory) {
            if (currentCondition == null) {
                throw new IllegalStateException("no condition defined. you need to call when first");
            }
            return new TransitionBuilder<>(transitions.put(currentCondition, stateFactory), null);
        }
    }
}
import io.reactivex.subjects.BehaviorSubject;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
import static org.mockito.Mockito.*;
/**
 * Behavioural tests for the RxJava-backed state machine, exercised through
 * {@link PaymentStateMachine}.
 *
 * <p>Every test feeds events through a {@link BehaviorSubject} and verifies, in order, which
 * state-entered callbacks were invoked. {@link #tearDown()} additionally asserts that no
 * callback beyond the verified ones happened.
 */
public class RXStateMachineShouldTest {

    private PaymentStateMachine.StateChangeCallback callback;
    private InOrder callbackOrder;
    private PaymentStateMachine machine;
    private BehaviorSubject<PaymentEvent> eventSource;

    @Before
    public void setupStateMachine() {
        callback = mock(PaymentStateMachine.StateChangeCallback.class);
        callbackOrder = inOrder(callback);
        machine = spy(new PaymentStateMachine(callback));
        eventSource = BehaviorSubject.create();
        eventSource.subscribe(machine);
    }

    @After
    public void tearDown() {
        callbackOrder.verifyNoMoreInteractions();
        machine.dispose();
    }

    /** Pushes the given events into the machine, in order. */
    private void emit(PaymentEvent... paymentEvents) {
        for (PaymentEvent paymentEvent : paymentEvents) {
            eventSource.onNext(paymentEvent);
        }
    }

    /** Starts an in-order verification of the next single callback invocation. */
    private PaymentStateMachine.StateChangeCallback nextCall() {
        return callbackOrder.verify(callback);
    }

    @Test
    public void be_in_initial_state_at_start() {
        nextCall().onInitialEntered();
    }

    @Test
    public void switch_to_accepting_state_when_accept_event_received() {
        emit(new PaymentStateMachine.Accept(1));

        nextCall().onInitialEntered();
        nextCall().onAcceptingEntered();
    }

    @Test
    public void cycle_through_state_machine() {
        emit(
                new PaymentStateMachine.Accept(1),
                new PaymentStateMachine.MoneyReceived(1),
                new PaymentStateMachine.Commit(1));

        nextCall().onInitialEntered();
        nextCall().onAcceptingEntered();
        nextCall().onCommittingEntered();
        nextCall().onInitialEntered();
    }

    @Test
    public void handle_multiple_commits() {
        emit(
                new PaymentStateMachine.Accept(2),
                new PaymentStateMachine.MoneyReceived(2),
                new PaymentStateMachine.Commit(1),
                new PaymentStateMachine.Commit(1));

        nextCall().onInitialEntered();
        nextCall().onAcceptingEntered();
        // Committing is entered once after the money arrives and once more after the first,
        // partial commit.
        callbackOrder.verify(callback, times(2)).onCommittingEntered();
        nextCall().onInitialEntered();
    }

    @Test
    public void not_change_state_when_no_transition_defined() {
        emit(new PaymentStateMachine.MoneyReceived(1));

        nextCall().onInitialEntered();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment