Last active
October 22, 2018 15:56
-
-
Save horsdal/a05f051f20b24b40b0c210fb69120ea6 to your computer and use it in GitHub Desktop.
Marten async event handler prototype
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Extension methods for wiring asynchronous event handlers into <see cref="StoreOptions"/>.
/// </summary>
public static class StoreOptionsExtensions
{
    /// <summary>
    /// Wraps <paramref name="handlerFactory"/> in a new <see cref="EventDispatcher"/> and
    /// appends that dispatcher to the listeners of <paramref name="options"/>.
    /// </summary>
    /// <param name="options">Store options whose <c>Listeners</c> collection receives the dispatcher.</param>
    /// <param name="handlerFactory">
    /// Function passed to the dispatcher; given a handler type it yields the handler instances to use.
    /// </param>
    public static void AddAsyncEventHandlers(
        this StoreOptions options,
        Func<Type, IEnumerable<object>> handlerFactory)
        => options.Listeners.Add(new EventDispatcher(handlerFactory));
}
/// <summary>
/// Session listener that overrides <c>BeforeSaveChangesAsync</c> to pass every event found in
/// the session's pending streams to the <see cref="IAsyncEventHandler{T}"/> instances returned
/// by the handler factory.
/// </summary>
public class EventDispatcher : DocumentSessionListenerBase
{
    // Name of the single method declared on IAsyncEventHandler<T>.
    private const string HandleMethodName = "Handle";

    private readonly Func<Type, IEnumerable<object>> _handlerFactory;

    /// <summary>
    /// Creates a dispatcher that resolves handlers through <paramref name="handlerFactory"/>.
    /// </summary>
    /// <param name="handlerFactory">
    /// Receives a closed <c>IAsyncEventHandler&lt;TEvent&gt;</c> type and returns the handler
    /// instances for it; a <c>null</c> result is treated as "no handlers".
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="handlerFactory"/> is <c>null</c>.</exception>
    public EventDispatcher(Func<Type, IEnumerable<object>> handlerFactory)
    {
        // Fail at registration time instead of with a NullReferenceException on the first save.
        _handlerFactory = handlerFactory ?? throw new ArgumentNullException(nameof(handlerFactory));
    }

    /// <summary>
    /// Invokes <c>Handle</c> on every resolved handler for every pending event and returns a
    /// task that completes when all of the resulting handler tasks have completed.
    /// </summary>
    /// <param name="session">Session whose pending changes are inspected; also passed to each handler.</param>
    /// <param name="token">Cancellation token passed to each handler.</param>
    /// <returns>The <see cref="Task.WhenAll(IEnumerable{Task})"/> of all handler tasks.</returns>
    public override Task BeforeSaveChangesAsync(IDocumentSession session, CancellationToken token)
    {
        var events = GetEvents(session);
        return Task.WhenAll(events.SelectMany(group => DispatchGroup(group, session, token)));
    }

    /// <summary>
    /// Builds the handler tasks for one group of events that share the same runtime type.
    /// </summary>
    private IEnumerable<Task> DispatchGroup(
        IGrouping<Type, object> group,
        IDocumentSession session,
        CancellationToken token)
    {
        var handlerType = typeof(IAsyncEventHandler<>).MakeGenericType(group.Key);

        // The method lookup is identical for every handler and event in this group,
        // so resolve it once rather than once per invocation.
        var handle = handlerType.GetMethod(HandleMethodName);
        var handlers = _handlerFactory(handlerType) ?? Enumerable.Empty<object>();

        return handlers.SelectMany(handler =>
            group.Select(ev =>
                // A handler that returns null would make Task.WhenAll throw; treat it as already done.
                (Task) handle.Invoke(handler, new object[] {ev, session, token}) ?? Task.CompletedTask));
    }

    /// <summary>
    /// Collects the <c>Data</c> of every event in the session's pending streams,
    /// grouped by the runtime type of that data.
    /// </summary>
    private static IEnumerable<IGrouping<Type, object>> GetEvents(IDocumentSession session)
    {
        var streams = session.PendingChanges.Streams();
        return streams.SelectMany(s => s.Events).Select(e => e.Data).GroupBy(e => e.GetType());
    }
}
/// <summary>
/// Contract for a handler of events of type <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">The event type; declared contravariant (<c>in</c>).</typeparam>
public interface IAsyncEventHandler<in T>
{
    /// <summary>
    /// Handles a single event.
    /// </summary>
    /// <param name="event">The event to handle.</param>
    /// <param name="session">The document session supplied by the caller.</param>
    /// <param name="token">Cancellation token, passed by read-only reference.</param>
    /// <returns>A task representing the handling of the event.</returns>
    Task Handle(
        T @event,
        IDocumentSession session,
        in CancellationToken token);
}
/// <summary>
/// Create event handler: sample implementation of <see cref="IAsyncEventHandler{T}"/>
/// for <c>MyEvent</c>.
/// </summary>
public class MyHandler : IAsyncEventHandler<MyEvent>
{
    /// <summary>
    /// Placeholder implementation that returns an already-completed task.
    /// </summary>
    /// <param name="event">The event to handle (unused here).</param>
    /// <param name="session">The document session (unused here).</param>
    /// <param name="token">Cancellation token (unused here).</param>
    /// <returns><see cref="Task.CompletedTask"/>.</returns>
    public Task Handle(MyEvent @event, IDocumentSession session, in CancellationToken token)
        // Do something async to handle the event
        => Task.CompletedTask;
}
/// <summary>
/// Setting up: sample startup class that registers the async event handlers
/// with the document store and the service collection.
/// </summary>
public class Startup
{
    // Assigned at the end of ConfigureServices; read by the handler-factory lambda below.
    private IServiceProvider _provider;

    /// <summary>
    /// Creates the document store with async event handlers enabled, registers
    /// <c>MyHandler</c> and the store in <paramref name="services"/>, then builds the provider.
    /// </summary>
    /// <param name="services">The service collection to register into.</param>
    public void ConfigureServices(IServiceCollection services)
    {
        var docStore = DocumentStore.For(x =>
        {
            // The lambda reads _provider when it is called, not when it is declared,
            // so it sees the provider assigned at the end of this method.
            x.AddAsyncEventHandlers(t => _provider.GetServices(t));
            // other config
        });

        services.AddTransient<IAsyncEventHandler<MyEvent>, MyHandler>();
        services.AddSingleton(docStore);
        _provider = services.BuildServiceProvider();
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment