Skip to content

Instantly share code, notes, and snippets.

@RxDave
Last active May 2, 2016 04:04
Show Gist options
  • Save RxDave/9adc25455917ff12b9b8 to your computer and use it in GitHub Desktop.
Save RxDave/9adc25455917ff12b9b8 to your computer and use it in GitHub Desktop.
Async Iterator State Machine
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Rxx.Labs
{
// An async state machine example based on a simple FSM example found in the following Wikipedia article:
// http://en.wikipedia.org/w/index.php?title=Finite-state_machine&oldid=635948248&section=1#Example:_a_turnstile
static class AsyncStateMachineLab
{
enum TurnstileTransition
{
DepositCoin,
PushArm
}
enum TurnstileOutput
{
AcceptCoin,
RejectCoin,
AcceptPush,
RejectPush
}
static void Main()
{
Console.WriteLine("Press any key to run the next experiment.");
Console.WriteLine();
RunTurnstile("Silence", Enumerable.Empty<TurnstileTransition>());
RunTurnstile("Too Generous", new[] { TurnstileTransition.DepositCoin });
RunTurnstile("Too Cheap", new[] { TurnstileTransition.PushArm });
RunTurnstile("Correct Usage", new[] { TurnstileTransition.DepositCoin, TurnstileTransition.PushArm });
RunTurnstile("Overpaid", new[] { TurnstileTransition.DepositCoin, TurnstileTransition.DepositCoin, TurnstileTransition.PushArm });
RunTurnstile("Never seen a turnstile before", new[] { TurnstileTransition.PushArm, TurnstileTransition.DepositCoin, TurnstileTransition.DepositCoin });
RunTurnstile("Desperate", new[] { TurnstileTransition.PushArm, TurnstileTransition.PushArm, TurnstileTransition.DepositCoin });
}
static void RunTurnstile(string name, IEnumerable<TurnstileTransition> transitions)
{
Console.WriteLine("Running \"{0}\" experiment...", name);
// Each observable test must never complete to avoid an exception thrown by Rx when awaiting an empty observable.
RunTurnstile(transitions.ToObservable().Repeat(10).Concat(Observable.Never<TurnstileTransition>()));
}
static void RunTurnstile(IObservable<TurnstileTransition> transitions)
{
var ys = transitions.Publish(pxs => Observable.Create<TurnstileOutput>((observer, cancel) => Turnstile(pxs, observer, cancel)));
using (ys.Subscribe(x =>
{
if (x == TurnstileOutput.AcceptCoin || x == TurnstileOutput.AcceptPush)
{
Console.ForegroundColor = ConsoleColor.Green;
}
else
{
Console.ForegroundColor = ConsoleColor.Red;
}
Console.WriteLine(x);
Console.ResetColor();
}))
{
Console.ReadKey();
Console.WriteLine();
}
}
static async Task Turnstile(IObservable<TurnstileTransition> transitions, IObserver<TurnstileOutput> observer, CancellationToken cancel)
{
var deposits = transitions.Where(t => t == TurnstileTransition.DepositCoin);
var pushes = transitions.Where(t => t == TurnstileTransition.PushArm);
var nextDeposit = deposits.Take(1);
var nextPush = pushes.Take(1);
do
{
using (pushes.Subscribe(_ => observer.OnNext(TurnstileOutput.RejectPush)))
{
await nextDeposit;
}
observer.OnNext(TurnstileOutput.AcceptCoin);
using (deposits.Subscribe(_ => observer.OnNext(TurnstileOutput.RejectCoin)))
{
await nextPush;
}
observer.OnNext(TurnstileOutput.AcceptPush);
}
while (!cancel.IsCancellationRequested);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment