Skip to content

Instantly share code, notes, and snippets.

Forked from clemensv/gist:3562597
Created November 19, 2017 21:49
Show Gist options
  • Save DobrinGanev/a4bca56e50cdf1002f250af4dfe670b5 to your computer and use it in GitHub Desktop.
Save DobrinGanev/a4bca56e50cdf1002f250af4dfe670b5 to your computer and use it in GitHub Desktop.
namespace Sagas
using System;
using System.Collections.Generic;
class Program
static ActivityHost[] processes;
static void Main(string[] args)
var routingSlip = new RoutingSlip(new WorkItem[]
new WorkItem<ReserveCarActivity>(new WorkItemArguments{{"vehicleType", "Compact"}}),
new WorkItem<ReserveHotelActivity>(new WorkItemArguments{{"roomType", "Suite"}}),
new WorkItem<ReserveFlightActivity>(new WorkItemArguments{{"destination", "DUS"}})
// imagine these being completely separate processes with queues between them
processes = new ActivityHost[]
new ActivityHost<ReserveCarActivity>(Send),
new ActivityHost<ReserveHotelActivity>(Send),
new ActivityHost<ReserveFlightActivity>(Send)
// hand off to the first address
Send(routingSlip.ProgressUri, routingSlip);
static void Send(Uri uri, RoutingSlip routingSlip)
// this is effectively the network dispatch
foreach (var process in processes)
if (process.AcceptMessage(uri, routingSlip))
class ReserveCarActivity : Activity
static Random rnd = new Random(2);
public override WorkLog DoWork(WorkItem workItem)
Console.WriteLine("Reserving car");
var car = workItem.Arguments["vehicleType"];
var reservationId = rnd.Next(100000);
Console.WriteLine("Reserved car {0}", reservationId);
return new WorkLog(this, new WorkResult { { "reservationId", reservationId } });
public override bool Compensate(WorkLog item, RoutingSlip routingSlip)
var reservationId = item.Result["reservationId"];
Console.WriteLine("Cancelled car {0}", reservationId);
return true;
public override Uri WorkItemQueueAddress
get { return new Uri("sb://./carReservations"); }
public override Uri CompensationQueueAddress
get { return new Uri("sb://./carCancellactions"); }
class ReserveHotelActivity : Activity
static Random rnd = new Random(1);
public override WorkLog DoWork(WorkItem workItem)
Console.WriteLine("Reserving hotel");
var car = workItem.Arguments["roomType"];
var reservationId = rnd.Next(100000);
Console.WriteLine("Reserved hotel {0}", reservationId);
return new WorkLog(this, new WorkResult { { "reservationId", reservationId } });
public override bool Compensate(WorkLog item, RoutingSlip routingSlip)
var reservationId = item.Result["reservationId"];
Console.WriteLine("Cancelled hotel {0}", reservationId);
return true;
public override Uri WorkItemQueueAddress
get { return new Uri("sb://./hotelReservations"); }
public override Uri CompensationQueueAddress
get { return new Uri("sb://./hotelCancellations"); }
class ReserveFlightActivity : Activity
static Random rnd = new Random(3);
public override WorkLog DoWork(WorkItem workItem)
Console.WriteLine("Reserving flight");
var car = workItem.Arguments["fatzbatz"]; // this throws
var reservationId = rnd.Next(100000);
Console.WriteLine("Reserved flight {0}", reservationId);
return new WorkLog(this, new WorkResult { { "reservationId", reservationId } });
public override bool Compensate(WorkLog item, RoutingSlip routingSlip)
var reservationId = item.Result["reservationId"];
Console.WriteLine("Cancelled flight {0}", reservationId);
return true;
public override Uri WorkItemQueueAddress
get { return new Uri("sb://./flightReservations"); }
public override Uri CompensationQueueAddress
get { return new Uri("sb://./flightCancellations"); }
abstract class Activity
public abstract WorkLog DoWork(WorkItem item);
public abstract bool Compensate(WorkLog item, RoutingSlip routingSlip);
public abstract Uri WorkItemQueueAddress { get; }
public abstract Uri CompensationQueueAddress { get; }
class WorkLog
readonly Type activityType;
readonly WorkResult result;
public WorkLog(Activity activity, WorkResult result)
this.result = result;
this.activityType = activity.GetType();
public WorkResult Result
get { return this.result; }
public Type ActivityType
get { return this.activityType; }
class WorkItemArguments : Dictionary<string, object>
class WorkResult : Dictionary<string, object>
abstract class WorkItem
protected WorkItem(WorkItemArguments arguments)
this.Arguments = arguments;
public RoutingSlip RoutingSlip { get; set; }
public WorkItemArguments Arguments { get; set; }
public abstract Type ActivityType { get; }
class WorkItem<T> : WorkItem where T : Activity
public WorkItem(WorkItemArguments args) : base(args)
public override Type ActivityType
get { return typeof (T); }
class RoutingSlip
readonly Stack<WorkLog> completedWorkLogs = new Stack<WorkLog>();
readonly Queue<WorkItem> nextWorkItem = new Queue<WorkItem>();
public RoutingSlip()
public RoutingSlip(IEnumerable<WorkItem> workItems)
foreach (var workItem in workItems)
public bool IsCompleted
get { return this.nextWorkItem.Count == 0; }
public bool IsInProgress
get { return this.completedWorkLogs.Count > 0; }
public bool ProcessNext()
if (this.IsCompleted)
throw new InvalidOperationException();
var currentItem = this.nextWorkItem.Dequeue();
var activity = (Activity) Activator.CreateInstance(currentItem.ActivityType);
var result = activity.DoWork(currentItem);
if (result != null)
return true;
catch (Exception e)
Console.WriteLine("Exception {0}", e.Message);
return false;
public Uri ProgressUri
if (IsCompleted)
return null;
((Activity) Activator.CreateInstance(this.nextWorkItem.Peek().ActivityType)).
public Uri CompensationUri
if (!IsInProgress)
return null;
((Activity) Activator.CreateInstance(this.completedWorkLogs.Peek().ActivityType)).
public bool UndoLast()
if (!this.IsInProgress)
throw new InvalidOperationException();
var currentItem = this.completedWorkLogs.Pop();
var activity = (Activity) Activator.CreateInstance(currentItem.ActivityType);
return activity.Compensate(currentItem, this);
catch (Exception e)
Console.WriteLine("Exception {0}", e.Message);
abstract class ActivityHost
Action<Uri, RoutingSlip> send;
public ActivityHost(Action<Uri, RoutingSlip> send)
this.send = send;
public void ProcessForwardMessage(RoutingSlip routingSlip)
if (!routingSlip.IsCompleted)
// if the current step is successful, proceed
// otherwise go to the Unwind path
if (routingSlip.ProcessNext())
// recursion stands for passing context via message
// the routing slip can be fully serialized and passed
// between systems.
this.send(routingSlip.ProgressUri, routingSlip);
// pass message to unwind message route
this.send(routingSlip.CompensationUri, routingSlip);
public void ProcessBackwardMessage(RoutingSlip routingSlip)
if (routingSlip.IsInProgress)
// UndoLast can put new work on the routing slip
// and return false to go back on the forward
// path
if (routingSlip.UndoLast())
// recursion stands for passing context via message
// the routing slip can be fully serialized and passed
// between systems
this.send(routingSlip.CompensationUri, routingSlip);
this.send(routingSlip.ProgressUri, routingSlip);
public abstract bool AcceptMessage(Uri uri, RoutingSlip routingSlip);
class ActivityHost<T> : ActivityHost where T : Activity, new()
public ActivityHost(Action<Uri, RoutingSlip> send)
: base(send)
public override bool AcceptMessage(Uri uri, RoutingSlip routingSlip)
var activity = new T();
if (activity.CompensationQueueAddress.Equals(uri))
return true;
if (activity.WorkItemQueueAddress.Equals(uri))
return true;
return false;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment