Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Persistence.Sql;
namespace SqlPersistenceSagaConcurrencyTest
{
/// <summary>
/// Console host for the saga concurrency experiment: starts the "ConcurrencyTest"
/// endpoint and, for every id typed on standard input, sends a batch of
/// <see cref="MyMessage"/> instances to the local endpoint.
/// </summary>
class Program
{
    /// <summary>
    /// Synchronous entry point; blocks until <see cref="Start"/> has finished.
    /// </summary>
    /// <param name="args">Command-line arguments (not used).</param>
    static void Main(string[] args)
    {
        // GetAwaiter().GetResult() rethrows the original exception instead of
        // wrapping it in an AggregateException.
        Start().GetAwaiter().GetResult();
    }

    /// <summary>
    /// Configures and starts the endpoint, then runs the interactive send loop
    /// until standard input is exhausted, and finally stops the endpoint.
    /// </summary>
    /// <returns>A task that completes once the endpoint has been stopped.</returns>
    static async Task Start()
    {
        var config = new EndpointConfiguration("ConcurrencyTest");
        config.SendFailedMessagesTo("error");
        config.UsePersistence<SqlPersistence>().ConnectionBuilder(() => new SqlConnection(@"Data Source=.\SQLEXPRESS;Initial Catalog=nservicebus;Integrated Security=True"));
        // Process one message at a time on this endpoint.
        config.LimitMessageProcessingConcurrencyTo(1);
        var endpoint = await Endpoint.Start(config);

        while (true)
        {
            Console.WriteLine("Type id to init saga");
            var id = Console.ReadLine();

            // ReadLine returns null when no more input is available (stdin closed
            // or redirected input exhausted). Without this check the loop would
            // spin forever sending messages whose UniqueId is null.
            if (id == null)
            {
                break;
            }

            // Four messages per id: one more than the three that AggregatingSaga
            // collects before it completes.
            for (var i = 0; i < 4; i++)
            {
                await endpoint.SendLocal(new MyMessage
                {
                    Data = i.ToString(),
                    UniqueId = id
                });
            }
        }

        // Shut the endpoint down once there is nothing left to read.
        await endpoint.Stop();
    }
}
/// <summary>
/// Message sent by <c>Program.Start</c> and handled by <c>AggregatingSaga</c>,
/// which is started by it.
/// </summary>
public class MyMessage : IMessage
{
/// <summary>
/// Id typed by the user; this is the property the saga mapping selects
/// (<c>mapper.MapMessage&lt;MyMessage&gt;(m =&gt; m.UniqueId)</c>).
/// </summary>
public string UniqueId { get; set; }
/// <summary>
/// Payload fragment; the sender fills it with the loop index as text and the
/// saga appends it to its accumulated data.
/// </summary>
public string Data { get; set; }
}
/// <summary>
/// Message sent locally by <c>AggregatingSaga</c> once it has counted at least
/// three <c>MyMessage</c> instances; handled by <c>AggregateMessageHandler</c>.
/// </summary>
public class AggregateMessage : IMessage
{
/// <summary>
/// The saga's accumulated data at the moment of sending (the concatenation of
/// the <c>MyMessage.Data</c> values it handled).
/// </summary>
public string Data { get; set; }
}
/// <summary>
/// Handles <see cref="AggregateMessage"/> by printing its payload to the console.
/// </summary>
class AggregateMessageHandler : IHandleMessages<AggregateMessage>
{
    /// <summary>
    /// Writes the aggregated payload of <paramref name="message"/> to standard output.
    /// </summary>
    /// <param name="message">The aggregate message whose <c>Data</c> is printed.</param>
    /// <param name="context">Handler context (not used here).</param>
    /// <returns>An already-completed task; the handler does no asynchronous work.</returns>
    public Task Handle(AggregateMessage message, IMessageHandlerContext context)
    {
        var aggregated = message.Data;
        Console.WriteLine(aggregated);

        // Nothing to await, so hand back a finished task.
        Task finished = Task.FromResult(0);
        return finished;
    }
}
/// <summary>
/// Saga that collects <see cref="MyMessage"/> instances, concatenates their
/// payloads, and sends an <see cref="AggregateMessage"/> once enough have arrived.
/// </summary>
[SqlSaga("UniqueId")]
public class AggregatingSaga : SqlSaga<AggregatingSagaData>,
    IAmStartedByMessages<MyMessage>
{
    /// <summary>
    /// Registers <c>MyMessage.UniqueId</c> as the message property used for mapping.
    /// </summary>
    /// <param name="mapper">Mapper supplied by the persistence to receive the mapping.</param>
    protected override void ConfigureMapping(MessagePropertyMapper<AggregatingSagaData> mapper)
    {
        mapper.MapMessage<MyMessage>(incoming => incoming.UniqueId);
    }

    /// <summary>
    /// Counts the message and appends its payload to the saga data; when the count
    /// reaches three, sends the accumulated payload locally and completes the saga.
    /// </summary>
    /// <param name="message">The incoming message to aggregate.</param>
    /// <param name="context">Handler context used to send the aggregate message.</param>
    /// <returns>A task that completes when handling (including any send) is done.</returns>
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        // Number of messages to collect before the aggregate is sent.
        const int requiredMessages = 3;

        Data.MessageCount += 1;
        Data.Data = Data.Data + message.Data;

        // Still collecting: nothing more to do for this message.
        if (Data.MessageCount < requiredMessages)
        {
            return;
        }

        var aggregate = new AggregateMessage { Data = Data.Data };
        await context.SendLocal(aggregate);
        MarkAsComplete();
    }
}
/// <summary>
/// State held by <c>AggregatingSaga</c> between messages.
/// </summary>
public class AggregatingSagaData : ContainSagaData
{
/// <summary>
/// Property named in the <c>[SqlSaga("UniqueId")]</c> attribute on the saga.
/// NOTE(review): never assigned in this file; presumably populated by the
/// framework from the mapped <c>MyMessage.UniqueId</c> — confirm.
/// </summary>
public string UniqueId { get; set; }
/// <summary>
/// Concatenation of the <c>MyMessage.Data</c> values handled so far.
/// </summary>
public string Data { get; set; }
/// <summary>
/// Number of <c>MyMessage</c> instances handled so far; the saga sends its
/// aggregate and completes once this reaches three.
/// </summary>
public int MessageCount { get; set; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment