Skip to content

Instantly share code, notes, and snippets.

@noseratio
Last active January 20, 2022 11:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save noseratio/6a56805b4e1c0008152b803b3b50e494 to your computer and use it in GitHub Desktop.
Save noseratio/6a56805b4e1c0008152b803b3b50e494 to your computer and use it in GitHub Desktop.
LongRunningAsyncDisposable implements a long-running task that can be stopped by calling IAsyncDisposable.DisposeAsync in a thread-safe, concurrency-friendly way
// https://stackoverflow.com/q/70718409/1768303
// Demo: ten workers wait one second and then all call DisposeAsync on the same
// service concurrently, each reporting whether disposal and Completion succeeded.
try
{
    await using var service = new BackgroundService(
        duration: TimeSpan.FromSeconds(2),
        catchRuntimeErrorsAtDispose: true);

    // One worker: wait, dispose the shared service, observe its completion,
    // and report the outcome for this worker's index.
    async Task DisposeAndReport(int index)
    {
        try
        {
            await Task.Delay(TimeSpan.FromSeconds(1));
            await service.DisposeAsync();
            await service.Completion;
            Console.WriteLine($"{index} completed.");
        }
        catch (Exception error)
        {
            Console.WriteLine($"{index} failed: {error.Message}");
        }
    }

    var workers = Enumerable
        .Range(0, 10)
        .Select(index => Task.Run(() => DisposeAndReport(index)));

    await Task.WhenAll(workers).WithAggregatedExceptions();

    Console.WriteLine("Done.");
}
catch (Exception error)
{
    // Print the failure and wait for Enter before exiting.
    Console.WriteLine(error.Message);
    Console.ReadLine();
}
/// <summary>
/// A test service that runs a simulated long-running task through
/// <see cref="LongRunningAsyncDisposable"/> and forwards disposal to it.
/// </summary>
class BackgroundService : IAsyncDisposable
{
    private readonly LongRunningAsyncDisposable _longRunningAsyncDisposable;

    /// <summary>
    /// Creates the service; the runtime task is handed to the wrapped
    /// <see cref="LongRunningAsyncDisposable"/> right away.
    /// </summary>
    /// <param name="duration">How long the simulated task waits before it throws.</param>
    /// <param name="catchRuntimeErrorsAtDispose">Passed through to the wrapped instance.</param>
    public BackgroundService(
        TimeSpan duration,
        bool catchRuntimeErrorsAtDispose)
    {
        Task RunForDuration(CancellationToken token) => Run(duration, token);

        _longRunningAsyncDisposable = new LongRunningAsyncDisposable(
            runtimeTaskFunc: RunForDuration,
            disposeAsyncFunc: DisposeAsyncImpl,
            catchRuntimeErrorsAtDispose);
    }

    /// <summary>The completion task of the wrapped long-running operation.</summary>
    public Task Completion => _longRunningAsyncDisposable.Completion;

    /// <summary>Forwards disposal to the wrapped instance.</summary>
    public ValueTask DisposeAsync() => _longRunningAsyncDisposable.DisposeAsync();

    // Simulates an asynchronous disposal operation that takes one second.
    private async ValueTask DisposeAsyncImpl() =>
        await Task.Delay(TimeSpan.FromSeconds(1));

    // Simulates a long-running task that can be stopped through the token.
    private async Task Run(TimeSpan duration, CancellationToken token)
    {
        await Task.Delay(duration, token);

        // Reached only when the delay was not cancelled;
        // this exception can be observed via this.Completion.
        throw new InvalidOperationException("Boo!");
    }
}
/// <summary>
/// Implements a thread-safe, concurrency-friendly <see cref="IAsyncDisposable.DisposeAsync"/>:
/// the disposal logic is started by the first caller only, and every caller
/// (first or later) receives a <see cref="ValueTask"/> over the same disposal task.
/// </summary>
public class AsyncDisposable : IAsyncDisposable
{
    private readonly object _syncRoot; // an object to lock upon
    private Task? _disposalTask; // pending disposal task
    private readonly Func<ValueTask> _disposeAsyncFunc; // the actual dispose logic

    /// <summary>The object this instance locks on while inspecting or setting the disposal state.</summary>
    public object SyncRoot => _syncRoot;

    /// <summary>
    /// <c>true</c> once <see cref="DisposeAsync"/> has been called at least once;
    /// the flag is never reset, so it stays <c>true</c> after disposal has finished.
    /// </summary>
    public bool IsDisposing
    {
        get
        {
            lock (_syncRoot)
            {
                return _disposalTask is not null;
            }
        }
    }

    /// <summary>Creates an instance around the given disposal logic.</summary>
    /// <param name="disposeAsyncFunc">The disposal logic invoked by <see cref="DisposeAsyncImpl"/>.</param>
    /// <param name="syncRoot">An optional object to lock on; a private one is created when omitted.</param>
    /// <exception cref="ArgumentNullException"><paramref name="disposeAsyncFunc"/> is <c>null</c>.</exception>
    public AsyncDisposable(Func<ValueTask> disposeAsyncFunc, object? syncRoot = null)
    {
        // Fail fast: a null delegate would otherwise only surface later,
        // as a NullReferenceException inside the faulted disposal task.
        _disposeAsyncFunc = disposeAsyncFunc ?? throw new ArgumentNullException(nameof(disposeAsyncFunc));
        _syncRoot = syncRoot ?? new();
    }

    /// <summary>Runs the disposal logic; derived classes may override to add their own steps.</summary>
    protected virtual ValueTask DisposeAsyncImpl()
    {
        return _disposeAsyncFunc();
    }

    /// <summary>
    /// Starts disposal on the first call and returns the shared disposal task;
    /// later calls return the same task without invoking the disposal logic again.
    /// </summary>
    public ValueTask DisposeAsync()
    {
        Task<Task> disposalTaskWrapper;
        Task disposalTask;
        lock (_syncRoot)
        {
            if (_disposalTask is not null)
            {
                return new(_disposalTask);
            }

            // Publish the (not yet started) disposal task while holding the lock,
            // so concurrent callers observe it instead of starting their own.
            disposalTaskWrapper = new(() => DisposeAsyncImpl().AsTask());
            disposalTask = disposalTaskWrapper.Unwrap();
            _disposalTask = disposalTask;
        }

        // run the actual disposal logic outside of the synchronous lock
        disposalTaskWrapper.RunSynchronously(TaskScheduler.Default);
        return new(disposalTask);
    }
}
/// <summary>
/// Implements a long-running task that can be stopped by calling
/// <see cref="IAsyncDisposable.DisposeAsync"/>.
/// <seealso cref="AsyncDisposable"/>
/// </summary>
public class LongRunningAsyncDisposable : AsyncDisposable
{
    private readonly Task _runtimeTask;
    private readonly Task _completionTask;
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly bool _catchRuntimeErrorsAtDispose;

    /// <summary>
    /// The token passed to the runtime task; it is cancelled when disposal starts.
    /// Captured once at construction, because reading
    /// <see cref="CancellationTokenSource.Token"/> throws after the source is disposed.
    /// </summary>
    public CancellationToken CancellationToken { get; }

    /// <summary>
    /// Completes after the runtime task has ended and disposal has been awaited;
    /// a failure of the runtime task is propagated through this task.
    /// </summary>
    public Task Completion => _completionTask;

    /// <summary>Starts the runtime task immediately.</summary>
    /// <param name="runtimeTaskFunc">Creates the long-running task; receives <see cref="CancellationToken"/>.</param>
    /// <param name="disposeAsyncFunc">Disposal logic, run after the runtime task has been awaited.</param>
    /// <param name="catchRuntimeErrorsAtDispose">
    /// When <c>true</c>, errors of the runtime task are swallowed during disposal
    /// (observe <see cref="Completion"/> instead); when <c>false</c>, they are rethrown from disposal.
    /// </param>
    /// <param name="syncRoot">An optional lock object, passed to the base class.</param>
    /// <param name="cancellationToken">An external token linked into <see cref="CancellationToken"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="runtimeTaskFunc"/> is <c>null</c>.</exception>
    public LongRunningAsyncDisposable(
        Func<CancellationToken, Task> runtimeTaskFunc,
        Func<ValueTask> disposeAsyncFunc,
        bool catchRuntimeErrorsAtDispose = true,
        object? syncRoot = null,
        CancellationToken cancellationToken = default) :
        base(disposeAsyncFunc, syncRoot)
    {
        // Validate before allocating the linked source so nothing can leak.
        if (runtimeTaskFunc is null)
        {
            throw new ArgumentNullException(nameof(runtimeTaskFunc));
        }

        _catchRuntimeErrorsAtDispose = catchRuntimeErrorsAtDispose;
        _cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        CancellationToken = _cancellationTokenSource.Token;

        try
        {
            _runtimeTask = runtimeTaskFunc(CancellationToken);
        }
        catch
        {
            // The factory threw synchronously: no instance will exist to dispose
            // the linked source later, so release it here before propagating.
            _cancellationTokenSource.Dispose();
            throw;
        }

        _completionTask = RunToCompletion();
    }

    // Awaits the runtime task and then always triggers (or joins) disposal.
    private async Task RunToCompletion()
    {
        try
        {
            await _runtimeTask;
        }
        finally
        {
            await base.DisposeAsync();
        }
    }

    /// <summary>
    /// Cancels the runtime task, waits for it to end, runs the base disposal logic,
    /// and finally disposes the cancellation token source.
    /// </summary>
    protected override async ValueTask DisposeAsyncImpl()
    {
        try
        {
            // Inside the try so the source is still disposed if a
            // cancellation callback throws from Cancel().
            _cancellationTokenSource.Cancel();

            try
            {
                await _runtimeTask;
            }
            catch when (_catchRuntimeErrorsAtDispose)
            {
                // The error is intentionally not rethrown here; make sure
                // to observe the Completion property in the client code.
            }
            finally
            {
                await base.DisposeAsyncImpl();
            }
        }
        finally
        {
            _cancellationTokenSource.Dispose();
        }
    }
}
/// <summary>
/// Get all of the AggregateException.InnerExceptions with try/await/catch.
/// <seealso href="https://stackoverflow.com/a/62607500/1768303"/>
/// </summary>
public static partial class TaskExt
{
    /// <summary>
    /// Returns a task that, when the source task faulted with more than one exception
    /// (or with a nested <see cref="AggregateException"/>), faults with the flattened
    /// <see cref="AggregateException"/> as a whole; in every other case the outcome of
    /// the source task is propagated unchanged.
    /// </summary>
    /// <param name="this">The task to wrap.</param>
    public static Task WithAggregatedExceptions(this Task @this)
    {
        // Chooses which task the continuation unwraps to.
        Func<Task, Task> selectOutcome = completed =>
        {
            if (!completed.IsFaulted || completed.Exception is not { } aggregate)
            {
                return completed;
            }

            bool needsWrapping =
                aggregate.InnerExceptions.Count > 1 ||
                aggregate.InnerException is AggregateException;

            // using AggregateException.Flatten as a bonus
            return needsWrapping
                ? Task.FromException(aggregate.Flatten())
                : completed;
        };

        Task<Task> continuation = @this.ContinueWith(
            selectOutcome,
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);

        return continuation.Unwrap();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment