Skip to content

Instantly share code, notes, and snippets.

@jrutley
Last active March 24, 2021 18:28
Show Gist options
  • Save jrutley/8d704eadf30574f9a3828bc8aec8452f to your computer and use it in GitHub Desktop.
Save jrutley/8d704eadf30574f9a3828bc8aec8452f to your computer and use it in GitHub Desktop.
Repro steps for outbox issue
<Project Sdk="Microsoft.NET.Sdk">
<!-- Console executable targeting net5.0; MassTransit 7.1.7 is the only package referenced here. -->
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="MassTransit" Version="7.1.7" />
</ItemGroup>
</Project>
using MassTransit;
using MassTransit.Saga;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace MassTransitReplyExceptionTest
{
/// <summary>
/// Interactive console harness: hosts the <see cref="SubmitOrderSaga"/> on two in-memory
/// receive endpoints ("saga" and "saga-outbox", the latter with the in-memory outbox enabled)
/// and sends <see cref="SubmitOrder"/> requests to whichever one the user picks.
/// </summary>
class Program
{
    /// <summary>
    /// Configures and starts the in-memory bus, then loops reading a menu choice from the
    /// console and issuing one request per choice. The loop ends on empty input, end of
    /// input, or any choice other than 1-4; the bus is stopped on the way out.
    /// </summary>
    public static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingInMemory(conf =>
        {
            conf.AutoStart = true;

            // The same state machine and repository instances are registered on both endpoints,
            // so the only configured difference between them is UseInMemoryOutbox().
            var machine = new SubmitOrderSaga();
            var repository = new InMemorySagaRepository<MyState>();
            conf.ReceiveEndpoint("saga", e =>
            {
                e.StateMachineSaga(machine, repository);
            });
            conf.ReceiveEndpoint("saga-outbox", e =>
            {
                e.UseInMemoryOutbox();
                e.StateMachineSaga(machine, repository);
            });
        });

        // A CancellationTokenSource created with a timeout owns a timer; dispose it when Main exits.
        using var source = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await busControl.StartAsync(source.Token);
        try
        {
            while (true)
            {
                string value = await Task.Run(() =>
                {
                    Console.WriteLine("-------------------------------------------------\n\n\nPress 1 | 2 for Saga success/fail. Press 3 | 4 for Saga+outbox success/fail. Anything else to exit");
                    Console.Write("> ");
                    return Console.ReadLine();
                });

                // Console.ReadLine() returns null once the input stream is exhausted
                // (closed or redirected stdin); treat that the same as an empty line.
                if (string.IsNullOrEmpty(value)) return;

                (string serviceAddress, bool? returnSuccessPlease) = value[0] switch
                {
                    '1' => ("saga", true),
                    '2' => ("saga", false),
                    '3' => ("saga-outbox", true),
                    '4' => ("saga-outbox", false),
                    _ => ((string)null, (bool?)null)
                };
                if (serviceAddress == null) return;

                var requestClient = busControl.CreateRequestClient<SubmitOrder>(new Uri($"queue:{serviceAddress}"), RequestTimeout.After(s:5));
                try
                {
                    var orderId = NewId.NextGuid();
                    Response response = await requestClient.GetResponse<OrderSubmitted>(new
                    {
                        OrderId = orderId,
                        ReturnSuccess = returnSuccessPlease
                    });
                    var resultId = response switch
                    {
                        (_, OrderSubmitted s) => s.OrderId,
                        _ => throw new InvalidOperationException("Unexpected result")
                    };
                    Console.WriteLine($"Received a response of {resultId}");
                }
                catch (RequestFaultException e)
                {
                    // Only faults are handled here; any other exception (for example a request
                    // timeout) propagates out of Main after the bus is stopped below.
                    Console.WriteLine("FAULT EXCEPTION CAUGHT");
                    Console.WriteLine(e);
                    Console.WriteLine(e.InnerException);
                }
            }
        }
        finally
        {
            await busControl.StopAsync();
        }
    }
}
/// <summary>
/// Request message asking for an order to be submitted.
/// </summary>
public interface SubmitOrder
{
    /// <summary>Identifier of the order; the saga correlates on this value.</summary>
    Guid OrderId { get; set; }

    /// <summary>
    /// When <c>false</c>, the saga throws instead of responding, so the caller can
    /// exercise the failure path.
    /// </summary>
    bool ReturnSuccess { get; set; }
}
/// <summary>
/// Response message sent back once a <c>SubmitOrder</c> request has been handled successfully.
/// </summary>
public interface OrderSubmitted
{
    /// <summary>Identifier of the order that was submitted.</summary>
    Guid OrderId { get; set; }
}
}
using Automatonymous;
using System;
namespace MassTransitReplyExceptionTest
{
/// <summary>
/// Saga instance data for <see cref="SubmitOrderSaga"/>.
/// </summary>
public class MyState : SagaStateMachineInstance
{
/// <summary>
/// Identifier of this saga instance. SubmitOrderSaga correlates incoming
/// SubmitOrder messages by their OrderId.
/// </summary>
public Guid CorrelationId { get; set; }
/// <summary>
/// Name of the current state; SubmitOrderSaga registers this property via InstanceState.
/// </summary>
public string CurrentState { get; internal set; }
}
/// <summary>
/// State machine with a single step: on the initial <c>SubmitOrder</c> message it either
/// throws (when the message asks for failure) or responds with <c>OrderSubmitted</c>,
/// then transitions to <c>Final</c>.
/// </summary>
public class SubmitOrderSaga : MassTransitStateMachine<MyState>
{
    /// <summary>
    /// Declares the event correlation, the state property and the initial behavior.
    /// </summary>
    public SubmitOrderSaga()
    {
        // Match incoming SubmitOrder messages to saga instances by the message's OrderId.
        Event(() => SubmitOrder, correlation => correlation.CorrelateById(context => context.Message.OrderId));

        // The current state is kept in MyState.CurrentState.
        InstanceState(instance => instance.CurrentState);

        Initially(
            When(SubmitOrder)
                .ThenAsync(async context =>
                {
                    // The request carries a flag that asks this handler to fail on purpose.
                    if (!context.Data.ReturnSuccess)
                    {
                        throw new Exception("Saga fail");
                    }

                    // Respond through the consume context obtained from the behavior context.
                    var consumeContext = context.CreateConsumeContext();
                    await consumeContext.RespondAsync<OrderSubmitted>(new
                    {
                        OrderId = context.Data.OrderId
                    });
                })
                .TransitionTo(Final));

        SetCompletedWhenFinalized();
    }

    /// <summary>Event for the incoming <c>SubmitOrder</c> request message.</summary>
    Event<SubmitOrder> SubmitOrder { get; set; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment