Skip to content

Instantly share code, notes, and snippets.

@tahq69
Created November 26, 2020 16:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tahq69/55c24331de1094a0dcd94782120e65ac to your computer and use it in GitHub Desktop.
Save tahq69/55c24331de1094a0dcd94782120e65ac to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
/// <summary>
/// Utility functions for working with parallel workloads.
/// </summary>
public static class ParallelUtils
{
/// <summary>
/// Variation of Parallel.ForEach() with async support. See <a
/// href="http://blogs.msdn.com/b/pfxteam/archive/2012/03/05/10278165.aspx">article
/// by Stephen Toub</a>.
/// </summary>
/// <typeparam name="T">The type of item in collection.</typeparam>
/// <param name="source">The collection of items to iterate over.</param>
/// <param name="dop">The maximum degree of parallelism.</param>
/// <param name="body">The function to perform on each item.</param>
/// <returns>Asynchronous operation <see cref="Task"/>.</returns>
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
=> Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async () =>
{
using (partition)
{
while (partition.MoveNext())
await body(partition.Current);
}
}));
/// <summary>
/// Variation of Parallel.ForEach() with asynchronous support and
/// iterator parameter.
/// </summary>
/// <typeparam name="TElement">
/// The type of item in collection.
/// </typeparam>
/// <typeparam name="TParam">The type of the parameter.</typeparam>
/// <param name="source">The collection of items to iterate over.</param>
/// <param name="dop">The maximum degree of parallelism.</param>
/// <param name="body">The function to perform on each item.</param>
/// <param name="param">The parameter to pass in a body.</param>
/// <returns>Asynchronous operation <see cref="Task"/>.</returns>
public static Task ForEachAsync<TElement, TParam>(this IEnumerable<TElement> source, int dop, Func<TParam, TElement, Task> body, TParam param)
=> Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async () =>
{
using (partition)
{
while (partition.MoveNext())
await body(param, partition.Current);
}
}));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment