Skip to content

Instantly share code, notes, and snippets.

@isc30
Last active Sep 24, 2020
Embed
What would you like to do?
SelectParallel
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Test
{
class Program
{
static Random rnd = new Random();
static async Task Main(string[] args)
{
Log("Starting");
var source = GetIds();
var maps = source.SelectParallel(e => Map(e));
await foreach (var str in maps)
{
Log(str);
}
Log("Ending");
}
static void Log(string str)
{
Console.WriteLine($"[{DateTime.Now.ToLongTimeString()}] {str}");
}
static async IAsyncEnumerable<int> GetIds()
{
foreach (var i in Enumerable.Range(1, 20))
{
await Task.Delay(1);
yield return i;
}
}
static async Task<string> Map(int id)
{
await Task.Delay(rnd.Next(1000, 2000));
return $"{id}_{Thread.CurrentThread.ManagedThreadId}";
}
}
public static class AsyncEnumerableExtensions
{
public static async IAsyncEnumerable<T> AsAsyncEnumerable<T>(this IEnumerable<T> source)
{
foreach (var e in source)
{
yield return e;
}
}
/// <summary>
/// Runs the selectors in parallel and yields in completion order
/// </summary>
public static IAsyncEnumerable<TOut> SelectParallel<TIn, TOut>(
this IEnumerable<TIn> source,
Func<TIn, TOut> selector)
{
return SelectParallel(source, e => Task.Run(() => selector(e)));
}
/// <summary>
/// Runs the selectors in parallel and yields in completion order
/// </summary>
public static async IAsyncEnumerable<TOut> SelectParallel<TIn, TOut>(
this IEnumerable<TIn> source,
Func<TIn, Task<TOut>> selector)
{
if (source == null)
{
throw new InvalidOperationException("Source is null");
}
var tasks = source
.Select(selector)
.ToHashSet();
while (tasks.Any())
{
var completedTask = await Task.WhenAny<TOut>(tasks);
tasks.Remove(completedTask);
yield return completedTask.Result;
}
}
/// <summary>
/// Runs the selectors in parallel and yields in completion order
/// </summary>
public static IAsyncEnumerable<TOut> SelectParallel<TIn, TOut>(
this IAsyncEnumerable<TIn> source,
Func<TIn, TOut> selector)
{
return SelectParallel(source, e => Task.Run(() => selector(e)));
}
/// <summary>
/// Runs the selectors in parallel and yields in completion order
/// </summary>
public static async IAsyncEnumerable<TOut> SelectParallel<TIn, TOut>(
this IAsyncEnumerable<TIn> source,
Func<TIn, Task<TOut>> selector)
{
if (source == null)
{
throw new InvalidOperationException("Source is null");
}
var enumerator = source.GetAsyncEnumerator();
var sourceFinished = false;
var tasks = new HashSet<Task<TOut>>();
Task<bool> sourceMoveTask = null;
Task<Task<TOut>> pipeCompletionTask = null;
try
{
while (!sourceFinished || tasks.Any())
{
if (sourceMoveTask == null && !sourceFinished)
{
sourceMoveTask = enumerator.MoveNextAsync().AsTask();
}
if (pipeCompletionTask == null && tasks.Any())
{
pipeCompletionTask = Task.WhenAny<TOut>(tasks);
}
var coreTasks = new Task[] { pipeCompletionTask, sourceMoveTask }
.Where(t => t != null)
.ToList();
if (!coreTasks.Any())
{
break;
}
await Task.WhenAny(coreTasks);
if (sourceMoveTask != null && sourceMoveTask.IsCompleted)
{
sourceFinished = !sourceMoveTask.Result;
if (!sourceFinished)
{
try
{
tasks.Add(selector(enumerator.Current));
}
catch { }
}
sourceMoveTask = null;
}
if (pipeCompletionTask != null && pipeCompletionTask.IsCompleted)
{
var completedTask = pipeCompletionTask.Result;
if (completedTask.IsCompletedSuccessfully)
{
yield return completedTask.Result;
}
tasks.Remove(completedTask);
pipeCompletionTask = null;
}
}
}
finally
{
await enumerator.DisposeAsync();
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment