Skip to content

Instantly share code, notes, and snippets.

@phatboyg
Created January 1, 2016 22:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save phatboyg/53c4e173eba91f7f7825 to your computer and use it in GitHub Desktop.
Save phatboyg/53c4e173eba91f7f7825 to your computer and use it in GitHub Desktop.
namespace SagaLockupRepro
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Automatonymous;
using MassTransit;
using MassTransit.NLogIntegration;
using MassTransit.RabbitMqTransport;
using MassTransit.Saga;
/// <summary>
/// Console entry point for the saga lock-up reproduction. It starts two RabbitMQ
/// buses: one hosting the <see cref="Saga"/> state machine on "saga_endpoint" and
/// one hosting plain handlers on "server_endpoint" that answer
/// <see cref="SomeMessage2"/> and <see cref="SomeMessage3"/>.
/// </summary>
internal class Program
{
    /// <summary>Number of <see cref="SomeMessage1"/> instances published by <see cref="Main"/>.</summary>
    private const int MessageCount = 1000;

    /// <summary>Exclusive upper bound, in milliseconds, of the simulated handler latency.</summary>
    private const int MaxHandlerDelayMs = 200;

    /// <summary>Pause, in milliseconds, between two consecutive publishes.</summary>
    private const int PublishIntervalMs = 15;

    // System.Random is not thread-safe and the handlers below may run concurrently,
    // so every access to the shared generator goes through RandGate.
    private static readonly Random Rand = new Random();
    private static readonly object RandGate = new object();

    /// <summary>
    /// Configures and starts both buses, publishes <see cref="MessageCount"/> messages,
    /// waits for a line on standard input, then stops the buses in reverse start order.
    /// </summary>
    private static void Main()
    {
        IBusControl sagaBus = Bus.Factory.CreateUsingRabbitMq(c =>
        {
            IRabbitMqHost host = c.Host(new Uri("rabbitmq://localhost"), hc => { });
            c.UseNLog();
            c.UseInMemoryScheduler();
            c.ReceiveEndpoint(host, "saga_endpoint", h =>
            {
                h.PurgeOnStartup = true;
                h.StateMachineSaga(new Saga(), new InMemorySagaRepository<SagaState>());
            });
        });

        IBusControl bus = Bus.Factory.CreateUsingRabbitMq(c =>
        {
            IRabbitMqHost host = c.Host(new Uri("rabbitmq://localhost"), hc => { });
            c.UseNLog();
            c.ReceiveEndpoint(host, "server_endpoint", h =>
            {
                h.PurgeOnStartup = true;

                // The handlers are genuinely asynchronous: the simulated latency no
                // longer blocks a thread, and the response send is awaited so a
                // failure to respond faults the handler instead of being lost.
                h.Handler<SomeMessage2>(async m =>
                {
                    await Task.Delay(NextHandlerDelay());
                    await m.RespondAsync(new SomeResponse2(m.Message.CorrelationId));
                });
                h.Handler<SomeMessage3>(async m =>
                {
                    await Task.Delay(NextHandlerDelay());
                    await m.RespondAsync(new SomeResponse3(m.Message.CorrelationId));
                });
            });
        });

        sagaBus.Start();
        try
        {
            bus.Start();
            try
            {
                var publishTasks = new Task[MessageCount];
                for (var i = 0; i < MessageCount; i++)
                {
                    // Keep the task so that a publish fault is observed below
                    // rather than silently dropped.
                    publishTasks[i] = sagaBus.Publish(new SomeMessage1(Guid.NewGuid(), i));
                    Thread.Sleep(PublishIntervalMs);
                }

                // Main is synchronous (no async Main in this language version), so
                // block here once; any publish failure surfaces as an AggregateException.
                Task.WaitAll(publishTasks);

                Console.WriteLine("Published");
                Console.ReadLine();
            }
            finally
            {
                bus.Stop();
            }
        }
        finally
        {
            sagaBus.Stop();
        }
    }

    /// <summary>
    /// Returns a random delay in the range [0, <see cref="MaxHandlerDelayMs"/>) milliseconds.
    /// Safe to call from concurrently executing handlers.
    /// </summary>
    private static int NextHandlerDelay()
    {
        lock (RandGate)
        {
            return Rand.Next(MaxHandlerDelayMs);
        }
    }
}
/// <summary>
/// Per-saga instance data used by the <see cref="Saga"/> state machine: the
/// correlation id, the current state, the index copied from the first message,
/// and the token slot used by the saga's timeout schedule.
/// </summary>
public class SagaState :
    SagaStateMachineInstance
{
    /// <summary>Creates an instance bound to the given correlation id.</summary>
    /// <param name="correlationId">Identifier stored in <see cref="CorrelationId"/>.</param>
    public SagaState(Guid correlationId)
    {
        CorrelationId = correlationId;
    }

    /// <summary>Token slot referenced by the saga's <c>Schedule</c> declaration.</summary>
    public Guid? TimeoutToken { get; set; }

    /// <summary>State the saga instance is currently in.</summary>
    public State CurrentState { get; set; }

    /// <summary>Index taken from the <see cref="SomeMessage1"/> that started the saga.</summary>
    public int Index { get; set; }

    /// <summary>Identifier correlating messages to this saga instance.</summary>
    public Guid CorrelationId { get; set; }
}
/// <summary>
/// State machine driven by three messages. <see cref="SomeMessage1"/> starts an
/// instance and publishes <see cref="SomeMessage2"/>; <see cref="SomeResponse2"/>
/// schedules a 60-second <see cref="Timeout"/> and publishes
/// <see cref="SomeMessage3"/>; either <see cref="SomeResponse3"/> or the timeout
/// finalizes the instance.
/// </summary>
public sealed class Saga :
    MassTransitStateMachine<SagaState>
{
    /// <summary>Declares the states, events, schedule and transitions of the saga.</summary>
    public Saga()
    {
        State(() => WaitingForSecondMessage);
        State(() => WaitingForThirdMessage);

        Event(() => OnFirstMessage);
        Event(() => OnSecondMessage);
        Event(() => OnThirdMessage);

        Schedule(
            () => ScheduleTimeout,
            instance => instance.TimeoutToken,
            settings => settings.Delay = TimeSpan.FromSeconds(60));

        // First message: remember its index, then ask for the second step.
        Initially(
            When(OnFirstMessage)
                .Then(ctx => { ctx.Instance.Index = ctx.Data.Index; })
                .TransitionTo(WaitingForSecondMessage)
                .Publish(ctx => new SomeMessage2(ctx.Instance.CorrelationId)));

        // Second response: arm the timeout, then ask for the third step.
        During(WaitingForSecondMessage,
            When(OnSecondMessage)
                .TransitionTo(WaitingForThirdMessage)
                .Schedule(ScheduleTimeout, ctx => new Timeout(ctx.Instance.CorrelationId))
                .Publish(ctx => new SomeMessage3(ctx.Instance.CorrelationId)));

        // While waiting for the third response, either the timeout fires or the
        // response arrives; the response cancels the pending timeout.
        During(WaitingForThirdMessage,
            When(ScheduleTimeout.Received)
                .Then(ctx => Console.WriteLine($"Timed out: {ctx.Instance.Index}"))
                .Finalize(),
            When(OnThirdMessage)
                .Unschedule(ScheduleTimeout)
                .Finalize());

        DuringAny(
            When(Final.Enter)
                .Then(ctx => Console.WriteLine($"Done: {ctx.Instance.Index}")));

        SetCompletedWhenFinalized();
    }

    /// <summary>State entered after the first message has been handled.</summary>
    public State WaitingForSecondMessage { get; set; }

    /// <summary>State entered after the second response has been handled.</summary>
    public State WaitingForThirdMessage { get; set; }

    /// <summary>Event raised by <see cref="SomeMessage1"/>.</summary>
    public Event<SomeMessage1> OnFirstMessage { get; set; }

    /// <summary>Event raised by <see cref="SomeResponse2"/>.</summary>
    public Event<SomeResponse2> OnSecondMessage { get; set; }

    /// <summary>Event raised by <see cref="SomeResponse3"/>.</summary>
    public Event<SomeResponse3> OnThirdMessage { get; set; }

    /// <summary>Schedule that delivers a <see cref="Timeout"/> message to the saga.</summary>
    public Schedule<SagaState, Timeout> ScheduleTimeout { get; set; }
}
/// <summary>
/// Message carrying a correlation id and an index. It is bound to
/// <c>Saga.OnFirstMessage</c>.
/// </summary>
public class SomeMessage1 : CorrelatedBy<Guid>
{
    /// <summary>Creates the message.</summary>
    /// <param name="correlationId">Value exposed by <see cref="CorrelationId"/>.</param>
    /// <param name="index">Value exposed by <see cref="Index"/>.</param>
    public SomeMessage1(Guid correlationId, int index)
    {
        Index = index;
        CorrelationId = correlationId;
    }

    /// <summary>Index supplied at construction.</summary>
    public int Index { get; }

    /// <summary>Correlation id supplied at construction.</summary>
    public Guid CorrelationId { get; }
}
/// <summary>
/// Message carrying only a correlation id. The saga publishes it after handling
/// <see cref="SomeMessage1"/>.
/// </summary>
public class SomeMessage2 : CorrelatedBy<Guid>
{
    /// <summary>Creates the message.</summary>
    /// <param name="correlationId">Value exposed by <see cref="CorrelationId"/>.</param>
    public SomeMessage2(Guid correlationId)
    {
        CorrelationId = correlationId;
    }

    /// <summary>Correlation id supplied at construction.</summary>
    public Guid CorrelationId { get; }
}
/// <summary>
/// Response to <see cref="SomeMessage2"/>. It is bound to
/// <c>Saga.OnSecondMessage</c>.
/// </summary>
public class SomeResponse2 : CorrelatedBy<Guid>
{
    /// <summary>Creates the response.</summary>
    /// <param name="correlationId">Value exposed by <see cref="CorrelationId"/>.</param>
    public SomeResponse2(Guid correlationId)
    {
        CorrelationId = correlationId;
    }

    /// <summary>Correlation id supplied at construction.</summary>
    public Guid CorrelationId { get; }
}
/// <summary>
/// Message carrying only a correlation id. The saga publishes it after handling
/// <see cref="SomeResponse2"/>.
/// </summary>
public class SomeMessage3 : CorrelatedBy<Guid>
{
    /// <summary>Creates the message.</summary>
    /// <param name="correlationId">Value exposed by <see cref="CorrelationId"/>.</param>
    public SomeMessage3(Guid correlationId)
    {
        CorrelationId = correlationId;
    }

    /// <summary>Correlation id supplied at construction.</summary>
    public Guid CorrelationId { get; }
}
/// <summary>
/// Response to <see cref="SomeMessage3"/>. It is bound to
/// <c>Saga.OnThirdMessage</c>.
/// </summary>
public class SomeResponse3 : CorrelatedBy<Guid>
{
    /// <summary>Creates the response.</summary>
    /// <param name="correlationId">Value exposed by <see cref="CorrelationId"/>.</param>
    public SomeResponse3(Guid correlationId)
    {
        CorrelationId = correlationId;
    }

    /// <summary>Correlation id supplied at construction.</summary>
    public Guid CorrelationId { get; }
}
/// <summary>
/// Message delivered by the saga's <c>ScheduleTimeout</c> schedule. It carries
/// the correlation id of the saga instance that scheduled it.
/// </summary>
public class Timeout : CorrelatedBy<Guid>
{
    /// <summary>Creates the timeout message.</summary>
    /// <param name="correlationId">Value exposed by <see cref="CorrelationId"/>.</param>
    public Timeout(Guid correlationId)
    {
        CorrelationId = correlationId;
    }

    /// <summary>Correlation id supplied at construction.</summary>
    public Guid CorrelationId { get; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment