Last active
February 22, 2020 17:07
-
-
Save AdamDotNet/e0ee211d052275d57750a8cac225fbec to your computer and use it in GitHub Desktop.
Attempts to replicate Parallel.For while allowing the loop body to be an async, Task-returning delegate, avoiding the async-void problem that arises when an async lambda is passed to Parallel.For.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading; | |
using System.Threading.Channels; | |
using System.Threading.Tasks; | |
namespace Async.ChannelUtilities
{
    /// <summary>
    /// Like <see cref="Parallel.For(int, int, Action{int})"/> but allows the loop body to be an
    /// <see langword="async"/>, <see cref="Task"/> returning delegate without running into the async-void problem.
    /// </summary>
    /// <remarks>
    /// The indices are pushed through a bounded <see cref="Channel{T}"/> by a single producer and pulled by
    /// a fixed number of consumers, each of which awaits the body for one index at a time.
    /// </remarks>
    public static class ParallelAsync
    {
        /// <summary>
        /// Executes the supplied function in parallel. Uses one consumer per processor
        /// (<see cref="Environment.ProcessorCount"/>), the default task scheduler, and a
        /// <see cref="CancellationToken"/> that can never be cancelled.
        /// </summary>
        /// <param name="fromInclusive">The start index, inclusive.</param>
        /// <param name="toExclusive">The end index, exclusive.</param>
        /// <param name="body">Function to call in each loop iteration.</param>
        /// <returns>Task representing when all the parallel work has completed.</returns>
        public static Task For(int fromInclusive, int toExclusive, Func<int, CancellationToken, Task> body)
        {
            ParallelOptions parallelOptions = new ParallelOptions
            {
                CancellationToken = default,
                MaxDegreeOfParallelism = -1,
                TaskScheduler = TaskScheduler.Default
            };

            return For(fromInclusive, toExclusive, parallelOptions, body);
        }

        /// <summary>
        /// Executes the supplied function in parallel, configuring the work with the supplied options.
        /// </summary>
        /// <param name="fromInclusive">The start index, inclusive.</param>
        /// <param name="toExclusive">The end index, exclusive.</param>
        /// <param name="parallelOptions">Configures degree of parallelism, cancellation, and optionally the task scheduler.</param>
        /// <param name="body">Function to call in each loop iteration.</param>
        /// <returns>
        /// Task representing when all the parallel work has completed. All failures, including argument
        /// validation failures, are reported through this task. If <paramref name="body"/> fails, the task
        /// faults with the exception the body threw.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="parallelOptions"/> or <paramref name="body"/> is <see langword="null"/>.</exception>
        /// <exception cref="OperationCanceledException">The token in <paramref name="parallelOptions"/> was cancelled.</exception>
        public static async Task For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Func<int, CancellationToken, Task> body)
        {
            if (parallelOptions is null)
            {
                throw new ArgumentNullException(nameof(parallelOptions));
            }

            if (body is null)
            {
                throw new ArgumentNullException(nameof(body));
            }

            CancellationToken cancellationToken = parallelOptions.CancellationToken;
            cancellationToken.ThrowIfCancellationRequested();

            // Widen to long so that e.g. (int.MaxValue - int.MinValue) cannot overflow.
            long itemCount = (long)toExclusive - fromInclusive;
            if (itemCount <= 0)
            {
                // Empty range: nothing to schedule.
                return;
            }

            // -1 means "no explicit limit"; this implementation maps it to one consumer per processor.
            int maxDegreeOfParallelism = parallelOptions.MaxDegreeOfParallelism == -1
                ? Environment.ProcessorCount
                : parallelOptions.MaxDegreeOfParallelism;

            // There is no point in starting more consumers than there are indices to process.
            int consumerCount = (int)Math.Min(maxDegreeOfParallelism, itemCount);
            TaskScheduler taskScheduler = parallelOptions.TaskScheduler ?? TaskScheduler.Current;

            Channel<int> channel = Channel.CreateBounded<int>(new BoundedChannelOptions(consumerCount)
            {
                AllowSynchronousContinuations = false,
                FullMode = BoundedChannelFullMode.Wait,
                SingleReader = consumerCount == 1,
                SingleWriter = true
            });

            // Slot 0 holds the producer, slots 1..consumerCount hold the consumers.
            Task[] tasks = new Task[consumerCount + 1];
            tasks[0] = Producer(fromInclusive, toExclusive, channel, cancellationToken);
            for (int i = 1; i < tasks.Length; i++)
            {
                // Start each consumer through the requested scheduler so that its synchronous prefix
                // (up to its first incomplete await) does not run inline on the caller's thread.
                tasks[i] = Task.Factory.StartNew(
                    () => Consumer(channel, body, cancellationToken),
                    cancellationToken,
                    TaskCreationOptions.RunContinuationsAsynchronously,
                    taskScheduler).Unwrap();
            }

            await Task.WhenAll(tasks).ConfigureAwait(false);
        }

        /// <summary>
        /// Writes every index in [<paramref name="fromInclusive"/>, <paramref name="toExclusive"/>) to the channel,
        /// then marks the channel as complete so the consumers can finish.
        /// </summary>
        private static async Task Producer(int fromInclusive, int toExclusive, Channel<int> channel, CancellationToken cancellationToken)
        {
            try
            {
                for (int index = fromInclusive; index < toExclusive; index++)
                {
                    Console.WriteLine($"Producer: {DateTime.Now} Thread Id {Thread.CurrentThread.ManagedThreadId} creating task #{index}.");
                    await channel.Writer.WriteAsync(index, cancellationToken).ConfigureAwait(false);
                }
            }
            catch (ChannelClosedException)
            {
                // Only a failing consumer closes the channel while the producer is still writing.
                // That consumer's task carries the real error, so the producer just stops quietly
                // instead of surfacing a wrapper exception to the caller.
            }
            finally
            {
                // Always release the consumers, even when the loop exits because of cancellation.
                channel.Writer.TryComplete();
            }
        }

        /// <summary>
        /// Reads indices from the channel until it is completed and awaits <paramref name="body"/> for each one.
        /// If the body throws, the channel is closed with that exception so the producer and the other
        /// consumers stop, and the exception is rethrown so it is observed by the caller of <c>For</c>.
        /// </summary>
        private static async Task Consumer(Channel<int> channel, Func<int, CancellationToken, Task> body, CancellationToken cancellationToken)
        {
            await foreach (int index in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
            {
                try
                {
                    await body(index, cancellationToken).ConfigureAwait(false);
                    Console.WriteLine($" Consumer: {DateTime.Now} Thread Id {Thread.CurrentThread.ManagedThreadId} completed task #{index}.");
                }
                catch (Exception ex)
                {
                    // TryComplete returns false when the producer has already completed the channel
                    // (e.g. the failing index was the last one). Rethrowing guarantees the failure is
                    // never lost in that case.
                    channel.Writer.TryComplete(ex);
                    throw;
                }
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment