Created
September 1, 2012 01:52
-
-
Save clemensv/3562597 to your computer and use it in GitHub Desktop.
Saga
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace Sagas | |
{ | |
using System; | |
using System.Collections.Generic; | |
/// <summary>
/// Demo entry point. Builds a routing slip with three reservation steps and pushes it
/// through a set of in-process activity hosts that stand in for separate processes.
/// </summary>
class Program
{
    // The simulated "network": Send offers every message to these hosts in order.
    static ActivityHost[] processes;

    /// <summary>
    /// Creates the routing slip and the hosts, then sends the slip to the address of its first step.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        var routingSlip = new RoutingSlip(new WorkItem[]
            {
                new WorkItem<ReserveCarActivity>(new WorkItemArguments { { "vehicleType", "Compact" } }),
                new WorkItem<ReserveHotelActivity>(new WorkItemArguments { { "roomType", "Suite" } }),
                new WorkItem<ReserveFlightActivity>(new WorkItemArguments { { "destination", "DUS" } })
            });

        // imagine these being completely separate processes with queues between them
        processes = new ActivityHost[]
            {
                new ActivityHost<ReserveCarActivity>(Send),
                new ActivityHost<ReserveHotelActivity>(Send),
                new ActivityHost<ReserveFlightActivity>(Send)
            };

        // hand off to the first address
        Send(routingSlip.ProgressUri, routingSlip);
    }

    /// <summary>
    /// Stand-in for the network dispatch: offers the message to each host until one accepts it.
    /// </summary>
    /// <param name="uri">Destination address; null when the routing slip has no further address.</param>
    /// <param name="routingSlip">The routing slip being passed along.</param>
    static void Send(Uri uri, RoutingSlip routingSlip)
    {
        // The routing slip reports a null address once there is no next step (or nothing
        // left to compensate). No host can match that, so stop here explicitly instead of
        // instantiating an activity per host just to compare against null.
        if (uri == null)
        {
            return;
        }

        foreach (var process in processes)
        {
            if (process.AcceptMessage(uri, routingSlip))
            {
                // first host that recognises the address wins
                return;
            }
        }
    }
}
/// <summary>
/// Routing-slip step that "reserves" a car by printing to the console and handing back
/// a generated reservation id; compensation prints the matching cancellation.
/// </summary>
class ReserveCarActivity : Activity
{
    // Seeded with a constant.
    static Random idSource = new Random(2);

    /// <summary>Address on which this activity accepts forward work.</summary>
    public override Uri WorkItemQueueAddress
    {
        get
        {
            return new Uri("sb://./carReservations");
        }
    }

    /// <summary>Address on which this activity accepts compensation requests.</summary>
    public override Uri CompensationQueueAddress
    {
        get
        {
            // NOTE(review): "Cancellactions" looks like a typo, but the literal is an address
            // and is left untouched - confirm before renaming.
            return new Uri("sb://./carCancellactions");
        }
    }

    /// <summary>
    /// Performs the reservation and records the generated id under "reservationId".
    /// </summary>
    /// <param name="workItem">Work item whose arguments must contain "vehicleType".</param>
    /// <returns>A work log carrying the reservation id.</returns>
    public override WorkLog DoWork(WorkItem workItem)
    {
        Console.WriteLine("Reserving car");

        // The value itself is not used further; the indexer lookup fails if the key is absent.
        var vehicleType = workItem.Arguments["vehicleType"];

        var id = idSource.Next(100000);
        Console.WriteLine("Reserved car {0}", id);

        var outcome = new WorkResult();
        outcome.Add("reservationId", id);
        return new WorkLog(this, outcome);
    }

    /// <summary>
    /// Prints the cancellation for the reservation id stored in the work log.
    /// </summary>
    /// <param name="item">Work log produced by <see cref="DoWork"/>.</param>
    /// <param name="routingSlip">The routing slip being unwound; not used here.</param>
    /// <returns>Always true.</returns>
    public override bool Compensate(WorkLog item, RoutingSlip routingSlip)
    {
        Console.WriteLine("Cancelled car {0}", item.Result["reservationId"]);
        return true;
    }
}
/// <summary>
/// Routing-slip step that "reserves" a hotel room by printing to the console and handing
/// back a generated reservation id; compensation prints the matching cancellation.
/// </summary>
class ReserveHotelActivity : Activity
{
    // Seeded with a constant.
    static Random rnd = new Random(1);

    /// <summary>
    /// Performs the reservation and records the generated id under "reservationId".
    /// </summary>
    /// <param name="workItem">Work item whose arguments must contain "roomType".</param>
    /// <returns>A work log carrying the reservation id.</returns>
    public override WorkLog DoWork(WorkItem workItem)
    {
        Console.WriteLine("Reserving hotel");

        // Was named "car" (copy-paste from the car activity). The value is not used further;
        // the indexer lookup fails if the "roomType" argument is absent.
        var roomType = workItem.Arguments["roomType"];

        var reservationId = rnd.Next(100000);
        Console.WriteLine("Reserved hotel {0}", reservationId);
        return new WorkLog(this, new WorkResult { { "reservationId", reservationId } });
    }

    /// <summary>
    /// Prints the cancellation for the reservation id stored in the work log.
    /// </summary>
    /// <param name="item">Work log produced by <see cref="DoWork"/>.</param>
    /// <param name="routingSlip">The routing slip being unwound; not used here.</param>
    /// <returns>Always true.</returns>
    public override bool Compensate(WorkLog item, RoutingSlip routingSlip)
    {
        var reservationId = item.Result["reservationId"];
        Console.WriteLine("Cancelled hotel {0}", reservationId);
        return true;
    }

    /// <summary>Address on which this activity accepts forward work.</summary>
    public override Uri WorkItemQueueAddress
    {
        get { return new Uri("sb://./hotelReservations"); }
    }

    /// <summary>Address on which this activity accepts compensation requests.</summary>
    public override Uri CompensationQueueAddress
    {
        get { return new Uri("sb://./hotelCancellations"); }
    }
}
/// <summary>
/// Routing-slip step for the flight reservation. Its <see cref="DoWork"/> looks up an
/// argument key ("fatzbatz") that the demo's work item does not supply, so the step
/// fails and the routing slip is sent down the compensation path.
/// </summary>
class ReserveFlightActivity : Activity
{
    // Seeded with a constant.
    static Random rnd = new Random(3);

    /// <summary>
    /// Attempts the reservation and records the generated id under "reservationId".
    /// </summary>
    /// <param name="workItem">Work item whose arguments are consulted.</param>
    /// <returns>A work log carrying the reservation id, if the argument lookup succeeds.</returns>
    public override WorkLog DoWork(WorkItem workItem)
    {
        Console.WriteLine("Reserving flight");

        // Deliberate failure: the demo passes "destination", not "fatzbatz", and the
        // dictionary indexer throws for a missing key. (Local was previously named "car".)
        var missingArgument = workItem.Arguments["fatzbatz"]; // this throws

        var reservationId = rnd.Next(100000);
        Console.WriteLine("Reserved flight {0}", reservationId);
        return new WorkLog(this, new WorkResult { { "reservationId", reservationId } });
    }

    /// <summary>
    /// Prints the cancellation for the reservation id stored in the work log.
    /// </summary>
    /// <param name="item">Work log produced by <see cref="DoWork"/>.</param>
    /// <param name="routingSlip">The routing slip being unwound; not used here.</param>
    /// <returns>Always true.</returns>
    public override bool Compensate(WorkLog item, RoutingSlip routingSlip)
    {
        var reservationId = item.Result["reservationId"];
        Console.WriteLine("Cancelled flight {0}", reservationId);
        return true;
    }

    /// <summary>Address on which this activity accepts forward work.</summary>
    public override Uri WorkItemQueueAddress
    {
        get { return new Uri("sb://./flightReservations"); }
    }

    /// <summary>Address on which this activity accepts compensation requests.</summary>
    public override Uri CompensationQueueAddress
    {
        get { return new Uri("sb://./flightCancellations"); }
    }
}
/// <summary>
/// Contract for one step on a routing slip: a forward operation, its compensation,
/// and the two addresses on which the step receives messages.
/// </summary>
abstract class Activity
{
    /// <summary>Address for forward (do-work) messages.</summary>
    public abstract Uri WorkItemQueueAddress { get; }

    /// <summary>Address for backward (compensation) messages.</summary>
    public abstract Uri CompensationQueueAddress { get; }

    /// <summary>Executes the step for the given work item and returns its work log.</summary>
    public abstract WorkLog DoWork(WorkItem item);

    /// <summary>
    /// Undoes the step described by <paramref name="item"/>; the boolean result is
    /// handed back to whoever drives the routing slip.
    /// </summary>
    public abstract bool Compensate(WorkLog item, RoutingSlip routingSlip);
}
/// <summary>
/// Record of a completed step: the type of the activity that ran and the result it produced.
/// </summary>
class WorkLog
{
    readonly Type activityType;
    readonly WorkResult result;

    /// <summary>
    /// Creates a log entry for <paramref name="activity"/>.
    /// </summary>
    /// <param name="activity">The activity that did the work; only its runtime type is kept.</param>
    /// <param name="result">The values the activity produced.</param>
    /// <exception cref="ArgumentNullException"><paramref name="activity"/> is null.</exception>
    public WorkLog(Activity activity, WorkResult result)
    {
        // Fail with a named argument instead of a bare NullReferenceException on GetType().
        if (activity == null)
        {
            throw new ArgumentNullException("activity");
        }

        this.result = result;
        this.activityType = activity.GetType();
    }

    /// <summary>The values produced by the activity.</summary>
    public WorkResult Result
    {
        get { return this.result; }
    }

    /// <summary>Runtime type of the activity that produced this log.</summary>
    public Type ActivityType
    {
        get { return this.activityType; }
    }
}
/// <summary>
/// Named input values for a work item; a plain string-to-object dictionary.
/// </summary>
class WorkItemArguments : Dictionary<string, object> { }
/// <summary>
/// Named output values of a completed step; a plain string-to-object dictionary.
/// </summary>
class WorkResult : Dictionary<string, object> { }
/// <summary>
/// One pending step on a routing slip: which activity type to run and with which arguments.
/// </summary>
abstract class WorkItem
{
    /// <summary>Stores the arguments for the step.</summary>
    /// <param name="arguments">Named input values handed to the activity.</param>
    protected WorkItem(WorkItemArguments arguments)
    {
        Arguments = arguments;
    }

    /// <summary>Type of the activity that executes this item.</summary>
    public abstract Type ActivityType { get; }

    /// <summary>Named input values handed to the activity.</summary>
    public WorkItemArguments Arguments { get; set; }

    /// <summary>Routing slip associated with this item; settable by callers.</summary>
    public RoutingSlip RoutingSlip { get; set; }
}
/// <summary>
/// Work item bound at compile time to the activity type <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">The activity that executes this item.</typeparam>
class WorkItem<T> : WorkItem
    where T : Activity
{
    /// <summary>Creates the item with the given arguments.</summary>
    /// <param name="args">Named input values handed to the activity.</param>
    public WorkItem(WorkItemArguments args)
        : base(args)
    {
    }

    /// <summary>Always <c>typeof(T)</c>.</summary>
    public override Type ActivityType
    {
        get
        {
            return typeof(T);
        }
    }
}
/// <summary>
/// The state that travels between activity hosts: a queue of steps still to run and a
/// stack of logs for the steps already completed (most recent on top).
/// </summary>
class RoutingSlip
{
    readonly Stack<WorkLog> completedWorkLogs = new Stack<WorkLog>();
    readonly Queue<WorkItem> nextWorkItem = new Queue<WorkItem>();

    /// <summary>Creates an empty routing slip.</summary>
    public RoutingSlip()
    {
    }

    /// <summary>Creates a routing slip whose steps run in the order given.</summary>
    /// <param name="workItems">The steps to enqueue.</param>
    /// <exception cref="ArgumentNullException"><paramref name="workItems"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="workItems"/> contains a null entry.</exception>
    public RoutingSlip(IEnumerable<WorkItem> workItems)
    {
        if (workItems == null)
        {
            throw new ArgumentNullException("workItems");
        }

        foreach (var workItem in workItems)
        {
            // Reject null entries up front; otherwise the failure only shows up later,
            // when the entry is peeked or dequeued.
            if (workItem == null)
            {
                throw new ArgumentException("Work items must not be null.", "workItems");
            }

            this.nextWorkItem.Enqueue(workItem);
        }
    }

    /// <summary>True when no steps remain to be run.</summary>
    public bool IsCompleted
    {
        get { return this.nextWorkItem.Count == 0; }
    }

    /// <summary>True when at least one completed step is recorded.</summary>
    public bool IsInProgress
    {
        get { return this.completedWorkLogs.Count > 0; }
    }

    /// <summary>
    /// Dequeues the next step and runs it.
    /// </summary>
    /// <returns>
    /// True when the activity returned a work log (which is then pushed onto the completed
    /// stack); false when it returned null or threw.
    /// </returns>
    /// <exception cref="InvalidOperationException">No steps remain.</exception>
    public bool ProcessNext()
    {
        if (this.IsCompleted)
        {
            throw new InvalidOperationException("The routing slip has no remaining work items.");
        }

        // The item leaves the queue before it is attempted and is not put back on failure.
        var currentItem = this.nextWorkItem.Dequeue();
        var activity = CreateActivity(currentItem.ActivityType);
        try
        {
            var result = activity.DoWork(currentItem);
            if (result != null)
            {
                this.completedWorkLogs.Push(result);
                return true;
            }
        }
        catch (Exception e)
        {
            // A failing step is reported as "false" so the caller can start unwinding.
            Console.WriteLine("Exception {0}", e.Message);
        }

        return false;
    }

    /// <summary>
    /// Work-item address of the next step's activity, or null when no steps remain.
    /// </summary>
    public Uri ProgressUri
    {
        get
        {
            if (this.IsCompleted)
            {
                return null;
            }

            return CreateActivity(this.nextWorkItem.Peek().ActivityType).WorkItemQueueAddress;
        }
    }

    /// <summary>
    /// Compensation address of the most recently completed step's activity, or null when
    /// no completed step is recorded.
    /// </summary>
    public Uri CompensationUri
    {
        get
        {
            if (!this.IsInProgress)
            {
                return null;
            }

            return CreateActivity(this.completedWorkLogs.Peek().ActivityType).CompensationQueueAddress;
        }
    }

    /// <summary>
    /// Pops the most recently completed step and asks its activity to compensate it.
    /// </summary>
    /// <returns>Whatever the activity's <c>Compensate</c> returns.</returns>
    /// <exception cref="InvalidOperationException">No completed step is recorded.</exception>
    public bool UndoLast()
    {
        if (!this.IsInProgress)
        {
            throw new InvalidOperationException("The routing slip has no completed work to undo.");
        }

        var currentItem = this.completedWorkLogs.Pop();
        var activity = CreateActivity(currentItem.ActivityType);
        try
        {
            return activity.Compensate(currentItem, this);
        }
        catch (Exception e)
        {
            // Unlike the forward path, a failed compensation is logged and rethrown.
            Console.WriteLine("Exception {0}", e.Message);
            throw;
        }
    }

    // Instantiates the activity for a step through its parameterless constructor.
    static Activity CreateActivity(Type activityType)
    {
        return (Activity) Activator.CreateInstance(activityType);
    }
}
/// <summary>
/// Base class for a host that receives routing slips, advances or unwinds them by one
/// step, and forwards them through the supplied send delegate.
/// </summary>
abstract class ActivityHost
{
    readonly Action<Uri, RoutingSlip> send;

    /// <summary>Creates the host.</summary>
    /// <param name="send">Delegate used to forward the routing slip to an address.</param>
    /// <exception cref="ArgumentNullException"><paramref name="send"/> is null.</exception>
    protected ActivityHost(Action<Uri, RoutingSlip> send)
    {
        // Fail at construction rather than on the first forwarded message.
        if (send == null)
        {
            throw new ArgumentNullException("send");
        }

        this.send = send;
    }

    /// <summary>
    /// Runs the next step of the routing slip and forwards the slip: onward when the step
    /// succeeded, to the compensation address otherwise. Does nothing when no steps remain.
    /// </summary>
    /// <param name="routingSlip">The routing slip to advance.</param>
    /// <exception cref="ArgumentNullException"><paramref name="routingSlip"/> is null.</exception>
    public void ProcessForwardMessage(RoutingSlip routingSlip)
    {
        if (routingSlip == null)
        {
            throw new ArgumentNullException("routingSlip");
        }

        if (routingSlip.IsCompleted)
        {
            return;
        }

        // if the current step is successful, proceed
        // otherwise go to the Unwind path
        if (routingSlip.ProcessNext())
        {
            // recursion stands for passing context via message
            // the routing slip can be fully serialized and passed
            // between systems.
            this.send(routingSlip.ProgressUri, routingSlip);
        }
        else
        {
            // pass message to unwind message route
            this.send(routingSlip.CompensationUri, routingSlip);
        }
    }

    /// <summary>
    /// Undoes the most recently completed step and forwards the slip: further back when
    /// the compensation returned true, onto the forward path otherwise. Does nothing when
    /// no completed step is recorded.
    /// </summary>
    /// <param name="routingSlip">The routing slip to unwind.</param>
    /// <exception cref="ArgumentNullException"><paramref name="routingSlip"/> is null.</exception>
    public void ProcessBackwardMessage(RoutingSlip routingSlip)
    {
        if (routingSlip == null)
        {
            throw new ArgumentNullException("routingSlip");
        }

        if (!routingSlip.IsInProgress)
        {
            return;
        }

        // The compensation receives the routing slip and may return false,
        // in which case the slip goes back onto the forward path.
        if (routingSlip.UndoLast())
        {
            // recursion stands for passing context via message
            // the routing slip can be fully serialized and passed
            // between systems
            this.send(routingSlip.CompensationUri, routingSlip);
        }
        else
        {
            this.send(routingSlip.ProgressUri, routingSlip);
        }
    }

    /// <summary>
    /// Offers a message to this host.
    /// </summary>
    /// <param name="uri">Destination address of the message.</param>
    /// <param name="routingSlip">The routing slip carried by the message.</param>
    /// <returns>True when the host handled the message; false when the address is not its own.</returns>
    public abstract bool AcceptMessage(Uri uri, RoutingSlip routingSlip);
}
/// <summary>
/// Host bound to one activity type; it answers to that activity's work-item and
/// compensation addresses.
/// </summary>
/// <typeparam name="T">The activity whose addresses this host serves.</typeparam>
class ActivityHost<T> : ActivityHost
    where T : Activity, new()
{
    /// <summary>Creates the host.</summary>
    /// <param name="send">Delegate used to forward the routing slip to an address.</param>
    public ActivityHost(Action<Uri, RoutingSlip> send)
        : base(send)
    {
    }

    /// <summary>
    /// Handles the message when <paramref name="uri"/> equals one of the activity's two
    /// addresses; the compensation address is checked first.
    /// </summary>
    /// <param name="uri">Destination address of the message.</param>
    /// <param name="routingSlip">The routing slip carried by the message.</param>
    /// <returns>True when the message was handled; false when neither address matched.</returns>
    public override bool AcceptMessage(Uri uri, RoutingSlip routingSlip)
    {
        // A throwaway instance is created only to read the activity's addresses.
        var probe = new T();

        if (probe.CompensationQueueAddress.Equals(uri))
        {
            ProcessBackwardMessage(routingSlip);
        }
        else if (probe.WorkItemQueueAddress.Equals(uri))
        {
            ProcessForwardMessage(routingSlip);
        }
        else
        {
            return false;
        }

        return true;
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment