Skip to content

Instantly share code, notes, and snippets.

@horsdal
Last active October 22, 2018 15:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save horsdal/a05f051f20b24b40b0c210fb69120ea6 to your computer and use it in GitHub Desktop.
Save horsdal/a05f051f20b24b40b0c210fb69120ea6 to your computer and use it in GitHub Desktop.
Marten async event handler prototype
public static class StoreOptionsExtensions
{
public static void AddAsyncEventHandlers(this StoreOptions options, Func<Type, IEnumerable<object>> handlerFactory)
{
options.Listeners.Add(new EventDispatcher(handlerFactory));
}
}
public class EventDispatcher : DocumentSessionListenerBase
{
private readonly Func<Type, IEnumerable<object>> _handlerFactory;
public EventDispatcher(Func<Type, IEnumerable<object>> handlerFactory)
{
_handlerFactory = handlerFactory;
}
public override Task BeforeSaveChangesAsync(IDocumentSession session, CancellationToken token)
{
var events = GetEvents(session);
return Task.WhenAll(events.SelectMany(group =>
{
var handlerType = typeof(IAsyncEventHandler<>).MakeGenericType(group.Key);
var handlers = _handlerFactory(handlerType) ?? Enumerable.Empty<object>();
return handlers.SelectMany(h =>
group.Select(ev =>
(Task) handlerType.InvokeMember("Handle", BindingFlags.InvokeMethod, null, h, new[] {ev, session, token})));
}));
}
private static IEnumerable<IGrouping<Type, object>> GetEvents(IDocumentSession session)
{
var streams = session.PendingChanges.Streams();
return streams.SelectMany(s => s.Events).Select(e => e.Data).GroupBy(e => e.GetType());
}
}
public interface IAsyncEventHandler<in T>
{
Task Handle(T @event, IDocumentSession session, in CancellationToken token);
}
/// Create event handler
public class MyHandler : IAsyncEventHandler<MyEvent>
{
public Task Handle(MyEvent @event, IDocumentSession session, in CancellationToken token)
{
// Do something async to handle the event
return Task.CompletedTask;
}
}
/// Setting up
public class Startup
{
private IServiceProvider _provider;
public void ConfigureServices(IServiceCollection services)
{
var docStore = DocumentStore.For(x =>
{
x.AddAsyncEventHandlers(t => _provider.GetServices(t));
// other config
}
services.AddTransient<IAsyncEventHandler<MyEvent, MyHandler>();
services.AddSingleton(docStore);
_provider = services..BuildServiceProvider();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment