Skip to content

Instantly share code, notes, and snippets.

@bricelam
Last active August 29, 2015 14:22
Show Gist options
  • Save bricelam/e9fc0701d9ad817d18d9 to your computer and use it in GitHub Desktop.
Save bricelam/e9fc0701d9ad817d18d9 to your computer and use it in GitHub Desktop.
ForEachAsync
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Helpers for running asynchronous per-element work with bounded concurrency.
/// </summary>
class ParallelEx
{
    /// <summary>
    /// Invokes <paramref name="body"/> once for every element of <paramref name="source"/> and
    /// returns a task that completes when all of the tasks returned by <paramref name="body"/>
    /// have completed.
    /// </summary>
    /// <remarks>
    /// Each invocation is started on a private scheduler that runs at most
    /// <see cref="Environment.ProcessorCount"/> queued tasks at a time. The limit applies to work
    /// queued to that scheduler; once <paramref name="body"/> has returned its task, the worker
    /// moves on to the next element, so the number of returned tasks still in flight is not bounded.
    /// </remarks>
    /// <typeparam name="TSource">Element type of <paramref name="source"/>.</typeparam>
    /// <param name="source">The elements to process.</param>
    /// <param name="body">The asynchronous operation to run for each element.</param>
    /// <returns>A task representing the completion of every per-element task.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="body"/> is <c>null</c>.
    /// </exception>
    public static Task ForEachAsync<TSource>(
        IEnumerable<TSource> source,
        Func<TSource, Task> body)
    {
        // Validate eagerly: without this a null body is only noticed inside a worker
        // (as a NullReferenceException) and is not noticed at all for an empty source.
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (body == null)
        {
            throw new ArgumentNullException(nameof(body));
        }

        var factory = new TaskFactory(
            new LimitedConcurrencyLevelTaskScheduler(Environment.ProcessorCount));

        // StartNew yields a Task<Task>; Unwrap exposes the inner task so that WhenAll waits
        // for the asynchronous work itself rather than just for the delegate to return.
        return Task.WhenAll(
            source.Select(
                s => factory.StartNew(() => body(s)).Unwrap()));
    }

    /// <summary>
    /// A <see cref="TaskScheduler"/> that runs queued tasks on thread-pool threads while keeping
    /// the number of concurrently running worker loops at or below a fixed maximum.
    /// </summary>
    sealed class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
    {
        // Set while the current thread is inside ProcessTasks. The field is static, so it is
        // shared by every scheduler instance on that thread.
        [ThreadStatic]
        static bool _processingTasks;

        // Pending tasks; also serves as the lock object guarding itself and _concurrencyLevel.
        readonly LinkedList<Task> _queue = new LinkedList<Task>();

        // Number of ProcessTasks loops that are queued or running. Guarded by lock (_queue).
        int _concurrencyLevel;

        /// <summary>Creates a scheduler with the given concurrency limit.</summary>
        /// <param name="maximumConcurrencyLevel">Maximum number of worker loops; must be at least 1.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="maximumConcurrencyLevel"/> is less than 1.
        /// </exception>
        public LimitedConcurrencyLevelTaskScheduler(int maximumConcurrencyLevel)
        {
            // A limit below 1 would let QueueTask accept tasks without ever starting a worker.
            if (maximumConcurrencyLevel < 1)
            {
                throw new ArgumentOutOfRangeException(nameof(maximumConcurrencyLevel));
            }

            MaximumConcurrencyLevel = maximumConcurrencyLevel;
        }

        /// <summary>Gets the maximum number of worker loops this scheduler starts.</summary>
        public override int MaximumConcurrencyLevel { get; }

        /// <summary>
        /// Appends <paramref name="task"/> to the queue and starts another worker loop if the
        /// concurrency limit has not been reached.
        /// </summary>
        protected override void QueueTask(Task task)
        {
            lock (_queue)
            {
                _queue.AddLast(task);
                if (_concurrencyLevel < MaximumConcurrencyLevel)
                {
                    // Count the worker before it starts so concurrent callers see the slot as taken.
                    _concurrencyLevel++;
                    ThreadPool.UnsafeQueueUserWorkItem(ProcessTasks, state: null);
                }
            }
        }

        /// <summary>
        /// Runs <paramref name="task"/> on the calling thread, but only when that thread is
        /// currently executing a worker loop.
        /// </summary>
        /// <returns><c>true</c> if the task was executed inline; otherwise <c>false</c>.</returns>
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // Inlining on a non-worker thread would add concurrency beyond the limit.
            if (!_processingTasks)
            {
                return false;
            }

            // A queued task must be removed first; if it is no longer in the queue, a worker
            // has already taken it.
            if (taskWasPreviouslyQueued && !TryDequeue(task))
            {
                return false;
            }

            return TryExecuteTask(task);
        }

        /// <summary>Removes <paramref name="task"/> from the queue if it is still pending.</summary>
        /// <returns><c>true</c> if the task was found and removed; otherwise <c>false</c>.</returns>
        protected override bool TryDequeue(Task task)
        {
            lock (_queue)
            {
                return _queue.Remove(task);
            }
        }

        /// <summary>Returns a snapshot of the tasks currently waiting in the queue.</summary>
        /// <exception cref="NotSupportedException">
        /// The queue lock could not be acquired without blocking.
        /// </exception>
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            var lockTaken = false;
            try
            {
                // Non-blocking acquire: give up immediately rather than wait for the lock.
                Monitor.TryEnter(_queue, ref lockTaken);
                if (!lockTaken)
                {
                    throw new NotSupportedException();
                }

                // Copy while the lock is held. Returning _queue itself would let the caller
                // enumerate the list after the lock is released, racing with the workers.
                return _queue.ToArray();
            }
            finally
            {
                if (lockTaken)
                {
                    Monitor.Exit(_queue);
                }
            }
        }

        /// <summary>
        /// Worker loop: executes queued tasks one at a time until the queue is empty, then
        /// releases its concurrency slot.
        /// </summary>
        /// <param name="state">Unused; required by the thread-pool callback signature.</param>
        void ProcessTasks(object state)
        {
            _processingTasks = true;
            try
            {
                while (true)
                {
                    Task task;
                    lock (_queue)
                    {
                        if (_queue.Count == 0)
                        {
                            // Release the slot under the same lock as the emptiness check so
                            // QueueTask cannot enqueue in between and skip starting a worker.
                            _concurrencyLevel--;
                            break;
                        }

                        task = _queue.First.Value;
                        _queue.RemoveFirst();
                    }

                    // Execute outside the lock so other workers can dequeue concurrently.
                    TryExecuteTask(task);
                }
            }
            finally
            {
                _processingTasks = false;
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment