Skip to content

Instantly share code, notes, and snippets.

@AdamDotNet
Last active February 22, 2020 17:07
Show Gist options
  • Save AdamDotNet/e0ee211d052275d57750a8cac225fbec to your computer and use it in GitHub Desktop.
Save AdamDotNet/e0ee211d052275d57750a8cac225fbec to your computer and use it in GitHub Desktop.
Attempts to replicate Parallel.For while allowing the loop body to be an async, Task-returning delegate, avoiding the async-void problem that arises when an async lambda is passed to Parallel.For.
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace Async.ChannelUtilities
{
    /// <summary>
    /// Like <see cref="Parallel.For(int, int, Action{int})"/> but allows the loop body to be an <see langword="async"/>,
    /// <see cref="Task"/>-returning delegate without running into the async-void problem.
    /// </summary>
    /// <remarks>
    /// Indices are written to a bounded <see cref="Channel{T}"/> by a single producer and read by up to
    /// <see cref="ParallelOptions.MaxDegreeOfParallelism"/> consumers, each of which awaits the body for one index at a time.
    /// </remarks>
    public static class ParallelAsync
    {
        /// <summary>
        /// Executes the supplied function in parallel. Uses all available cores, the default task scheduler, and does not support cancellation.
        /// </summary>
        /// <param name="fromInclusive">The start index, inclusive.</param>
        /// <param name="toExclusive">The end index, exclusive.</param>
        /// <param name="body">Function to call in each loop iteration.</param>
        /// <returns>Task representing when all the parallel work has completed.</returns>
        public static Task For(int fromInclusive, int toExclusive, Func<int, CancellationToken, Task> body)
        {
            ParallelOptions parallelOptions = new ParallelOptions
            {
                CancellationToken = default,
                MaxDegreeOfParallelism = -1,
                TaskScheduler = TaskScheduler.Default
            };
            return For(fromInclusive, toExclusive, parallelOptions, body);
        }

        /// <summary>
        /// Executes the supplied function in parallel, configuring the work with the supplied options.
        /// </summary>
        /// <param name="fromInclusive">The start index, inclusive.</param>
        /// <param name="toExclusive">The end index, exclusive.</param>
        /// <param name="parallelOptions">Configures degree of parallelism, cancellation, and optionally the task scheduler.</param>
        /// <param name="body">Function to call in each loop iteration.</param>
        /// <returns>
        /// Task representing when all the parallel work has completed. If <paramref name="body"/> throws, no further
        /// indices are produced and the returned task fails with an exception thrown by the body (or is canceled if the
        /// body threw an <see cref="OperationCanceledException"/>). Indices that were already buffered may still be processed.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="parallelOptions"/> or <paramref name="body"/> is <see langword="null"/> (reported through the returned task).</exception>
        /// <exception cref="OperationCanceledException">The token in <paramref name="parallelOptions"/> was canceled (reported through the returned task).</exception>
        public static async Task For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Func<int, CancellationToken, Task> body)
        {
            if (parallelOptions == null)
            {
                throw new ArgumentNullException(nameof(parallelOptions));
            }

            if (body == null)
            {
                throw new ArgumentNullException(nameof(body));
            }

            CancellationToken cancellationToken = parallelOptions.CancellationToken;
            cancellationToken.ThrowIfCancellationRequested();

            // Computed as long so that extreme bounds (e.g. int.MinValue..int.MaxValue) cannot overflow.
            long iterationCount = (long)toExclusive - fromInclusive;
            if (iterationCount <= 0)
            {
                // Empty range: nothing to schedule.
                return;
            }

            int maxDegreeOfParallelism = parallelOptions.MaxDegreeOfParallelism == -1 ? Environment.ProcessorCount : parallelOptions.MaxDegreeOfParallelism;

            // Never start more consumers than there are indices; the extras would only sit idle.
            int consumerCount = (int)Math.Min(maxDegreeOfParallelism, iterationCount);
            TaskScheduler taskScheduler = parallelOptions.TaskScheduler ?? TaskScheduler.Current;

            Channel<int> channel = Channel.CreateBounded<int>(new BoundedChannelOptions(consumerCount)
            {
                AllowSynchronousContinuations = false,
                FullMode = BoundedChannelFullMode.Wait,
                SingleReader = consumerCount == 1,
                SingleWriter = true
            });

            // Slot 0 is the producer; slots 1..consumerCount are the consumers.
            Task[] tasks = new Task[consumerCount + 1];
            tasks[0] = Producer(fromInclusive, toExclusive, channel, cancellationToken);
            for (int i = 1; i < tasks.Length; i++)
            {
                // StartNew is what routes each consumer's start-up through the requested scheduler; calling
                // Consumer directly would run its synchronous prefix on the calling thread instead.
                tasks[i] = Task.Factory.StartNew(
                    () => Consumer(channel, body, cancellationToken),
                    cancellationToken,
                    TaskCreationOptions.RunContinuationsAsynchronously,
                    taskScheduler).Unwrap();
            }

            await Task.WhenAll(tasks).ConfigureAwait(false);
        }

        /// <summary>
        /// Writes every index in the range to the channel, then marks the channel as complete.
        /// </summary>
        /// <param name="fromInclusive">The start index, inclusive.</param>
        /// <param name="toExclusive">The end index, exclusive.</param>
        /// <param name="channel">Channel the indices are written to.</param>
        /// <param name="cancellationToken">Token that aborts a pending write.</param>
        /// <returns>Task that completes once all indices are written or production has been stopped.</returns>
        private static async Task Producer(int fromInclusive, int toExclusive, Channel<int> channel, CancellationToken cancellationToken)
        {
            try
            {
                for (int index = fromInclusive; index < toExclusive; index++)
                {
                    await channel.Writer.WriteAsync(index, cancellationToken).ConfigureAwait(false);
                }
            }
            catch (ChannelClosedException)
            {
                // A consumer closed the channel because its body failed. That consumer's task carries the
                // real exception, so the producer just stops instead of reporting a secondary error.
            }
            finally
            {
                // Always complete the channel so consumers are never left waiting for more indices.
                channel.Writer.TryComplete();
            }
        }

        /// <summary>
        /// Reads indices from the channel until it is completed and awaits <paramref name="body"/> for each one.
        /// </summary>
        /// <param name="channel">Channel the indices are read from.</param>
        /// <param name="body">Function to call for each index.</param>
        /// <param name="cancellationToken">Token passed to the channel reader and to <paramref name="body"/>.</param>
        /// <returns>Task that completes when the channel is drained, or fails with the exception thrown by <paramref name="body"/>.</returns>
        private static async Task Consumer(Channel<int> channel, Func<int, CancellationToken, Task> body, CancellationToken cancellationToken)
        {
            await foreach (int index in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
            {
                try
                {
                    await body(index, cancellationToken).ConfigureAwait(false);
                }
                catch
                {
                    // Close the channel so the producer stops handing out new indices, then rethrow so the
                    // failure reaches the caller. Swallowing it here would make the loop appear to succeed.
                    channel.Writer.TryComplete();
                    throw;
                }
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment