Skip to content

Instantly share code, notes, and snippets.

@smithkl42
Last active April 19, 2016 14:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save smithkl42/ca5b98e909952eddef1e3a076c0dd91d to your computer and use it in GitHub Desktop.
Save smithkl42/ca5b98e909952eddef1e3a076c0dd91d to your computer and use it in GitHub Desktop.
Parallel helpers
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using MoreLinq;
namespace Payboard.Common
{
/// <summary>
/// Extension methods that run asynchronous work over a sequence with a bounded degree of parallelism,
/// built on TPL Dataflow's <see cref="ActionBlock{TInput}" />.
/// </summary>
public static class ParallelHelpers
{
    /// <summary>
    /// Iterate asynchronously in parallel over a data source, but only with a given degree of parallelization,
    /// so that we don't DOS ourselves.
    /// </summary>
    /// <typeparam name="T">Type of the items in the source sequence.</typeparam>
    /// <param name="list">The items to process. Enumerated once, on the calling thread.</param>
    /// <param name="action">Asynchronous callback invoked once per item.</param>
    /// <param name="maxParallelization">Maximum number of callbacks allowed to run at the same time.</param>
    /// <returns>A task that completes once every posted item has been processed.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="list" /> or <paramref name="action" /> is null.
    /// </exception>
    public static async Task ForEachParallel<T>(this IEnumerable<T> list, Func<T, Task> action,
        int maxParallelization = 100)
    {
        if (list == null)
        {
            throw new ArgumentNullException("list");
        }
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        // Create the execution block
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxParallelization
        };
        var sendMessageBlock = new ActionBlock<T>(async item => { await action(item); }, options);

        // Send everything to the execution block. Post only returns false once the block has stopped
        // accepting input (i.e. a callback has already faulted it), so there is no point in enumerating
        // the rest of the source; the failure is surfaced by awaiting Completion below.
        foreach (var item in list)
        {
            if (!sendMessageBlock.Post(item))
            {
                break;
            }
        }

        sendMessageBlock.Complete();

        // Nothing after this await needs the caller's synchronization context; not capturing it avoids
        // deadlocks when a caller blocks on the returned task.
        await sendMessageBlock.Completion.ConfigureAwait(false);
    }

    /// <summary>
    /// Select asynchronously in parallel from a data source, but only with a given degree of parallelization,
    /// so that we don't DOS ourselves.
    /// </summary>
    /// <typeparam name="TSource">Type of the items in the source sequence.</typeparam>
    /// <typeparam name="TResult">Type produced by <paramref name="mapFunc" />.</typeparam>
    /// <param name="list">The items to map. Enumerated once, on the calling thread.</param>
    /// <param name="mapFunc">Asynchronous projection invoked once per item.</param>
    /// <param name="maxParallelization">Maximum number of projections allowed to run at the same time.</param>
    /// <returns>
    /// The mapped values, in the same order as the corresponding items of <paramref name="list" />,
    /// regardless of the order in which the individual projections finish.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="list" /> or <paramref name="mapFunc" /> is null.
    /// </exception>
    public static async Task<List<TResult>> SelectParallel<TSource, TResult>(this IEnumerable<TSource> list,
        Func<TSource, Task<TResult>> mapFunc,
        int maxParallelization = 100)
    {
        if (list == null)
        {
            throw new ArgumentNullException("list");
        }
        if (mapFunc == null)
        {
            throw new ArgumentNullException("mapFunc");
        }

        // Create the execution block
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxParallelization
        };
        var results = new List<TResult>();
        var gate = new object();

        // Each message carries the index of its source item so the result can be written to the
        // matching slot; completion order therefore never affects the order of the returned list.
        var sendMessageBlock = new ActionBlock<KeyValuePair<int, TSource>>(async indexed =>
        {
            var result = await mapFunc(indexed.Value);
            lock (gate)
            {
                results[indexed.Key] = result;
            }
        }, options);

        // Send everything to the execution block and wait for it to finish
        var index = 0;
        foreach (var item in list)
        {
            // Reserve the slot before posting so it is guaranteed to exist when the worker writes to it.
            // The list may grow while workers are storing results, hence the shared lock.
            lock (gate)
            {
                results.Add(default(TResult));
            }

            // A refused Post means the block has already faulted; stop enumerating and let the await
            // on Completion below surface the failure.
            if (!sendMessageBlock.Post(new KeyValuePair<int, TSource>(index, item)))
            {
                break;
            }
            index++;
        }

        sendMessageBlock.Complete();

        // Nothing after this await needs the caller's synchronization context; not capturing it avoids
        // deadlocks when a caller blocks on the returned task.
        await sendMessageBlock.Completion.ConfigureAwait(false);
        return results;
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment