Last active
January 20, 2022 11:21
-
-
Save noseratio/6a56805b4e1c0008152b803b3b50e494 to your computer and use it in GitHub Desktop.
LongRunningAsyncDisposable implements a long-running task that can be stopped by calling IAsyncDisposable.DisposeAsync in a thread-safe, concurrency-friendly way
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// https://stackoverflow.com/q/70718409/1768303 | |
try | |
{ | |
await using var service = new BackgroundService( | |
duration: TimeSpan.FromSeconds(2), | |
catchRuntimeErrorsAtDispose: true); | |
await Task.WhenAll( | |
Enumerable.Range(0, 10).Select(i => | |
Task.Run(async () => | |
{ | |
try | |
{ | |
await Task.Delay(TimeSpan.FromSeconds(1)); | |
await service.DisposeAsync(); | |
await service.Completion; | |
Console.WriteLine($"{i} completed."); | |
} | |
catch (Exception ex) | |
{ | |
Console.WriteLine($"{i} failed: {ex.Message}"); | |
} | |
}))) | |
.WithAggregatedExceptions(); | |
Console.WriteLine("Done."); | |
} | |
catch (Exception ex) | |
{ | |
Console.WriteLine(ex.Message); | |
Console.ReadLine(); | |
} | |
/// <summary> | |
/// A long-running task test service | |
/// </summary> | |
class BackgroundService : IAsyncDisposable | |
{ | |
private readonly LongRunningAsyncDisposable _longRunningAsyncDisposable; | |
public Task Completion => _longRunningAsyncDisposable.Completion; | |
public BackgroundService( | |
TimeSpan duration, | |
bool catchRuntimeErrorsAtDispose) | |
{ | |
_longRunningAsyncDisposable = new( | |
runtimeTaskFunc: token => Run(duration, token), | |
disposeAsyncFunc: () => DisposeAsyncImpl(), | |
catchRuntimeErrorsAtDispose); | |
} | |
private async ValueTask DisposeAsyncImpl() | |
{ | |
// simulate async disposal operation | |
await Task.Delay(TimeSpan.FromSeconds(1)); | |
} | |
private async Task Run(TimeSpan duration, CancellationToken token) | |
{ | |
// similate a long-running task that can be stopped | |
await Task.Delay(duration, token); | |
// this exception can be observed via this.Completion | |
throw new InvalidOperationException("Boo!"); | |
} | |
public ValueTask DisposeAsync() => _longRunningAsyncDisposable.DisposeAsync(); | |
} | |
/// <summary> | |
/// Implement a thread-safe, concurrency-friendly <see cref="IAsyncDisposable.DisposeAsync"/> | |
/// </summary> | |
public class AsyncDisposable : IAsyncDisposable | |
{ | |
private readonly object _syncRoot; // an object to lock upon | |
private Task? _disposalTask; // pending disposal task | |
private readonly Func<ValueTask> _disposeAsyncFunc; // the actual dispose logic | |
public object SyncRoot => _syncRoot; | |
public bool IsDisposing | |
{ | |
get | |
{ | |
lock (_syncRoot) | |
{ | |
return _disposalTask is not null; | |
} | |
} | |
} | |
public AsyncDisposable(Func<ValueTask> disposeAsyncFunc, object? syncRoot = null) | |
{ | |
_syncRoot = syncRoot ?? new(); | |
_disposeAsyncFunc = disposeAsyncFunc; | |
} | |
protected virtual ValueTask DisposeAsyncImpl() | |
{ | |
return _disposeAsyncFunc(); | |
} | |
public ValueTask DisposeAsync() | |
{ | |
Task<Task> disposalTaskWrapper; | |
lock (_syncRoot) | |
{ | |
if (_disposalTask is not null) | |
{ | |
return new(_disposalTask); | |
} | |
disposalTaskWrapper = new(() => DisposeAsyncImpl().AsTask()); | |
_disposalTask = disposalTaskWrapper.Unwrap(); | |
} | |
// run the actual disposal logic outside of the synchronous lock | |
disposalTaskWrapper.RunSynchronously(TaskScheduler.Default); | |
return new(_disposalTask); | |
} | |
} | |
/// <summary> | |
/// Implement a long-running task that can be stopped by calling <see cref="IAsyncDisposable.DisposeAsync" | |
/// <seealso cref="AsyncDisposable"/> | |
/// </summary> | |
public class LongRunningAsyncDisposable : AsyncDisposable | |
{ | |
private readonly Task _runtimeTask; | |
private readonly Task _completionTask; | |
private readonly CancellationTokenSource _cancellationTokenSource; | |
private readonly bool _catchRuntimeErrorsAtDispose; | |
public CancellationToken CancellationToken => _cancellationTokenSource.Token; | |
public Task Completion => _completionTask; | |
public LongRunningAsyncDisposable( | |
Func<CancellationToken, Task> runtimeTaskFunc, | |
Func<ValueTask> disposeAsyncFunc, | |
bool catchRuntimeErrorsAtDispose = true, | |
object? syncRoot = null, | |
CancellationToken cancellationToken = default) : | |
base(disposeAsyncFunc, syncRoot) | |
{ | |
_catchRuntimeErrorsAtDispose = catchRuntimeErrorsAtDispose; | |
_cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); | |
_runtimeTask = runtimeTaskFunc(this.CancellationToken); | |
_completionTask = RunToCompletion(); | |
} | |
private async Task RunToCompletion() | |
{ | |
try | |
{ | |
await _runtimeTask; | |
} | |
finally | |
{ | |
await base.DisposeAsync(); | |
} | |
} | |
protected override async ValueTask DisposeAsyncImpl() | |
{ | |
_cancellationTokenSource.Cancel(); | |
try | |
{ | |
try | |
{ | |
await _runtimeTask; | |
} | |
catch | |
{ | |
if (!_catchRuntimeErrorsAtDispose) throw; | |
// if the error is not rethrown, make sure | |
// to observe the Completion property in the client code | |
} | |
finally | |
{ | |
await base.DisposeAsyncImpl(); | |
} | |
} | |
finally | |
{ | |
_cancellationTokenSource.Dispose(); | |
} | |
} | |
} | |
/// <summary> | |
/// Get all of the AggregateException.InnerExceptions with try/await/catch | |
/// <seealso cref="https://stackoverflow.com/a/62607500/1768303"/> | |
/// </summary> | |
public static partial class TaskExt | |
{ | |
public static Task WithAggregatedExceptions(this Task @this) | |
{ | |
// using AggregateException.Flatten as a bonus | |
return @this.ContinueWith( | |
continuationFunction: anteTask => | |
anteTask.IsFaulted && | |
anteTask.Exception is { } ex && | |
(ex.InnerExceptions.Count > 1 || ex.InnerException is AggregateException) ? | |
Task.FromException(ex.Flatten()) : | |
anteTask, | |
cancellationToken: CancellationToken.None, | |
TaskContinuationOptions.ExecuteSynchronously, | |
scheduler: TaskScheduler.Default).Unwrap(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment