Last active
August 5, 2019 20:42
-
-
Save Emzi0767/5ad015165e60e905165ec0869f9bbb93 to your computer and use it in GitHub Desktop.
Asynchronous Event Queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Represents a single pending event invocation that can be executed on demand.
/// </summary>
/// <typeparam name="TSender">Type of the object on whose behalf the event is raised.</typeparam>
/// <typeparam name="TArgs">
/// Type of the event arguments. Declared covariant, so a dispatch carrying more derived
/// arguments can be used where one with less derived arguments is expected.
/// </typeparam>
internal interface IEventDispatchData<TSender, out TArgs>
    where TArgs : AsyncEventArgs
{
    /// <summary>
    /// Executes the invocation this instance represents.
    /// </summary>
    /// <returns>A task that completes when the invocation has finished.</returns>
    Task InvokeAsync();
}
/// <summary>
/// Bundles an asynchronous event together with the sender and arguments it is to be
/// invoked with, so the invocation can be carried out later via <see cref="InvokeAsync"/>.
/// </summary>
/// <typeparam name="TSender">Type of the sender passed to the event.</typeparam>
/// <typeparam name="TArgs">Type of the arguments passed to the event.</typeparam>
internal sealed class EventDispatchData<TSender, TArgs> : IEventDispatchData<TSender, TArgs>
    where TArgs : AsyncEventArgs
{
    /// <summary>Gets the event that will be invoked.</summary>
    public AsyncEvent<TSender, TArgs> Event { get; }

    /// <summary>Gets the sender handed to the event on invocation.</summary>
    public TSender Sender { get; }

    /// <summary>Gets the arguments handed to the event on invocation.</summary>
    public TArgs EventArgs { get; }

    /// <summary>
    /// Creates a dispatch for the given event, sender and arguments.
    /// </summary>
    /// <param name="asyncEvent">Event to invoke.</param>
    /// <param name="sender">Sender to pass to the event.</param>
    /// <param name="e">Arguments to pass to the event.</param>
    internal EventDispatchData(AsyncEvent<TSender, TArgs> asyncEvent, TSender sender, TArgs e)
    {
        Event = asyncEvent;
        Sender = sender;
        EventArgs = e;
    }

    /// <summary>
    /// Invokes <see cref="Event"/> with <see cref="Sender"/> and <see cref="EventArgs"/>.
    /// </summary>
    /// <returns>A task that completes when the event invocation has finished.</returns>
    public async Task InvokeAsync()
    {
        var invocation = Event.InvokeAsync(Sender, EventArgs);
        await invocation.ConfigureAwait(false);
    }
}
/// <summary>
/// Factory helpers for building <see cref="IEventDispatchData{TSender, TArgs}"/> instances
/// without spelling out the generic arguments at the call site.
/// </summary>
internal static class EventDispatchData
{
    /// <summary>
    /// Builds a dispatch for the given event, sender and arguments.
    /// </summary>
    /// <param name="asyncEvent">Event to invoke.</param>
    /// <param name="sender">Sender to pass to the event.</param>
    /// <param name="e">Arguments to pass to the event.</param>
    /// <returns>The newly created dispatch.</returns>
    public static IEventDispatchData<TSender, TArgs> Create<TSender, TArgs>(AsyncEvent<TSender, TArgs> asyncEvent, TSender sender, TArgs e)
        where TArgs : AsyncEventArgs
    {
        return new EventDispatchData<TSender, TArgs>(asyncEvent, sender, e);
    }

    /// <summary>
    /// Extension-method form of <see cref="Create{TSender, TArgs}"/>.
    /// </summary>
    /// <param name="asyncEvent">Event to invoke.</param>
    /// <param name="sender">Sender to pass to the event.</param>
    /// <param name="e">Arguments to pass to the event.</param>
    /// <returns>The newly created dispatch.</returns>
    public static IEventDispatchData<TSender, TArgs> CreateDispatch<TSender, TArgs>(this AsyncEvent<TSender, TArgs> asyncEvent, TSender sender, TArgs e)
        where TArgs : AsyncEventArgs
    {
        return Create<TSender, TArgs>(asyncEvent, sender, e);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Bounded queue of event dispatches. Dispatches are read by a single background loop
/// that awaits each one before reading the next, so they run one at a time.
/// </summary>
/// <typeparam name="TSender">Sender type of the queued dispatches.</typeparam>
/// <typeparam name="TEventArgs">Event argument type of the queued dispatches.</typeparam>
internal sealed class EventQueue<TSender, TEventArgs> : IDisposable
    where TEventArgs : AsyncEventArgs
{
    private Channel<IEventDispatchData<TSender, TEventArgs>> DispatchQueue { get; }
    private ChannelWriter<IEventDispatchData<TSender, TEventArgs>> QueueWriter => this.DispatchQueue.Writer;
    private ChannelReader<IEventDispatchData<TSender, TEventArgs>> QueueReader => this.DispatchQueue.Reader;
    private CancellationTokenSource TokenSource { get; }

    // Captured once in the constructor. CancellationTokenSource.Token throws
    // ObjectDisposedException after the source has been disposed, whereas a token
    // obtained beforehand can still be queried; the dispatch loop relies on that.
    private CancellationToken Token { get; }

    private Task DispatchTask { get; }

    // 0 = live, 1 = disposed. An int so Interlocked.Exchange can flip it atomically.
    private int _isDisposed;

    /// <summary>
    /// Creates the queue and starts its dispatch loop.
    /// </summary>
    /// <param name="maxSize">Capacity of the underlying bounded channel.</param>
    public EventQueue(int maxSize)
    {
        this.TokenSource = new CancellationTokenSource();
        this.Token = this.TokenSource.Token;
        this.DispatchQueue = Channel.CreateBounded<IEventDispatchData<TSender, TEventArgs>>(new BoundedChannelOptions(maxSize)
        {
            AllowSynchronousContinuations = false,
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true,
            SingleWriter = false
        });

        // Must come last: the loop reads Token and DispatchQueue as soon as it starts.
        this.DispatchTask = this.RunDispatchLoopAsync();
    }

    /// <summary>
    /// Adds a dispatch to the queue, waiting for free space if the queue is full.
    /// </summary>
    /// <param name="eventDispatch">Dispatch to enqueue.</param>
    /// <returns>A task that completes once the dispatch has been written to the queue.</returns>
    /// <exception cref="ObjectDisposedException">The queue has been disposed.</exception>
    public async Task EnqueueAsync(IEventDispatchData<TSender, TEventArgs> eventDispatch)
    {
        if (Volatile.Read(ref this._isDisposed) != 0)
        {
            throw new ObjectDisposedException(nameof(EventQueue<TSender, TEventArgs>));
        }

        await this.QueueWriter.WriteAsync(eventDispatch, this.Token).ConfigureAwait(false);
    }

    /// <summary>
    /// Completes the queue for writing and cancels the dispatch loop. Safe to call more
    /// than once and from multiple threads; only the first call has any effect.
    /// </summary>
    public void Dispose()
    {
        // Atomic test-and-set so concurrent callers cannot both run the teardown.
        if (Interlocked.Exchange(ref this._isDisposed, 1) != 0)
        {
            return;
        }

        // TryComplete rather than Complete: it reports failure instead of throwing
        // if the channel was already completed.
        this.QueueWriter.TryComplete();
        this.TokenSource.Cancel();
        this.TokenSource.Dispose();
    }

    /// <summary>
    /// Reads dispatches from the queue and invokes them one after another until the
    /// queue is disposed.
    /// </summary>
    private async Task RunDispatchLoopAsync()
    {
        try
        {
            while (!this.Token.IsCancellationRequested)
            {
                var dispatch = await this.QueueReader.ReadAsync(this.Token).ConfigureAwait(false);
                await dispatch.InvokeAsync().ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException) when (this.Token.IsCancellationRequested)
        {
            // Dispose() cancelled the token; this is the normal shutdown path.
        }
        catch (ChannelClosedException)
        {
            // The writer was completed and nothing is left to read; normal shutdown.
        }

        // NOTE(review): any other exception escaping InvokeAsync still ends the loop and
        // faults DispatchTask, which nothing in this class observes — confirm that the
        // event's InvokeAsync handles handler failures itself.
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment