Skip to content

Instantly share code, notes, and snippets.

@fredeil
Created October 6, 2019 10:46
Show Gist options
  • Save fredeil/c54d88c3795593ff0372490925203165 to your computer and use it in GitHub Desktop.
Save fredeil/c54d88c3795593ff0372490925203165 to your computer and use it in GitHub Desktop.

Async IProgress

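Do you have code that reports progress through the older IProgress<T> interface, and want to consume those reports with .NET Core 3.0 features such as IAsyncEnumerable<T> or Channel<T>? The AsyncProgress<T> class below implements IProgress<T>, writes each reported value into a Channel<T>, and exposes the values as an IAsyncEnumerable<T>.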

Keywords: IAsyncEnumerable, IProgress, Progress, Channel, ChannelReader, ChannelWriter

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace Test
{
/// <summary>
/// An <see cref="IProgress{T}"/> implementation that writes every reported value
/// into a <see cref="Channel{T}"/> and exposes the values as an
/// <see cref="IAsyncEnumerable{T}"/>.
/// </summary>
/// <typeparam name="T">Type of the reported progress values.</typeparam>
public class AsyncProgress<T> : IProgress<T>
{
    private readonly Channel<T> _channel;

    /// <summary>
    /// Creates the progress reporter and its backing channel.
    /// </summary>
    /// <param name="channelOptions">
    /// Options for the backing channel. When <c>null</c>, an unbounded channel is
    /// created with <c>SingleReader</c>, <c>SingleWriter</c> and
    /// <c>AllowSynchronousContinuations</c> all set to <c>true</c>.
    /// </param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="channelOptions"/> is neither <see cref="UnboundedChannelOptions"/>
    /// nor <see cref="BoundedChannelOptions"/>.
    /// </exception>
    public AsyncProgress(ChannelOptions channelOptions = null)
    {
        _channel = channelOptions switch
        {
            null => Channel.CreateUnbounded<T>(new UnboundedChannelOptions
            {
                SingleReader = true,
                SingleWriter = true,
                AllowSynchronousContinuations = true
            }),
            UnboundedChannelOptions unbounded => Channel.CreateUnbounded<T>(unbounded),
            BoundedChannelOptions bounded => Channel.CreateBounded<T>(bounded),
            _ => throw new ArgumentOutOfRangeException(nameof(channelOptions))
        };
    }

    /// <summary>
    /// Streams the values reported to this instance until <paramref name="task"/>
    /// finishes. The result of <paramref name="task"/> itself is not streamed.
    /// </summary>
    /// <param name="task">The already-started operation that reports progress to this instance.</param>
    /// <param name="cancellationToken">Token used to cancel reading from the returned stream.</param>
    /// <returns>The reported values, in the order the channel yields them.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
    public IAsyncEnumerable<T> RunProgressing(Task<T> task, CancellationToken cancellationToken = default)
    {
        // Both overloads share one implementation; the cast selects the non-generic overload.
        return RunProgressing((Task)task, cancellationToken);
    }

    /// <summary>
    /// Streams the values reported to this instance until <paramref name="task"/>
    /// finishes. If the task faults, its first inner exception (or the aggregate
    /// exception when there is none) is used to complete the channel; if the task
    /// is cancelled or succeeds, the channel is completed without an error.
    /// </summary>
    /// <param name="task">The already-started operation that reports progress to this instance.</param>
    /// <param name="cancellationToken">Token used to cancel reading from the returned stream.</param>
    /// <returns>The reported values, in the order the channel yields them.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
    public IAsyncEnumerable<T> RunProgressing(Task task, CancellationToken cancellationToken = default)
    {
        if (task == null)
        {
            throw new ArgumentNullException(nameof(task));
        }

        // The completion continuation is deliberately NOT tied to cancellationToken:
        // a cancelled continuation would never run and the channel would stay open
        // forever. The token only governs the read side below.
        _ = task.ContinueWith(
            completed =>
            {
                _channel.Writer.TryComplete(completed.Exception?.InnerException ?? completed.Exception);
            },
            CancellationToken.None,
            TaskContinuationOptions.None,
            TaskScheduler.Default);

        return _channel.Reader.ReadAllAsync(cancellationToken);
    }

    /// <summary>
    /// Writes <paramref name="value"/> to the channel. The result of
    /// <c>TryWrite</c> is ignored, so a value the channel does not accept is
    /// dropped silently.
    /// </summary>
    /// <param name="value">The progress value to publish.</param>
    public void Report(T value)
    {
        _channel.Writer.TryWrite(value);
    }
}
/// <summary>
/// Sample long-running operation that reports the integers 1 through 10.
/// </summary>
public static class LongRunning
{
    /// <summary>
    /// Reports 1..10 to <paramref name="progress"/>, awaiting a delay of
    /// <c>100 * step</c> milliseconds before each report.
    /// </summary>
    /// <param name="progress">Receiver of the reported step numbers.</param>
    /// <param name="cancellationToken">
    /// Checked before each step (the loop ends quietly when cancellation is
    /// requested) and also passed to <c>Task.Delay</c>.
    /// </param>
    public static async Task Run(IProgress<int> progress, CancellationToken cancellationToken)
    {
        var step = 1;
        while (step <= 10 && !cancellationToken.IsCancellationRequested)
        {
            await Task.Delay(100 * step, cancellationToken);
            progress.Report(step);
            step++;
        }
    }

    /// <summary>
    /// Starts the operation above with a fresh <c>AsyncProgress&lt;int&gt;</c> and
    /// returns the reported values as an asynchronous stream.
    /// </summary>
    /// <param name="cancellationToken">
    /// Passed to the producing operation only; the stream itself is requested
    /// without a token.
    /// </param>
    /// <returns>The stream of reported step numbers.</returns>
    public static IAsyncEnumerable<int> Run(CancellationToken cancellationToken = default)
    {
        var reporter = new AsyncProgress<int>();
        Task work = Run(reporter, cancellationToken);
        return reporter.RunProgressing(work);
    }
}
/// <summary>
/// Console demo: prints every progress value produced by <c>LongRunning.Run</c>.
/// </summary>
class Program
{
    /// <summary>
    /// Enumerates the progress stream, writes each value to the console, then
    /// waits for a key press before exiting.
    /// </summary>
    static async Task Main()
    {
        using (var source = new CancellationTokenSource())
        {
            await foreach (var step in LongRunning.Run(source.Token))
            {
                Console.WriteLine(step);
            }

            // Still inside the using block so the token source is disposed only
            // after the key press, as with the original using declaration.
            Console.ReadKey();
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment