Last active
August 29, 2015 14:22
-
-
Save bricelam/e9fc0701d9ad817d18d9 to your computer and use it in GitHub Desktop.
ForEachAsync
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Threading; | |
using System.Threading.Tasks; | |
/// <summary>
/// Helpers for running asynchronous work over a sequence with bounded parallelism.
/// </summary>
class ParallelEx
{
    /// <summary>
    /// Starts <paramref name="body"/> once for every element of <paramref name="source"/>
    /// on a scheduler that runs at most <see cref="Environment.ProcessorCount"/> work
    /// items at a time, and returns a task that completes when every returned task has
    /// completed.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sequence.</typeparam>
    /// <param name="source">Items to process. Enumerated once.</param>
    /// <param name="body">Asynchronous callback invoked for each item.</param>
    /// <returns>A task representing completion of all invocations of <paramref name="body"/>.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="body"/> is <c>null</c>.
    /// </exception>
    public static Task ForEachAsync<TSource>(
        IEnumerable<TSource> source,
        Func<TSource, Task> body)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        // Fail fast: a null body would otherwise surface as a NullReferenceException
        // inside every scheduled task.
        if (body == null)
        {
            throw new ArgumentNullException(nameof(body));
        }

        var factory = new TaskFactory(
            new LimitedConcurrencyLevelTaskScheduler(Environment.ProcessorCount));

        // StartNew yields Task<Task>; Unwrap exposes the inner task so that WhenAll
        // waits for the asynchronous body itself rather than just its synchronous start.
        return Task.WhenAll(
            source.Select(
                item => factory.StartNew(() => body(item)).Unwrap()));
    }

    /// <summary>
    /// A <see cref="TaskScheduler"/> that executes queued tasks on thread-pool threads
    /// while never running more than <see cref="MaximumConcurrencyLevel"/> workers at once.
    /// </summary>
    sealed class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
    {
        // The scheduler whose queue the current thread is draining, or null when the
        // thread is not inside ProcessTasks. Tracking the instance (rather than a bare
        // flag) prevents a thread that is working for one scheduler from inlining tasks
        // that belong to a different scheduler and thereby exceeding its limit.
        [ThreadStatic]
        static LimitedConcurrencyLevelTaskScheduler t_activeScheduler;

        // Pending tasks; also serves as the lock guarding itself and _concurrencyLevel.
        readonly LinkedList<Task> _queue = new LinkedList<Task>();

        // Number of ProcessTasks workers currently queued or running.
        int _concurrencyLevel;

        /// <summary>Creates a scheduler with the given concurrency limit.</summary>
        /// <param name="maximumConcurrencyLevel">Maximum number of concurrent workers; must be at least 1.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="maximumConcurrencyLevel"/> is less than 1.
        /// </exception>
        public LimitedConcurrencyLevelTaskScheduler(int maximumConcurrencyLevel)
        {
            // With a limit below 1 QueueTask would never start a worker, so queued
            // tasks would never run.
            if (maximumConcurrencyLevel < 1)
            {
                throw new ArgumentOutOfRangeException(nameof(maximumConcurrencyLevel));
            }

            MaximumConcurrencyLevel = maximumConcurrencyLevel;
        }

        /// <summary>Gets the maximum number of workers this scheduler runs concurrently.</summary>
        public override int MaximumConcurrencyLevel { get; }

        /// <summary>
        /// Appends <paramref name="task"/> to the queue and starts another worker if
        /// the concurrency limit has not been reached.
        /// </summary>
        protected override void QueueTask(Task task)
        {
            lock (_queue)
            {
                _queue.AddLast(task);

                if (_concurrencyLevel < MaximumConcurrencyLevel)
                {
                    _concurrencyLevel++;
                    ThreadPool.UnsafeQueueUserWorkItem(ProcessTasks, state: null);
                }
            }
        }

        /// <summary>
        /// Runs <paramref name="task"/> on the calling thread, but only when that thread
        /// is already one of this scheduler's workers, so inlining does not add concurrency.
        /// </summary>
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            if (!ReferenceEquals(t_activeScheduler, this))
            {
                return false;
            }

            // A previously queued task must be removed first; if it is no longer in
            // the queue, a worker has already taken it.
            if (taskWasPreviouslyQueued && !TryDequeue(task))
            {
                return false;
            }

            return TryExecuteTask(task);
        }

        /// <summary>Removes <paramref name="task"/> from the queue if it is still pending.</summary>
        protected override bool TryDequeue(Task task)
        {
            lock (_queue)
            {
                return _queue.Remove(task);
            }
        }

        /// <summary>
        /// Returns a snapshot of the pending tasks without blocking.
        /// </summary>
        /// <exception cref="NotSupportedException">The queue lock could not be taken immediately.</exception>
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            var lockTaken = false;
            try
            {
                Monitor.TryEnter(_queue, ref lockTaken);
                if (!lockTaken)
                {
                    throw new NotSupportedException();
                }

                // Copy while holding the lock: handing out the live list would let the
                // caller enumerate it after the lock is released, while workers mutate it.
                return _queue.ToArray();
            }
            finally
            {
                if (lockTaken)
                {
                    Monitor.Exit(_queue);
                }
            }
        }

        /// <summary>
        /// Worker loop: executes queued tasks one at a time until the queue is empty,
        /// then gives up this worker's concurrency slot.
        /// </summary>
        void ProcessTasks(object state)
        {
            t_activeScheduler = this;
            try
            {
                while (true)
                {
                    Task task;
                    lock (_queue)
                    {
                        if (_queue.Count == 0)
                        {
                            // Release the slot under the same lock QueueTask uses, so a
                            // task queued concurrently either is seen here or starts a
                            // new worker.
                            _concurrencyLevel--;
                            break;
                        }

                        task = _queue.First.Value;
                        _queue.RemoveFirst();
                    }

                    TryExecuteTask(task);
                }
            }
            finally
            {
                t_activeScheduler = null;
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment