Skip to content

Instantly share code, notes, and snippets.

@dmarlow
Created February 2, 2015 21:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dmarlow/dce4870dd6139c6fd075 to your computer and use it in GitHub Desktop.
Save dmarlow/dce4870dd6139c6fd075 to your computer and use it in GitHub Desktop.
Concurrent Executor
using System;
using System.Collections.Generic;
using System.Dynamic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace Degreed.Processing
{
/// <summary>
/// Concurrently executes an asynchronous action over a collection until the requested number of
/// successful results has been produced or the collection is exhausted.
/// </summary>
public class ConcurrentExecutor
{
    /// <summary>
    /// Executes <paramref name="action"/> over <paramref name="items"/>, with at most
    /// <paramref name="resultsDesired"/> executions in flight at once. Returns the results of the
    /// executions that completed without throwing.
    /// </summary>
    /// <remarks>
    /// The work is split into <c>resultsDesired</c> (after clamping) "slots". Each slot takes the next
    /// unprocessed item, in the order given, and runs it. If the action throws, the slot moves on to
    /// the next item. If the action succeeds, the slot stops. At most <c>resultsDesired</c> results
    /// are therefore produced. The returned task completes once every slot has stopped, which happens
    /// when <c>resultsDesired</c> executions have succeeded or every item has been executed. No
    /// executions are still running at that point.
    /// Exceptions thrown by <paramref name="errorCheck"/> or <paramref name="completed"/> are not
    /// swallowed. The slot that raised one stops, and the exception is rethrown from the returned
    /// task once the other slots have finished.
    /// </remarks>
    /// <typeparam name="T">The input item type.</typeparam>
    /// <typeparam name="U">The result type.</typeparam>
    /// <param name="resultsDesired">Number of successful results wanted. It is clamped to the range
    /// [1, number of items], and it is also the maximum number of concurrent executions.</param>
    /// <param name="items">The items to execute the action on. Enumerated at most once. A null or
    /// empty collection yields an empty result.</param>
    /// <param name="action">Invoked for each executed item. It receives the item and a fresh
    /// per-execution context object (an <see cref="ExpandoObject"/>) that the action and the hooks
    /// can use to share information, e.g. a start time.</param>
    /// <param name="errorCheck">Optional. Invoked when <paramref name="action"/> throws, with the item,
    /// <c>default(U)</c>, the exception and the context.</param>
    /// <param name="completed">Optional. Invoked after every execution, whether it succeeded or failed,
    /// with the item, the result (<c>default(U)</c> on failure) and the context.</param>
    /// <returns>The successful results, in the order the executions completed.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null and
    /// <paramref name="items"/> is not empty.</exception>
    public async Task<IEnumerable<U>> Execute<T, U>(int resultsDesired, IEnumerable<T> items,
        Func<T, dynamic, Task<U>> action,
        Action<T, U, Exception, dynamic> errorCheck = null,
        Action<T, U, dynamic> completed = null)
    {
        if (items == null)
            return Enumerable.Empty<U>();

        // Materialize once, so the source is never enumerated more than one time.
        var itemsEnumerated = items as IList<T> ?? items.ToList();
        if (itemsEnumerated.Count == 0)
            return Enumerable.Empty<U>();

        if (action == null)
            throw new ArgumentNullException("action", "Action required for execution");

        // Must have at least 1 result, and can't have more results than items.
        resultsDesired = Math.Max(1, Math.Min(resultsDesired, itemsEnumerated.Count));

        // The pending queue and the result list are shared by all slots, so both are only
        // touched while holding this lock.
        var gate = new object();
        var pending = new Queue<T>(itemsEnumerated);
        var results = new List<U>(resultsDesired);

        // Runs one item through the action and the hooks. Returns true when the action succeeded.
        Func<T, Task<bool>> executeOne = async item =>
        {
            // Declared as ExpandoObject (not dynamic) so the delegate calls below are
            // statically bound; callers still see it as a dynamic object.
            var context = new ExpandoObject();
            U actionResult = default(U);
            var succeeded = false;
            try
            {
                actionResult = await action(item, context);
                lock (gate)
                {
                    results.Add(actionResult);
                }
                succeeded = true;
            }
            catch (Exception e)
            {
                if (errorCheck != null)
                    errorCheck(item, actionResult, e, context);
            }
            finally
            {
                if (completed != null)
                    completed(item, actionResult, context);
            }
            return succeeded;
        };

        // One slot: keep taking the next pending item until an execution succeeds or nothing is
        // left. A failed item is thus replaced by the next item in the original order.
        Func<Task> runSlot = async () =>
        {
            while (true)
            {
                T next;
                lock (gate)
                {
                    if (pending.Count == 0)
                        return;
                    next = pending.Dequeue();
                }

                if (await executeOne(next))
                    return;
            }
        };

        // Start the slots on the thread pool, so the initial batch runs concurrently even if the
        // action does synchronous work before its first await.
        var slots = new List<Task>(resultsDesired);
        for (var i = 0; i < resultsDesired; i++)
            slots.Add(Task.Run(runSlot));

        // Every slot has stopped once this completes, so reading 'results' afterwards is safe.
        await Task.WhenAll(slots).ConfigureAwait(false);
        return results;
    }

    /// <summary>
    /// Demonstrates <see cref="Execute{T, U}"/>. It looks for 3 results among the numbers 0-9. The
    /// action deliberately fails for even numbers and for 3, so failed items are replaced by later
    /// ones. Progress and results are written to the console.
    /// </summary>
    public async Task Example()
    {
        var rand = new Random();
        // System.Random is not thread-safe and the actions run concurrently, so guard it.
        var randLock = new object();
        var executor = new ConcurrentExecutor();
        var execTask = executor.Execute<int, int>(3, Enumerable.Range(0, 10),
            // Action
            async (i, context) =>
            {
                // Store when it started
                context.StartTime = DateTime.UtcNow;
                Console.WriteLine("Running {0}", i);
                // Fake some sleep..
                int r;
                lock (randLock)
                {
                    r = rand.Next(1000, 2000);
                }
                // Store how long it slept
                context.SleepDuration = r;
                await Task.Delay(r);
                // Blow up so we can ensure we're processing items down the list..
                if (i % 2 == 0 || i == 3)
                    throw new InvalidOperationException("Bad #");
                // Success
                return i;
            },
            // Error
            (i, result, e, c) =>
            {
                Console.WriteLine("Error at {0}. {1}", i, e.Message);
            },
            // Completed
            (i, result, context) =>
            {
                var diff = DateTime.UtcNow.Subtract((DateTime)context.StartTime);
                Console.WriteLine("{0} completed. Duration {1}, sleep was {2}", i, diff.TotalSeconds, context.SleepDuration);
            });
        var results = await execTask;
        // Show results
        foreach (var value in results)
            Console.WriteLine("Result {0}", value);
        Console.WriteLine("Done");
        // Output (start/finish order varies between runs; results are always 1, 5, 7):
        /*
        Running 2
        Running 1
        Running 0
        Error at 2. Bad #
        Error at 0. Bad #
        Running 3
        Running 4
        2 completed. Duration 1.6304229, sleep was 1473
        0 completed. Duration 1.6314189, sleep was 1525
        1 completed. Duration 2.0191785, sleep was 1998
        Error at 4. Bad #
        4 completed. Duration 1.1002609, sleep was 1052
        Running 5
        Error at 3. Bad #
        3 completed. Duration 1.7536598, sleep was 1690
        Running 6
        5 completed. Duration 1.9459575, sleep was 1945
        Error at 6. Bad #
        6 completed. Duration 1.3577158, sleep was 1307
        Running 7
        7 completed. Duration 1.3704851, sleep was 1371
        Result 1
        Result 5
        Result 7
        Done
        */
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment