Skip to content

Instantly share code, notes, and snippets.

@n8allan
Created February 13, 2024 03:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save n8allan/c4e0610ea9dab4eb266bfb9bb651d03e to your computer and use it in GitHub Desktop.
Save n8allan/c4e0610ea9dab4eb266bfb9bb651d03e to your computer and use it in GitHub Desktop.
Processes items in a list asynchronously up to n items at a time as an async enumerable.
/// <summary>
/// Runs a synchronous delegate over the items of a sequence on the thread pool,
/// keeping at most a fixed number of invocations in flight, and streams the
/// results back as an async enumerable.
/// </summary>
public static class LimitedWorkers
{
    /// <summary>
    /// Applies <paramref name="taskFunc"/> to every element of <paramref name="items"/>
    /// via <see cref="Task.Run(Func{TO})"/>, with no more than
    /// <paramref name="maxConcurrency"/> invocations running at once.
    /// </summary>
    /// <typeparam name="TI">Type of the input items.</typeparam>
    /// <typeparam name="TO">Type of the produced results.</typeparam>
    /// <param name="items">The items to process. Enumerated lazily, one element per free slot.</param>
    /// <param name="taskFunc">The work to perform for a single item.</param>
    /// <param name="maxConcurrency">Maximum number of simultaneous invocations; must be at least 1.</param>
    /// <param name="handleError">
    /// Optional callback invoked with the failing item and the exception thrown by
    /// <paramref name="taskFunc"/>. A failed item produces no result. When <c>null</c>,
    /// failures are dropped silently.
    /// </param>
    /// <returns>
    /// The results of the successful invocations, yielded as their completion is observed
    /// (not necessarily in input order).
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="items"/> or <paramref name="taskFunc"/> is <c>null</c>.
    /// </exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="maxConcurrency"/> is less than 1.
    /// </exception>
    public static IAsyncEnumerable<TO> EnumerateAsync<TI, TO>(this IEnumerable<TI> items, Func<TI, TO> taskFunc, int maxConcurrency, Action<TI, Exception>? handleError)
    {
        if (items is null)
        {
            throw new ArgumentNullException(nameof(items));
        }

        if (taskFunc is null)
        {
            throw new ArgumentNullException(nameof(taskFunc));
        }

        // A limit of zero would make the very first WaitAsync block forever.
        if (maxConcurrency < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxConcurrency), maxConcurrency, "The concurrency limit must be at least 1.");
        }

        return EnumerateCoreAsync(items, taskFunc, maxConcurrency, handleError);
    }

    // Iterator body, kept separate so the argument checks above run eagerly at call time.
    private static async IAsyncEnumerable<TO> EnumerateCoreAsync<TI, TO>(IEnumerable<TI> items, Func<TI, TO> taskFunc, int maxConcurrency, Action<TI, Exception>? handleError)
    {
        // Deliberately not disposed: if the consumer stops enumerating early, workers
        // that are still running must be able to call Release() without hitting
        // ObjectDisposedException.
        var semaphore = new SemaphoreSlim(maxConcurrency);
        var running = new List<Task<TO>>();
        var sources = new Dictionary<Task<TO>, TI>();

        foreach (var item in items)
        {
            await semaphore.WaitAsync();

            // Release the slot from inside the worker so the task faults with the
            // original exception instead of an AggregateException wrapper.
            var task = Task.Run(() =>
            {
                try
                {
                    return taskFunc(item);
                }
                finally
                {
                    semaphore.Release();
                }
            });
            running.Add(task);
            sources.Add(task, item);

            // Hand out whatever has already finished without waiting for all work to start.
            // ToList() snapshots the matches because TryCollect mutates 'running'.
            foreach (var finished in running.Where(t => t.IsCompleted).ToList())
            {
                if (TryCollect(finished, running, sources, handleError, out var result))
                {
                    yield return result;
                }
            }
        }

        // Everything has been started; drain the remaining tasks as they complete.
        while (running.Count > 0)
        {
            var finished = await Task.WhenAny(running);
            if (TryCollect(finished, running, sources, handleError, out var result))
            {
                yield return result;
            }
        }
    }

    // Removes a completed task from the bookkeeping and extracts its outcome: returns
    // true with the result on success, or reports the failure for the task's own item
    // and returns false.
    private static bool TryCollect<TI, TO>(Task<TO> finished, List<Task<TO>> running, Dictionary<Task<TO>, TI> sources, Action<TI, Exception>? handleError, out TO result)
    {
        // Always unregister first, even on failure; otherwise a faulted task would be
        // observed again and again and the drain loop would never terminate.
        var source = sources[finished];
        running.Remove(finished);
        sources.Remove(finished);

        try
        {
            // The task is already complete, so this does not block; it rethrows the
            // task's original exception unwrapped.
            result = finished.GetAwaiter().GetResult();
            return true;
        }
        catch (Exception ex)
        {
            handleError?.Invoke(source, ex);
            result = default!;
            return false;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment