Skip to content

Instantly share code, notes, and snippets.

@maslevx
Last active August 29, 2015 14:23
Show Gist options
  • Save maslevx/fbf1549ef8dfe54be564 to your computer and use it in GitHub Desktop.
Save maslevx/fbf1549ef8dfe54be564 to your computer and use it in GitHub Desktop.
linqs
/// <summary>
/// Provides extension methods for executing sequences of async operations.
/// Results flow into the returned sequence in order of completion, with at most the
/// requested number of operations in flight at any time.
/// </summary>
/// <remarks>
/// The returned sequences are lazy: nothing is started until enumeration begins, and the
/// enumerating thread blocks while it waits for the next result. If enumeration stops early,
/// no further elements are pulled from the source; operations that are already in flight are
/// left to finish on their own.
/// </remarks>
public static class AsyncOpLinqExtension
{
    /// <summary>
    /// executes the asynchronous <paramref name="action"/> over each element of <paramref name="source"/>
    /// </summary>
    /// <typeparam name="TSource">type of input sequence</typeparam>
    /// <typeparam name="TResult">type of result returned by <paramref name="action"/></typeparam>
    /// <param name="source">sequence to process</param>
    /// <param name="action">operation to perform; it is invoked lazily, as capacity becomes available</param>
    /// <param name="limit">maximum tasks to be run at once; a value of zero or less selects the default of 50</param>
    /// <returns>resulting sequence of <typeparamref name="TResult"/>, in order of completion</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="action"/> is null</exception>
    /// <exception cref="AggregateException">thrown during enumeration when an operation faulted or was cancelled</exception>
    public static IEnumerable<TResult> DoTask<TSource, TResult>(this IEnumerable<TSource> source,
        Func<TSource, Task<TResult>> action, int limit)
    {
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        return source.Select(item => action(item)).RunTasks(limit);
    }

    /// <summary>
    /// executes the synchronous <paramref name="action"/> as a task over each element of <paramref name="source"/>
    /// </summary>
    /// <typeparam name="TSource">type of input sequence</typeparam>
    /// <typeparam name="TResult">type of result returned by <paramref name="action"/></typeparam>
    /// <param name="source">sequence to process</param>
    /// <param name="action">operation to perform</param>
    /// <param name="limit">maximum tasks to run at once; a value of zero or less selects the default of 50</param>
    /// <returns>resulting sequence of <typeparamref name="TResult"/>, in order of completion</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="action"/> is null</exception>
    /// <exception cref="AggregateException">thrown during enumeration when <paramref name="action"/> threw for an element</exception>
    public static IEnumerable<TResult> DoTask<TSource, TResult>(this IEnumerable<TSource> source,
        Func<TSource, TResult> action, int limit)
    {
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        // The tasks are created cold; the handler starts each one when a slot frees up.
        return source.Select(item => new Task<TResult>(() => action(item))).RunTasks(limit);
    }

    /// <summary>
    /// executes a sequence of tasks
    /// </summary>
    /// <typeparam name="T">return type of task</typeparam>
    /// <param name="source">input sequence to be executed; tasks that have not been started yet are started on demand</param>
    /// <param name="limit">maximum tasks to run at once; a value of zero or less selects the default of 50</param>
    /// <returns>results of the tasks, in order of completion</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null</exception>
    /// <exception cref="AggregateException">thrown during enumeration when a task faulted or was cancelled</exception>
    public static IEnumerable<T> RunTasks<T>(this IEnumerable<Task<T>> source, int limit)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        return new TasksHandler<T>(limit).Do(source);
    }

    // NOTE: a non-generic RunTasks(IEnumerable<Task>) overload was sketched here but never implemented.

    #region task handler
    /// <summary>
    /// Pulls tasks from a source sequence, keeps a bounded number of them in flight and
    /// hands their results to the consumer as they complete.
    /// </summary>
    private sealed class TasksHandler<T>
    {
        private const int DefaultTaskLimit = 50;

        /// <summary>Maximum number of tasks in flight; zero or less means <c>50</c>.</summary>
        public int TaskLimit { get; set; }

        public TasksHandler(int limit)
        {
            TaskLimit = limit;
        }

        public TasksHandler()
            : this(DefaultTaskLimit)
        { }

        /// <summary>
        /// Lazily runs the tasks of <paramref name="source"/> and yields each result as soon as it is available.
        /// </summary>
        /// <param name="source">tasks to run</param>
        /// <returns>task results in order of completion</returns>
        public IEnumerable<T> Do(IEnumerable<Task<T>> source)
        {
            int limit = TaskLimit > 0 ? TaskLimit : DefaultTaskLimit;

            // All mutable state lives in a per-enumeration object, so enumerating the
            // returned sequence more than once cannot mix up two runs.
            Session session = new Session(source.GetEnumerator());
            try
            {
                session.Fill(limit);

                Task<T> finished;
                while (session.TryTake(out finished))
                {
                    // The task has already completed, so Result does not block. For a faulted
                    // or cancelled task it throws AggregateException, which ends the enumeration.
                    yield return finished.Result;
                }
            }
            finally
            {
                // Runs on normal completion, on failure and when the consumer stops early.
                session.Stop();
            }
        }

        /// <summary>
        /// State of a single enumeration. Every field is guarded by <c>m_lock</c>.
        /// </summary>
        private sealed class Session
        {
            private readonly object m_lock = new object();
            private readonly Queue<Task<T>> m_completed = new Queue<Task<T>>();
            private readonly IEnumerator<Task<T>> m_mover;
            private int m_inFlight;          // launched tasks whose completion has not been queued yet
            private bool m_exhausted;        // no more tasks may be pulled from m_mover
            private Exception m_sourceError; // failure raised by m_mover on a continuation thread

            public Session(IEnumerator<Task<T>> mover)
            {
                m_mover = mover;
            }

            /// <summary>Launches tasks until <paramref name="limit"/> are in flight or the source runs dry.</summary>
            public void Fill(int limit)
            {
                lock (m_lock)
                {
                    while (m_inFlight < limit && LaunchNext())
                    {
                    }
                }
            }

            /// <summary>
            /// Blocks until a completed task is available.
            /// </summary>
            /// <returns><c>false</c> once every task has been handed out and nothing is in flight</returns>
            public bool TryTake(out Task<T> finished)
            {
                lock (m_lock)
                {
                    // The condition is re-checked under the same lock the producers pulse,
                    // so a completion can never slip in between the check and the wait.
                    while (m_completed.Count == 0)
                    {
                        if (m_inFlight == 0)
                        {
                            finished = null;
                            if (m_sourceError != null)
                            {
                                // Surface a source failure that happened on a continuation thread.
                                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(m_sourceError).Throw();
                            }
                            return false;
                        }

                        Monitor.Wait(m_lock);
                    }

                    finished = m_completed.Dequeue();
                    return true;
                }
            }

            /// <summary>Stops pulling from the source and releases its enumerator.</summary>
            public void Stop()
            {
                lock (m_lock)
                {
                    m_exhausted = true;
                    m_mover.Dispose();
                }
            }

            /// <summary>Continuation attached to every launched task.</summary>
            private void OnCompletion(Task<T> task)
            {
                lock (m_lock)
                {
                    // Queue the task itself rather than its result: reading Result here would
                    // throw for a failed task and leave the consumer waiting forever.
                    m_completed.Enqueue(task);
                    m_inFlight--;

                    if (!m_exhausted)
                    {
                        try
                        {
                            LaunchNext();
                        }
                        catch (Exception error)
                        {
                            // There is no caller to report to on this thread; keep the failure
                            // for the consumer, which rethrows it after draining finished work.
                            m_exhausted = true;
                            m_sourceError = error;
                        }
                    }

                    Monitor.Pulse(m_lock);
                }
            }

            /// <summary>
            /// Pulls the next task from the source and puts it in flight. Caller must hold <c>m_lock</c>.
            /// </summary>
            /// <returns><c>false</c> when the source has no more tasks</returns>
            private bool LaunchNext()
            {
                if (m_exhausted)
                {
                    return false;
                }

                if (!m_mover.MoveNext())
                {
                    m_exhausted = true;
                    return false;
                }

                Task<T> next = m_mover.Current;
                m_inFlight++;

                // Always continue on the default scheduler: the consumer blocks its own thread,
                // so a continuation queued to the consumer's scheduler might never get to run.
                next.ContinueWith(OnCompletion, CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default);
                TryStart(next);
                return true;
            }

            /// <summary>Starts <paramref name="obj"/> if it is still a cold (never started) task.</summary>
            private static void TryStart(Task obj)
            {
                if (obj.Status == TaskStatus.Created)
                {
                    obj.Start();
                }
            }
        }
    }
    #endregion
}
/// <summary>
/// Extension methods that run a delegate over the elements of a sequence in parallel and
/// stream the results back in order of completion.
/// </summary>
/// <remarks>
/// The returned sequences are lazy: work starts when enumeration begins. The enumerating thread
/// blocks while it waits for the next result. If enumeration stops early the parallel loop is
/// cancelled, so elements that have not been started yet are never processed.
/// </remarks>
public static class ConcurrentLinqExtension
{
    private static readonly int DefaultThreadLimit = Environment.ProcessorCount; // one thread per core

    /// <summary>
    /// Execute action over each element in parallel, returning them in order of <b>completion</b>.
    /// Does not maintain order of input
    /// </summary>
    /// <param name="source">sequence to act on</param>
    /// <param name="threadLimit">Max degree of parallelism; zero or less selects one thread per core</param>
    /// <param name="action">The action to perform on each item</param>
    /// <returns>items as they complete</returns>
    /// <exception cref="AggregateException">thrown during enumeration when <paramref name="action"/> or <paramref name="source"/> failed</exception>
    public static IEnumerable<T> ConcurrentForEach<T>(this IEnumerable<T> source, int threadLimit, Action<T> action)
    {
        Guard.AgainstNull(source, "source");
        Guard.AgainstNull(action, "action");

        // Adapt the action to a selector that hands the processed item back.
        Func<T, T> wrapper = (item) =>
        {
            action(item);
            return item;
        };
        return Execute(source, threadLimit, wrapper);
    }

    /// <summary>
    /// Execute action over each element in parallel, returning them in order of <b>completion</b>.
    /// Does not maintain order of input. Uses one thread per core.
    /// </summary>
    /// <param name="source">sequence to act against</param>
    /// <param name="action">The action to perform on each item</param>
    /// <returns>items as they complete</returns>
    /// <exception cref="AggregateException">thrown during enumeration when <paramref name="action"/> or <paramref name="source"/> failed</exception>
    public static IEnumerable<T> ConcurrentForEach<T>(this IEnumerable<T> source, Action<T> action)
    {
        return ConcurrentForEach(source, DefaultThreadLimit, action);
    }

    /// <summary>
    /// Project each element of a sequence into a new form, running in parallel. Resulting items are returned in order of <b>completion</b>.
    /// Does not maintain order of input
    /// </summary>
    /// <typeparam name="TSource">type of input sequence</typeparam>
    /// <typeparam name="TResult">new type being returned</typeparam>
    /// <param name="source">sequence to select from</param>
    /// <param name="threadLimit">Max degree of parallelism; zero or less selects one thread per core</param>
    /// <param name="selector">transform function to apply to each element</param>
    /// <returns>sequence of the selected type</returns>
    /// <exception cref="AggregateException">thrown during enumeration when <paramref name="selector"/> or <paramref name="source"/> failed</exception>
    public static IEnumerable<TResult> ConcurrentSelect<TSource, TResult>(this IEnumerable<TSource> source,
        int threadLimit, Func<TSource, TResult> selector)
    {
        Guard.AgainstNull(source, "source");
        Guard.AgainstNull(selector, "selector");
        return Execute(source, threadLimit, selector);
    }

    /// <summary>
    /// Project each element of a sequence into a new form, running in parallel. Resulting items are returned in order of <b>completion</b>.
    /// Does not maintain order of input. Uses one thread per core.
    /// </summary>
    /// <typeparam name="TSource">type of input sequence</typeparam>
    /// <typeparam name="TResult">new type being returned</typeparam>
    /// <param name="source">sequence to select from</param>
    /// <param name="selector">transform function to apply to each element</param>
    /// <returns>sequence of the selected type</returns>
    /// <exception cref="AggregateException">thrown during enumeration when <paramref name="selector"/> or <paramref name="source"/> failed</exception>
    public static IEnumerable<TResult> ConcurrentSelect<TSource, TResult>(this IEnumerable<TSource> source,
        Func<TSource, TResult> selector)
    {
        return ConcurrentSelect(source, DefaultThreadLimit, selector);
    }

    /// <summary>
    /// Runs <paramref name="selector"/> over <paramref name="source"/> with <c>Parallel.ForEach</c> on a
    /// background task and streams the results to the caller as they are produced.
    /// </summary>
    /// <param name="source">sequence to process</param>
    /// <param name="threadLimit">Max degree of parallelism; zero or less selects <c>DefaultThreadLimit</c></param>
    /// <param name="selector">transform applied to each element</param>
    /// <returns>results in order of completion</returns>
    private static IEnumerable<TResult> Execute<TSource, TResult>(this IEnumerable<TSource> source,
        int threadLimit, Func<TSource, TResult> selector)
    {
        if (threadLimit <= 0)
        {
            threadLimit = DefaultThreadLimit;
        }

        BlockingCollection<TResult> completed = new BlockingCollection<TResult>();
        CancellationTokenSource abandon = new CancellationTokenSource();
        ParallelOptions options = new ParallelOptions
        {
            MaxDegreeOfParallelism = threadLimit,
            CancellationToken = abandon.Token
        };

        // Start on the default scheduler: the consumer blocks its own thread while waiting,
        // so the producer must not be queued behind it.
        Task producer = Task.Factory.StartNew(() =>
        {
            try
            {
                Parallel.ForEach(source, options, item => completed.Add(selector(item)));
            }
            finally
            {
                // Always release the consumer, including when the loop failed or was cancelled.
                completed.CompleteAdding();
            }
        }, CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default);

        bool drained = false;
        try
        {
            // Blocks (without spinning) until a result arrives or the producer finishes.
            foreach (TResult result in completed.GetConsumingEnumerable())
            {
                yield return result;
            }

            drained = true;
            try
            {
                // The loop is over; waiting only serves to rethrow anything it failed with.
                producer.Wait();
            }
            catch (AggregateException failure)
            {
                // Parallel.ForEach already aggregates, and the task wraps that once more.
                throw failure.Flatten();
            }
        }
        finally
        {
            if (!drained)
            {
                // The consumer walked away: stop handing out new elements, then wait for the
                // iterations already running so the collection is not disposed under them.
                abandon.Cancel();
                try
                {
                    producer.Wait();
                }
                catch (AggregateException)
                {
                    // Nobody is left to report to; the cancellation itself also lands here.
                }
            }

            abandon.Dispose();
            completed.Dispose();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment