Method that implements parallelism
/// <summary>
/// Extension helpers that run asynchronous work items over a sequence using a
/// fixed number of concurrent workers that share a single queue.
/// </summary>
public static class TaskParallelLibrary
{
    /// <summary>
    /// Invokes <paramref name="workItem"/> once for every element of
    /// <paramref name="source"/>, using <paramref name="maxDegreeOfParallelism"/>
    /// workers that pull from a shared queue.
    /// </summary>
    /// <typeparam name="TSource">Element type of the input sequence.</typeparam>
    /// <typeparam name="TResult">Element type of the sequences produced by each work item.</typeparam>
    /// <param name="source">The elements to process; enumerated once to fill the queue.</param>
    /// <param name="workItem">Asynchronous function applied to each element.</param>
    /// <param name="maxDegreeOfParallelism">Number of workers to start; must be at least 1.</param>
    /// <returns>
    /// An array with one entry per worker. Each entry holds the concatenated results
    /// of the work items that worker processed, so which entry a given result lands in
    /// depends on scheduling. Flatten with <c>SelectMany</c> if the grouping is irrelevant.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="workItem"/> is <c>null</c>
    /// (surfaced when the returned task is awaited).
    /// </exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="maxDegreeOfParallelism"/> is less than 1
    /// (surfaced when the returned task is awaited).
    /// </exception>
    /// <remarks>
    /// A worker only yields when a work item's task is not already completed, so work
    /// items that complete synchronously may all be processed by the first worker.
    /// </remarks>
    public static async Task<IEnumerable<TResult>[]> ProcessListWithParallelismAsync<TSource, TResult>(
        this IEnumerable<TSource> source,
        Func<TSource, Task<IEnumerable<TResult>>> workItem,
        int maxDegreeOfParallelism)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (workItem == null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        // Zero workers would return an empty array and silently skip every element;
        // a negative count would fail on the array allocation with an unrelated exception.
        if (maxDegreeOfParallelism < 1)
        {
            throw new ArgumentOutOfRangeException(
                nameof(maxDegreeOfParallelism),
                maxDegreeOfParallelism,
                "The degree of parallelism must be at least 1.");
        }

        // Wrap each element in a deferred call so nothing starts until a worker dequeues it.
        var taskQueue = new ConcurrentQueue<Func<Task<IEnumerable<TResult>>>>(
            source.Select<TSource, Func<Task<IEnumerable<TResult>>>>(element => () => workItem(element)));

        var processors = new Task<IEnumerable<TResult>>[maxDegreeOfParallelism];
        for (int i = 0; i < processors.Length; i++)
        {
            processors[i] = RunTasksOnSingleProcessorAsync(taskQueue);
        }

        return await Task.WhenAll(processors).ConfigureAwait(false);
    }

    /// <summary>
    /// Worker loop: dequeues work items one at a time, awaits each, and collects
    /// their results until the queue has no more items.
    /// </summary>
    /// <typeparam name="TResult">Element type of the sequences produced by each work item.</typeparam>
    /// <param name="queue">Queue of deferred work items shared by all workers.</param>
    /// <returns>The results of every work item this worker processed, in processing order.</returns>
    private static async Task<IEnumerable<TResult>> RunTasksOnSingleProcessorAsync<TResult>(
        ConcurrentQueue<Func<Task<IEnumerable<TResult>>>> queue)
    {
        var result = new List<TResult>();

        // TryDequeue already reports an empty queue, so no separate IsEmpty check is needed.
        while (queue.TryDequeue(out Func<Task<IEnumerable<TResult>>> workItem))
        {
            result.AddRange(await workItem().ConfigureAwait(false));
        }

        return result;
    }
}
Example: call GetStringsFromInt for each element of a list, with a degree of parallelism of 2
// Input values; each one is passed to GetStringsFromInt by a worker.
var ints = new List<int> { 1, 3, 5, 7, 9 };

// Run with two workers; the call yields one result sequence per worker.
IEnumerable<string>[] perWorkerResults = await ints
    .ProcessListWithParallelismAsync(number => GetStringsFromInt(number), 2)
    .ConfigureAwait(false);

// Flatten the per-worker sequences into a single sequence of strings.
IEnumerable<string> result = perWorkerResults.SelectMany(batch => batch);
/// <summary>
/// Sample work item: produces the string forms of <paramref name="i"/> and
/// <paramref name="i"/> + 1.
/// </summary>
/// <param name="i">The number to convert.</param>
/// <returns>A task whose result is a two-element list of strings.</returns>
private async Task<IEnumerable<string>> GetStringsFromInt(int i)
{
    var pair = new List<string> { $"{i}", $"{i + 1}" };

    // No real asynchronous work happens here: the list is wrapped in an
    // already-completed task and awaited immediately.
    Task<List<string>> completed = Task.FromResult(pair);
    return await completed.ConfigureAwait(false);
}