Last active
August 29, 2015 14:23
-
-
Save maslevx/fbf1549ef8dfe54be564 to your computer and use it in GitHub Desktop.
linqs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary> | |
/// Provides extension methods for executing sequences of async operations. | |
/// Items flow into the resulting sequence as they complete, with the specified limit of operations executing concurrently | |
/// </summary> | |
public static class AsyncOpLinqExtension | |
{ | |
/// <summary> | |
/// executes <paramref name="action"/> over each element of <paramref name="source"/> | |
/// </summary> | |
/// <typeparam name="TSource">type of input sequence</typeparam> | |
/// <typeparam name="TResult">type of result returned by <paramref name="action"/></typeparam> | |
/// <param name="source">sequence to process</param> | |
/// <param name="action">operation to perform</param> | |
/// <param name="limit">maximum tasks to be run at once</param> | |
/// <returns>resulting sequence of <typeparamref name="TResult"/></returns> | |
public static IEnumerable<TResult> DoTask<TSource, TResult>(this IEnumerable<TSource> source, | |
Func<TSource, Task<TResult>> action, int limit) | |
{ | |
return source.Select(item => action(item)).RunTasks(limit); | |
} | |
/// <summary> | |
/// executes <paramref name="action"/> as a task over each element of <paramref name="source"/> | |
/// </summary> | |
/// <typeparam name="TSource">type of input sequence</typeparam> | |
/// <typeparam name="TResult">type of result returned by <paramref name="action"/></typeparam> | |
/// <param name="source">sequence to process</param> | |
/// <param name="action">operation to perform</param> | |
/// <param name="limit">maximum tasks to run at once</param> | |
/// <returns>resulting sequence of <typeparamref name="TResult"/></returns> | |
public static IEnumerable<TResult> DoTask<TSource, TResult>(this IEnumerable<TSource> source, | |
Func<TSource, TResult> action, int limit) | |
{ | |
return source.Select(item => new Task<TResult>(() => action(item))).RunTasks(limit); | |
} | |
/// <summary> | |
/// executes a sequence of tasks | |
/// </summary> | |
/// <typeparam name="T">return type of task</typeparam> | |
/// <param name="source">input sequence to be executed</param> | |
/// <param name="limit">maximum tasks to run at once</param> | |
/// <returns>result of completed task</returns> | |
public static IEnumerable<T> RunTasks<T>(this IEnumerable<Task<T>> source, int limit) | |
{ | |
TasksHandler<T> taskhandler = new TasksHandler<T>(limit); | |
return taskhandler.Do(source); | |
} | |
//public static IEnumerable<Task> RunTasks(this IEnumerable<Task> source, int limit) | |
//{ | |
//} | |
#region task handler | |
class TasksHandler<T> | |
{ | |
private Task<T>[] m_buffer; | |
private readonly object m_lock = new object(); | |
private readonly ConcurrentQueue<T> m_completed = new ConcurrentQueue<T>(); | |
private IEnumerator<Task<T>> m_mover; | |
private bool m_waiting; | |
public int TaskLimit { get; set; } | |
public TasksHandler(int limit) | |
{ | |
TaskLimit = limit; | |
} | |
public TasksHandler() | |
: this(50) | |
{ } | |
public IEnumerable<T> Do(IEnumerable<Task<T>> source) | |
{ | |
int limit = TaskLimit; | |
m_buffer = new Task<T>[limit]; | |
m_mover = source.GetEnumerator(); | |
while (--limit >= 0 && m_mover.MoveNext()) | |
{ | |
Task<T> current = m_mover.Current; | |
m_buffer[limit] = current; | |
current.ContinueWith(OnCompletion); | |
TryStart(current); | |
} | |
while (m_completed.IsEmpty == false || m_buffer.Any(t => t != null)) | |
{ | |
if (m_completed.IsEmpty) | |
lock (m_completed) | |
{ | |
m_waiting = true; | |
Monitor.Wait(m_completed); | |
m_waiting = false; | |
} | |
else | |
{ | |
T tmp; | |
if (m_completed.TryDequeue(out tmp)) | |
yield return tmp; | |
} | |
} | |
} | |
private void OnCompletion(Task<T> task) | |
{ | |
m_completed.Enqueue(task.Result); | |
if (m_waiting) | |
lock (m_completed) | |
{ | |
Monitor.Pulse(m_completed); | |
} | |
lock (m_lock) | |
{ | |
int pos = Array.IndexOf(m_buffer, task); | |
m_buffer[pos] = null; | |
if (m_mover.MoveNext()) | |
{ | |
Task<T> current = m_mover.Current; | |
m_buffer[pos] = current; | |
current.ContinueWith(OnCompletion); | |
TryStart(current); | |
} | |
} | |
} | |
private static void TryStart(Task obj) | |
{ | |
if (obj.Status == TaskStatus.Created) | |
obj.Start(); | |
} | |
} | |
#endregion | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static class ConcurrentLinqExtension | |
{ | |
private static readonly int DefaultThreadLimit = Environment.ProcessorCount; | |
/// <summary> | |
/// Execute action over each element in paralell, returning them in order of <b>completion</b>. | |
/// Does not maintain order of input | |
/// </summary> | |
/// <param name="source">sequence to act on</param> | |
/// <param name="threadLimit">Max degree of parallelism</param> | |
/// <param name="action">The action to perform on each item</param> | |
/// <returns>items as they complete</returns> | |
public static IEnumerable<T> ConcurrentForEach<T>(this IEnumerable<T> source, int threadLimit, Action<T> action) | |
{ | |
Guard.AgainstNull(source, "source"); | |
Guard.AgainstNull(action, "action"); | |
Func<T, T> wrapper = (item) => | |
{ | |
action(item); | |
return item; | |
}; | |
return Execute(source, threadLimit, wrapper); | |
} | |
/// <summary> | |
/// Execute action over each element in paralell, returning them in order of <b>completion</b>. | |
/// Does not maintain order of input | |
/// </summary> | |
/// <param name="source">sequence to act against</param> | |
/// <param name="action">The action to perform on each item</param> | |
/// <returns>items as they complete</returns> | |
public static IEnumerable<T> ConcurrentForEach<T>(this IEnumerable<T> source, Action<T> action) | |
{ | |
return ConcurrentForEach(source, Environment.ProcessorCount, action); //default one thread per core | |
} | |
/// <summary> | |
/// Project each element of a sequence into a new form, running in paralell. Resulting items are returned in order of <b>completion</b>. | |
/// Does not maintain order of input | |
/// </summary> | |
/// <typeparam name="TSource">type of input sequence</typeparam> | |
/// <typeparam name="TResult">new type being returned</typeparam> | |
/// <param name="source">sequence to select from</param> | |
/// <param name="threadLimit">Max degree of parallelism</param> | |
/// <param name="selector">transform function to apply to each element</param> | |
/// <returns>sequence of the selected type</returns> | |
public static IEnumerable<TResult> ConcurrentSelect<TSource, TResult>(this IEnumerable<TSource> source, | |
int threadLimit, Func<TSource, TResult> selector) | |
{ | |
Guard.AgainstNull(source, "source"); | |
Guard.AgainstNull(selector, "selector"); | |
return Execute(source, threadLimit, selector); | |
} | |
/// <summary> | |
/// Project each element of a sequence into a new form, running in paralell. Resulting items are returned in order of <b>completion</b>. | |
/// Does not maintain order of input | |
/// </summary> | |
/// <typeparam name="TSource">type of input sequence</typeparam> | |
/// <typeparam name="TResult">new type being returned</typeparam> | |
/// <param name="source">sequence to select from</param> | |
/// <param name="selector">transform function to apply to each element</param> | |
/// <returns>sequence of the selected type</returns> | |
public static IEnumerable<TResult> ConcurrentSelect<TSource, TResult>(this IEnumerable<TSource> source, | |
Func<TSource, TResult> selector) | |
{ | |
return ConcurrentSelect(source, DefaultThreadLimit, selector); | |
} | |
private static IEnumerable<TResult> Execute<TSource, TResult>(this IEnumerable<TSource> source, | |
int threadLimit, Func<TSource, TResult> selector) | |
{ | |
if (threadLimit <= 0) | |
threadLimit = DefaultThreadLimit; | |
ConcurrentQueue<TResult> completed = new ConcurrentQueue<TResult>(); | |
Task executionTask = new Task(() => | |
Parallel.ForEach(source, new ParallelOptions { MaxDegreeOfParallelism = threadLimit }, (item) => | |
{ | |
TResult res = selector(item); | |
completed.Enqueue(res); | |
}) | |
); | |
executionTask.Start(); | |
while (executionTask.IsCompleted == false || completed.IsEmpty == false) | |
{ | |
TResult tmp; | |
if (completed.TryDequeue(out tmp)) | |
yield return tmp; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment