Skip to content

Instantly share code, notes, and snippets.

@programmation
Created December 24, 2019 03:54
Show Gist options
  • Save programmation/ea04466f81790478d5a4e9efc1c54fe7 to your computer and use it in GitHub Desktop.
Save programmation/ea04466f81790478d5a4e9efc1c54fe7 to your computer and use it in GitHub Desktop.
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Nito.AsyncEx;
namespace Core.Utility
{
/// <summary>
/// Base type for an operation whose outcome is reported through a
/// <see cref="TaskCompletionSource{TResult}"/> created together with the operation.
/// </summary>
/// <typeparam name="TInput">Input type tag; no member declared on this class uses it.</typeparam>
/// <typeparam name="TOutput">Type of the value the operation produces.</typeparam>
public abstract class BaseOperation<TInput, TOutput>
{
    /// <summary>
    /// Completion source used to publish the operation's result or failure.
    /// </summary>
    /// <remarks>
    /// Created with <see cref="TaskCreationOptions.RunContinuationsAsynchronously"/> so that code
    /// awaiting <c>TaskCompletionSource.Task</c> is never resumed inline on the thread that completes
    /// the source. Without it, the awaiting caller's continuation would run inside
    /// <c>SetResult</c>/<c>SetException</c>, i.e. in the middle of whoever completes the operation.
    /// </remarks>
    public TaskCompletionSource<TOutput> TaskCompletionSource { get; }
        = new TaskCompletionSource<TOutput>(TaskCreationOptions.RunContinuationsAsynchronously);
}
/// <summary>
/// Base type for operations that compute their result synchronously.
/// </summary>
/// <typeparam name="TInput">Input type tag, passed through to <see cref="BaseOperation{TInput, TOutput}"/>.</typeparam>
/// <typeparam name="TOutput">Type of the value returned by <see cref="Execute"/>.</typeparam>
public abstract class BaseReaderWriterOperation<TInput, TOutput> : BaseOperation<TInput, TOutput>
{
    /// <summary>Performs the operation's work.</summary>
    /// <returns>The value produced by the operation.</returns>
    public abstract TOutput Execute();
}
/// <summary>
/// Synchronous operation type accepted by <c>ReaderWriterOperationQueue.ReadAsync</c>, which
/// executes it while holding a reader lock. Declares no members of its own.
/// </summary>
/// <typeparam name="TInput">Input type tag, passed through to the base class.</typeparam>
/// <typeparam name="TOutput">Type of the value the operation produces.</typeparam>
public abstract class ReaderOperation<TInput, TOutput> : BaseReaderWriterOperation<TInput, TOutput>
{
}
/// <summary>
/// Synchronous operation type accepted by <c>ReaderWriterOperationQueue.WriteAsync</c>, which
/// executes it while holding the writer lock. Declares no members of its own.
/// </summary>
/// <typeparam name="TInput">Input type tag, passed through to the base class.</typeparam>
/// <typeparam name="TOutput">Type of the value the operation produces.</typeparam>
public abstract class WriterOperation<TInput, TOutput> : BaseReaderWriterOperation<TInput, TOutput>
{
}
/// <summary>
/// Base type for operations that compute their result asynchronously.
/// </summary>
/// <typeparam name="TInput">Input type tag, passed through to <see cref="BaseOperation{TInput, TOutput}"/>.</typeparam>
/// <typeparam name="TOutput">Type of the value produced by <see cref="ExecuteAsync"/>.</typeparam>
public abstract class BaseAsyncReaderWriterOperation<TInput, TOutput> : BaseOperation<TInput, TOutput>
{
    /// <summary>Performs the operation's work asynchronously.</summary>
    /// <returns>A task whose result is the value produced by the operation.</returns>
    public abstract Task<TOutput> ExecuteAsync();
}
/// <summary>
/// Asynchronous operation type accepted by <c>ReaderWriterOperationQueue.ReadAsync</c>, which
/// awaits it while holding a reader lock. Declares no members of its own.
/// </summary>
/// <typeparam name="TInput">Input type tag, passed through to the base class.</typeparam>
/// <typeparam name="TOutput">Type of the value the operation produces.</typeparam>
public abstract class AsyncReaderOperation<TInput, TOutput> : BaseAsyncReaderWriterOperation<TInput, TOutput>
{
}
/// <summary>
/// Asynchronous operation type accepted by <c>ReaderWriterOperationQueue.WriteAsync</c>, which
/// awaits it while holding the writer lock. Declares no members of its own.
/// </summary>
/// <typeparam name="TInput">Input type tag, passed through to the base class.</typeparam>
/// <typeparam name="TOutput">Type of the value the operation produces.</typeparam>
public abstract class AsyncWriterOperation<TInput, TOutput> : BaseAsyncReaderWriterOperation<TInput, TOutput>
{
}
/// <summary>
/// Accepts reader and writer operations, runs each one while holding the corresponding lock of a
/// shared <see cref="AsyncReaderWriterLock"/>, and reports the outcome through the operation's
/// own <c>TaskCompletionSource</c>.
/// </summary>
/// <remarks>
/// Each of the four operation kinds (sync/async x reader/writer) is posted to its own dataflow
/// block. The handler's returned task is the block's output item, so the block starts an operation
/// and passes its task on without awaiting it; mutual exclusion comes from the reader/writer lock,
/// not from the blocks.
/// </remarks>
/// <typeparam name="TInput">Input type tag of the operations; only used as a type argument here.</typeparam>
/// <typeparam name="TOutput">Type of the value each operation produces.</typeparam>
public class ReaderWriterOperationQueue<TInput, TOutput>
{
    private readonly AsyncReaderWriterLock _readerWriterLock;
    private readonly TransformBlock<ReaderOperation<TInput, TOutput>, Task<TOutput>> _readerQueue;
    private readonly TransformBlock<WriterOperation<TInput, TOutput>, Task<TOutput>> _writerQueue;
    private readonly TransformBlock<AsyncReaderOperation<TInput, TOutput>, Task<TOutput>> _asyncReaderQueue;
    private readonly TransformBlock<AsyncWriterOperation<TInput, TOutput>, Task<TOutput>> _asyncWriterQueue;

    /// <summary>Creates the lock and one processing block per operation kind.</summary>
    public ReaderWriterOperationQueue()
    {
        _readerWriterLock = new AsyncReaderWriterLock();
        _readerQueue = CreateQueue<ReaderOperation<TInput, TOutput>>(DoReaderOperationAsync);
        _writerQueue = CreateQueue<WriterOperation<TInput, TOutput>>(DoWriterOperationAsync);
        _asyncReaderQueue = CreateQueue<AsyncReaderOperation<TInput, TOutput>>(DoAsyncReaderOperationAsync);
        _asyncWriterQueue = CreateQueue<AsyncWriterOperation<TInput, TOutput>>(DoAsyncWriterOperationAsync);
    }

    /// <summary>Queues a synchronous read operation and waits for its result.</summary>
    /// <param name="operation">The operation to run under a reader lock.</param>
    /// <returns>A task producing the operation's result; it faults with the operation's exception.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="operation"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The queue declined the operation.</exception>
    public async Task<TOutput> ReadAsync(ReaderOperation<TInput, TOutput> operation)
    {
        if (operation == null)
        {
            throw new ArgumentNullException(nameof(operation));
        }

        if (!await _readerQueue.SendAsync(operation).ConfigureAwait(false))
        {
            throw new InvalidOperationException("Unable to queue read operation");
        }

        return await operation.TaskCompletionSource.Task.ConfigureAwait(false);
    }

    /// <summary>Queues a synchronous write operation and waits for its result.</summary>
    /// <param name="operation">The operation to run under the writer lock.</param>
    /// <returns>A task producing the operation's result; it faults with the operation's exception.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="operation"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The queue declined the operation.</exception>
    public async Task<TOutput> WriteAsync(WriterOperation<TInput, TOutput> operation)
    {
        if (operation == null)
        {
            throw new ArgumentNullException(nameof(operation));
        }

        if (!await _writerQueue.SendAsync(operation).ConfigureAwait(false))
        {
            throw new InvalidOperationException("Unable to queue write operation");
        }

        return await operation.TaskCompletionSource.Task.ConfigureAwait(false);
    }

    /// <summary>Queues an asynchronous read operation and waits for its result.</summary>
    /// <param name="operation">The operation to await under a reader lock.</param>
    /// <returns>A task producing the operation's result; it faults with the operation's exception.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="operation"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The queue declined the operation.</exception>
    public async Task<TOutput> ReadAsync(AsyncReaderOperation<TInput, TOutput> operation)
    {
        if (operation == null)
        {
            throw new ArgumentNullException(nameof(operation));
        }

        if (!await _asyncReaderQueue.SendAsync(operation).ConfigureAwait(false))
        {
            throw new InvalidOperationException("Unable to queue async read operation");
        }

        return await operation.TaskCompletionSource.Task.ConfigureAwait(false);
    }

    /// <summary>Queues an asynchronous write operation and waits for its result.</summary>
    /// <param name="operation">The operation to await under the writer lock.</param>
    /// <returns>A task producing the operation's result; it faults with the operation's exception.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="operation"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The queue declined the operation.</exception>
    public async Task<TOutput> WriteAsync(AsyncWriterOperation<TInput, TOutput> operation)
    {
        if (operation == null)
        {
            throw new ArgumentNullException(nameof(operation));
        }

        if (!await _asyncWriterQueue.SendAsync(operation).ConfigureAwait(false))
        {
            throw new InvalidOperationException("Unable to queue async write operation");
        }

        return await operation.TaskCompletionSource.Task.ConfigureAwait(false);
    }

    /// <summary>
    /// Builds the processing block for one operation kind and drains its output.
    /// </summary>
    /// <remarks>
    /// Callers obtain results from the operation's completion source, never from the block, so the
    /// emitted tasks are linked to a null target; otherwise every processed operation would stay
    /// buffered in the block's output queue for the lifetime of this instance.
    /// </remarks>
    private static TransformBlock<TOperation, Task<TOutput>> CreateQueue<TOperation>(
        Func<TOperation, Task<TOutput>> handler)
    {
        var queue = new TransformBlock<TOperation, Task<TOutput>>(handler);
        queue.LinkTo(DataflowBlock.NullTarget<Task<TOutput>>());
        return queue;
    }

    /// <summary>Runs a synchronous operation under a reader lock and publishes its outcome.</summary>
    private async Task<TOutput> DoReaderOperationAsync(ReaderOperation<TInput, TOutput> operation)
    {
        var completion = operation.TaskCompletionSource;
        try
        {
            TOutput result;
            using (await _readerWriterLock.ReaderLockAsync())
            {
                result = operation.Execute();
            }

            // Publish only after the lock is released so no awaiting code can run while it is held.
            completion.TrySetResult(result);
        }
        catch (Exception ex)
        {
            // Try* keeps an already-completed source from raising a second exception here.
            completion.TrySetException(ex);
        }

        return await completion.Task.ConfigureAwait(false);
    }

    /// <summary>Runs a synchronous operation under the writer lock and publishes its outcome.</summary>
    private async Task<TOutput> DoWriterOperationAsync(WriterOperation<TInput, TOutput> operation)
    {
        var completion = operation.TaskCompletionSource;
        try
        {
            TOutput result;
            using (await _readerWriterLock.WriterLockAsync())
            {
                result = operation.Execute();
            }

            // Publish only after the lock is released so no awaiting code can run while it is held.
            completion.TrySetResult(result);
        }
        catch (Exception ex)
        {
            // Try* keeps an already-completed source from raising a second exception here.
            completion.TrySetException(ex);
        }

        return await completion.Task.ConfigureAwait(false);
    }

    /// <summary>Awaits an asynchronous operation under a reader lock and publishes its outcome.</summary>
    private async Task<TOutput> DoAsyncReaderOperationAsync(AsyncReaderOperation<TInput, TOutput> operation)
    {
        var completion = operation.TaskCompletionSource;
        try
        {
            TOutput result;
            using (await _readerWriterLock.ReaderLockAsync())
            {
                result = await operation.ExecuteAsync();
            }

            // Publish only after the lock is released so no awaiting code can run while it is held.
            completion.TrySetResult(result);
        }
        catch (Exception ex)
        {
            // Try* keeps an already-completed source from raising a second exception here.
            completion.TrySetException(ex);
        }

        return await completion.Task.ConfigureAwait(false);
    }

    /// <summary>Awaits an asynchronous operation under the writer lock and publishes its outcome.</summary>
    private async Task<TOutput> DoAsyncWriterOperationAsync(AsyncWriterOperation<TInput, TOutput> operation)
    {
        var completion = operation.TaskCompletionSource;
        try
        {
            TOutput result;
            using (await _readerWriterLock.WriterLockAsync())
            {
                result = await operation.ExecuteAsync();
            }

            // Publish only after the lock is released so no awaiting code can run while it is held.
            completion.TrySetResult(result);
        }
        catch (Exception ex)
        {
            // Try* keeps an already-completed source from raising a second exception here.
            completion.TrySetException(ex);
        }

        return await completion.Task.ConfigureAwait(false);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment