Skip to content

Instantly share code, notes, and snippets.

@flew2bits
Created February 23, 2024 13:27
Show Gist options
  • Save flew2bits/128d637f822de1835b58d0bbe4abc15e to your computer and use it in GitHub Desktop.
Save flew2bits/128d637f822de1835b58d0bbe4abc15e to your computer and use it in GitHub Desktop.
Repro demonstrating possible inline fanout bug
using Marten;
using Marten.Events.Daemon.Resiliency;
using Marten.Events.Projections;
using Marten.Schema;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
{
opts.Connection("Host=localhost;Port=5432;User Id=postgres;Password=pgsql;Persist Security Info=true");
opts.Projections.Add<FanoutProjection>(ProjectionLifecycle.Async);
})
.UseLightweightSessions()
.AddAsyncDaemon(DaemonMode.Solo);
builder.Services.AddHostedService<App>();
var app = builder.Build();
app.Run();
public record A(Guid Id, string Whatever, B[] Children);
public record B(int ChildId, string Something);
public record C([property: Identity] int ChildId, string Whatever, string Something);
public record SplitEvent(A Parent, B Child);
public class FanoutProjection: MultiStreamProjection<C, int>
{
public FanoutProjection()
{
FanOut<A, SplitEvent>(evt => evt.Children.Select(child => new SplitEvent(evt, child)), FanoutMode.BeforeGrouping);
Identity<SplitEvent>(evt => evt.Child.ChildId);
}
public static C Create(SplitEvent evt) => new(evt.Child.ChildId, evt.Parent.Whatever, evt.Child.Something);
}
public class App(IHostApplicationLifetime hostApplicationLifetime, IDocumentStore store)
: BackgroundService
{
protected override Task ExecuteAsync(CancellationToken stoppingToken)
=> Task.Run(async () =>
{
await using var session = store.LightweightSession();
session.Events.StartStream(new A(Guid.NewGuid(), "Whatever",
[new B(1, "Something 1"), new B(2, "Something 2"), new B(3, "Something 3")]));
await session.SaveChangesAsync(stoppingToken);
// give the async daemon time to finish
await Task.Delay(2000, stoppingToken);
hostApplicationLifetime.StopApplication();
}, stoppingToken);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment