Skip to content

Instantly share code, notes, and snippets.

@Emzi0767
Last active August 5, 2019 20:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Emzi0767/5ad015165e60e905165ec0869f9bbb93 to your computer and use it in GitHub Desktop.
Save Emzi0767/5ad015165e60e905165ec0869f9bbb93 to your computer and use it in GitHub Desktop.
Asynchronous Event Queue
/// <summary>
/// Contract for a deferred event invocation. The only operation it exposes is
/// <see cref="InvokeAsync"/>.
/// </summary>
/// <typeparam name="TSender">Sender type; not referenced by any member declared here.</typeparam>
/// <typeparam name="TArgs">Event argument type, declared covariant (<c>out</c>).</typeparam>
internal interface IEventDispatchData<TSender, out TArgs> where TArgs : AsyncEventArgs
{
    /// <summary>Runs the invocation this instance stands for.</summary>
    /// <returns>A task tracking the invocation.</returns>
    Task InvokeAsync();
}
/// <summary>
/// Holds an event together with the sender and arguments it should be invoked
/// with, so that the invocation can be performed later via <see cref="InvokeAsync"/>.
/// </summary>
/// <typeparam name="TSender">Type of the sender passed to the event.</typeparam>
/// <typeparam name="TArgs">Type of the arguments passed to the event.</typeparam>
internal sealed class EventDispatchData<TSender, TArgs> : IEventDispatchData<TSender, TArgs>
    where TArgs : AsyncEventArgs
{
    /// <summary>
    /// Stores the three values unchanged; no validation is performed.
    /// </summary>
    /// <param name="asyncEvent">Event to invoke.</param>
    /// <param name="sender">Sender to hand to the event.</param>
    /// <param name="e">Arguments to hand to the event.</param>
    internal EventDispatchData(AsyncEvent<TSender, TArgs> asyncEvent, TSender sender, TArgs e)
    {
        Event = asyncEvent;
        Sender = sender;
        EventArgs = e;
    }

    /// <summary>Event that <see cref="InvokeAsync"/> forwards to.</summary>
    public AsyncEvent<TSender, TArgs> Event { get; }

    /// <summary>Sender supplied at construction.</summary>
    public TSender Sender { get; }

    /// <summary>Event arguments supplied at construction.</summary>
    public TArgs EventArgs { get; }

    /// <summary>
    /// Invokes <see cref="Event"/> with <see cref="Sender"/> and <see cref="EventArgs"/>.
    /// </summary>
    /// <returns>A task that completes when the event invocation completes.</returns>
    public async Task InvokeAsync()
    {
        await Event.InvokeAsync(Sender, EventArgs).ConfigureAwait(false);
    }
}
/// <summary>
/// Factory helpers that build <see cref="IEventDispatchData{TSender, TArgs}"/> instances
/// with the generic arguments inferred from the call site.
/// </summary>
internal static class EventDispatchData
{
    /// <summary>
    /// Wraps the event, sender and arguments in a new dispatch object.
    /// </summary>
    /// <param name="asyncEvent">Event to invoke later.</param>
    /// <param name="sender">Sender to pass to the event.</param>
    /// <param name="e">Arguments to pass to the event.</param>
    /// <returns>The newly created dispatch object.</returns>
    public static IEventDispatchData<TSender, TArgs> Create<TSender, TArgs>(AsyncEvent<TSender, TArgs> asyncEvent, TSender sender, TArgs e)
        where TArgs : AsyncEventArgs
    {
        return new EventDispatchData<TSender, TArgs>(asyncEvent, sender, e);
    }

    /// <summary>
    /// Extension-method form of <see cref="Create{TSender, TArgs}"/>; delegates to it unchanged.
    /// </summary>
    /// <param name="asyncEvent">Event to invoke later.</param>
    /// <param name="sender">Sender to pass to the event.</param>
    /// <param name="e">Arguments to pass to the event.</param>
    /// <returns>The newly created dispatch object.</returns>
    public static IEventDispatchData<TSender, TArgs> CreateDispatch<TSender, TArgs>(this AsyncEvent<TSender, TArgs> asyncEvent, TSender sender, TArgs e)
        where TArgs : AsyncEventArgs
    {
        return Create<TSender, TArgs>(asyncEvent, sender, e);
    }
}
/// <summary>
/// Bounded queue of event dispatches drained by a single background loop.
/// Dispatches are read one at a time and each one is awaited before the next
/// is read. Disposing the queue completes the channel and cancels the loop.
/// </summary>
/// <typeparam name="TSender">Sender type of the queued dispatches.</typeparam>
/// <typeparam name="TEventArgs">Event argument type of the queued dispatches.</typeparam>
internal sealed class EventQueue<TSender, TEventArgs> : IDisposable
    where TEventArgs : AsyncEventArgs
{
    private Channel<IEventDispatchData<TSender, TEventArgs>> DispatchQueue { get; }
    private ChannelWriter<IEventDispatchData<TSender, TEventArgs>> QueueWriter => this.DispatchQueue.Writer;
    private ChannelReader<IEventDispatchData<TSender, TEventArgs>> QueueReader => this.DispatchQueue.Reader;
    private CancellationTokenSource TokenSource { get; }

    // Captured once in the constructor. Reading TokenSource.Token after the
    // source has been disposed throws ObjectDisposedException, which would
    // fault the dispatch loop during shutdown; a token captured beforehand
    // can still be queried safely.
    private CancellationToken Token { get; }

    private Task DispatchTask { get; }

    // 0 = live, 1 = disposed. An int so the transition can be made atomically.
    private int _isDisposed;

    /// <summary>
    /// Creates the queue and immediately starts the dispatch loop.
    /// </summary>
    /// <param name="maxSize">
    /// Capacity of the underlying bounded channel. When the channel is full,
    /// <see cref="EnqueueAsync"/> waits for space (FullMode is Wait).
    /// </param>
    public EventQueue(int maxSize)
    {
        this.TokenSource = new CancellationTokenSource();
        this.Token = this.TokenSource.Token;
        this.DispatchQueue = Channel.CreateBounded<IEventDispatchData<TSender, TEventArgs>>(new BoundedChannelOptions(maxSize)
        {
            AllowSynchronousContinuations = false,
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true,
            SingleWriter = false
        });
        this.DispatchTask = this.RunDispatchLoopAsync();
    }

    /// <summary>
    /// Adds a dispatch to the queue, waiting for free capacity if necessary.
    /// </summary>
    /// <param name="eventDispatch">Dispatch to enqueue; must not be null.</param>
    /// <returns>A task that completes once the dispatch has been written to the channel.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="eventDispatch"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The queue has been disposed.</exception>
    /// <exception cref="OperationCanceledException">The queue was disposed while the write was pending.</exception>
    public async Task EnqueueAsync(IEventDispatchData<TSender, TEventArgs> eventDispatch)
    {
        // A null entry would only blow up later, inside the dispatch loop,
        // and take the loop down with it; reject it at the door instead.
        if (eventDispatch == null)
            throw new ArgumentNullException(nameof(eventDispatch));

        // Keeps the "enqueue after dispose => ObjectDisposedException"
        // behaviour that previously came from the disposed token source.
        if (Volatile.Read(ref this._isDisposed) != 0)
            throw new ObjectDisposedException(this.GetType().FullName);

        await this.QueueWriter.WriteAsync(eventDispatch, this.Token).ConfigureAwait(false);
    }

    /// <summary>
    /// Completes the channel, cancels the dispatch loop and releases the
    /// cancellation source. Safe to call more than once and from multiple threads.
    /// </summary>
    public void Dispose()
    {
        // Atomic test-and-set: only the first caller performs the teardown.
        if (Interlocked.Exchange(ref this._isDisposed, 1) != 0)
            return;

        // TryComplete never throws if the channel is already completed.
        this.QueueWriter.TryComplete();

        try
        {
            this.TokenSource.Cancel();
        }
        finally
        {
            // Release the source even if a cancellation callback threw.
            this.TokenSource.Dispose();
        }
    }

    /// <summary>
    /// Reads dispatches from the channel and awaits each one in turn until
    /// the queue is disposed.
    /// </summary>
    /// <remarks>
    /// NOTE(review): an exception escaping <c>InvokeAsync</c> still ends this loop and
    /// faults <see cref="DispatchTask"/>, which nothing in this class observes; writers
    /// would then wait once the channel fills up. Confirm whether the event
    /// implementation contains handler exceptions, or add handling here.
    /// </remarks>
    private async Task RunDispatchLoopAsync()
    {
        while (!this.Token.IsCancellationRequested)
        {
            IEventDispatchData<TSender, TEventArgs> dispatch;
            try
            {
                dispatch = await this.QueueReader.ReadAsync(this.Token).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                // Dispose() cancelled the token: normal shutdown.
                break;
            }
            catch (ChannelClosedException)
            {
                // Dispose() completed the channel and it is drained: normal shutdown.
                break;
            }

            await dispatch.InvokeAsync().ConfigureAwait(false);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment