-
-
Save dmarlow/dce4870dd6139c6fd075 to your computer and use it in GitHub Desktop.
Concurrent Executor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Dynamic; | |
using System.Linq; | |
using System.Threading.Tasks; | |
using System.Threading.Tasks.Dataflow; | |
namespace Degreed.Processing | |
{ | |
/// <summary> | |
/// Concurrently executes an action over the items of a collection until the requested number of successful results has been collected or every item has been tried.
/// </summary> | |
public class ConcurrentExecutor | |
{ | |
/// <summary>
/// Runs <paramref name="action"/> over <paramref name="items"/> and collects the results of the
/// invocations that complete without throwing. Processing starts with as many items as results are
/// desired; every time an invocation throws, the next unprocessed item (in the order given) is
/// scheduled in its place. The method returns once the desired number of results has been collected
/// or every item has been executed, whichever happens first.
/// Error check and completed hooks are there, with specific context, for
/// logging/notification handling.
/// </summary>
/// <typeparam name="T">The input collection type.</typeparam>
/// <typeparam name="U">The result collection type.</typeparam>
/// <param name="resultsDesired">Number of items desired from the collection. Clamped to the range
/// [1, number of items].</param>
/// <param name="items">The items to concurrently execute the action on.</param>
/// <param name="action">The action to execute on each item. This func takes the item and a dynamic
/// expando-object, created fresh for each item, that can be used to store information
/// (time execution started for calculating execution time, etc.).</param>
/// <param name="errorCheck">Optional. Invoked when <paramref name="action"/> throws, with the item
/// being executed, the result value (still default of type U at that point), the exception object
/// and the context expando-object.</param>
/// <param name="completed">Optional. Invoked after each item finishes, regardless of any error,
/// with the item, the result (default of type U on failure) and the context.</param>
/// <returns>The successful results in the order in which they completed; an empty sequence when
/// there are no items or no invocation succeeded.</returns>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is null (surfaced through the
/// returned task, because this method is async).</exception>
public async Task<IEnumerable<U>> Execute<T, U>(int resultsDesired, IEnumerable<T> items,
    Func<T, dynamic, Task<U>> action,
    Action<T, U, Exception, dynamic> errorCheck = null,
    Action<T, U, dynamic> completed = null)
{
    if (items.IsNullOrEmpty())
        return Enumerable.Empty<U>();
    if (action == null)
        throw new ArgumentNullException("action", "Action required for execution");

    // Signalled once enough results exist, every item has been executed, or a hook failed.
    // NOTE(review): relies on a Set() issued before WaitAsync() being remembered, exactly as the
    // original code did - confirm against the AsyncAutoResetEvent implementation in use.
    var waitEvt = new AsyncAutoResetEvent();
    // Successful results, in completion order.
    var resultBlock = new BufferBlock<U>();
    // Replacement items scheduled after a failure; linked to the action block below.
    var processBlock = new BufferBlock<T>();
    // Items that have not been started yet, in the order given.
    var inputBlock = new BufferBlock<T>();

    // Materialize once so the source sequence is not enumerated again below.
    var itemsEnumerated = items as IList<T> ?? items.ToList();

    // Must ask for at least 1 result and cannot ask for more results than there are items.
    resultsDesired = Math.Max(1, Math.Min(resultsDesired, itemsEnumerated.Count));

    // The first resultsDesired items are started directly; the rest wait here as replacements.
    foreach (var pending in itemsEnumerated.Skip(resultsDesired))
        inputBlock.Post(pending);

    // Number of items whose execution has finished. Updated from concurrently running
    // continuations, so it is only ever touched through Interlocked.
    int itemsExecuted = 0;
    // First exception thrown by a caller-supplied hook, if any.
    Exception hookFailure = null;

    // Wraps the caller's action with a per-item context and the error/completed hooks.
    // Returns a Task (rather than being async void) so the action block tracks the whole
    // operation and so a throwing hook cannot escape as an unhandled exception.
    Func<T, Task> executeAsync = async t =>
    {
        try
        {
            // Dynamic context for this execution call
            dynamic v = new ExpandoObject();
            // Default result
            U actionResult = default(U);
            try
            {
                actionResult = await action(t, v);
                // No exception: the result counts towards the desired total.
                resultBlock.Post(actionResult);
            }
            catch (Exception e)
            {
                if (errorCheck != null)
                    errorCheck(t, actionResult, e, v);
                // Still short of the desired count: promote the next waiting item, if any.
                if (resultBlock.Count < resultsDesired)
                {
                    T tToProcess;
                    if (inputBlock.TryReceive(out tToProcess))
                        processBlock.Post(tToProcess);
                }
            }
            finally
            {
                // Atomic increment: a lost update here would mean the "all items executed"
                // condition below is never met and the caller waits forever.
                int executedSoFar = System.Threading.Interlocked.Increment(ref itemsExecuted);
                if (completed != null)
                    completed(t, actionResult, v);
                // We're done if we've reached the number we've been looking for,
                // or if there is nothing left to try.
                if (resultBlock.Count == resultsDesired || executedSoFar == itemsEnumerated.Count)
                    waitEvt.Set();
            }
        }
        catch (Exception hookError)
        {
            // Only errorCheck/completed can get us here (action failures are handled above).
            // Keep the first failure and wake the caller so it is rethrown instead of hanging.
            // A hook failure that happens after Execute has already returned cannot be reported.
            System.Threading.Interlocked.CompareExchange(ref hookFailure, hookError, null);
            waitEvt.Set();
        }
    };

    // Handles processing any items due to failures
    var actionBlock = new ActionBlock<T>(executeAsync,
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = resultsDesired,
            MaxMessagesPerTask = DataflowBlockOptions.Unbounded
        });
    // Anything that goes into the process block will be executed
    processBlock.LinkTo(actionBlock);

    // Perform n concurrently to start. The returned tasks are intentionally not awaited here:
    // executeAsync never faults, and completion is reported through waitEvt.
    itemsEnumerated.Take(resultsDesired).AsParallel().ForAll(first => { executeAsync(first); });

    // Wait here asynchronously until we've completed # desired or end reached.
    await waitEvt.WaitAsync();

    // Surface a hook failure to the caller with its original stack trace.
    if (hookFailure != null)
        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(hookFailure).Throw();

    // Done. Get everything in the results block. Stays null when nothing succeeded.
    IList<U> results;
    resultBlock.TryReceiveAll(out results);
    return results ?? Enumerable.Empty<U>();
}
/// <summary>
/// Demonstrates <c>Execute</c>: asks for 3 results out of the numbers 0-9, where each item waits a
/// random 1-2 seconds and then fails if it is even or equal to 3. Progress, errors and the final
/// results are written to the console.
/// </summary>
/// <returns>A task that completes after the results have been printed.</returns>
public async Task Example()
{
    var rand = new Random();
    // The action below runs for several items at once and Random instances are not thread-safe,
    // so every call into rand goes through this lock.
    var randGate = new object();
    var executor = new ConcurrentExecutor();
    var execTask = executor.Execute<int, int>(3, Enumerable.Range(0, 10),
        // Action
        async (i, context) =>
        {
            // Store when it started
            context.StartTime = DateTime.Now;
            Console.WriteLine("Running {0}", i);
            // Fake some sleep..
            int sleepMs;
            lock (randGate)
            {
                sleepMs = rand.Next(1000, 2000);
            }
            // Store how long it slept
            context.SleepDuration = sleepMs;
            await Task.Delay(sleepMs);
            // Blow up so we can ensure we're processing items down the list..
            if (i % 2 == 0 || i == 3)
                throw new InvalidOperationException("Bad #");
            // Success
            return i;
        },
        // Error
        (i, result, e, c) =>
        {
            Console.WriteLine("Error at {0}. {1}", i, e.Message);
        },
        // Completed
        (i, result, context) =>
        {
            // StartTime was stored on the per-item context by the action above.
            var diff = DateTime.Now.Subtract((DateTime)context.StartTime);
            Console.WriteLine("{0} completed. Duration {1}, sleep was {2}", i, diff.TotalSeconds, context.SleepDuration);
        });
    var results = await execTask;
    // Show results
    foreach (var r in results)
        Console.WriteLine("Result {0}", r);
    // Sample output from one run (ordering and timings vary between runs).
    // NOTE(review): the trailing "Done" line is not written by this method - presumably it was
    // printed by whatever called Example() when the sample was captured.
    /*
    Running 2
    Running 1
    Running 0
    Error at 2. Bad #
    Error at 0. Bad #
    Running 3
    Running 4
    2 completed. Duration 1.6304229, sleep was 1473
    0 completed. Duration 1.6314189, sleep was 1525
    1 completed. Duration 2.0191785, sleep was 1998
    Error at 4. Bad #
    4 completed. Duration 1.1002609, sleep was 1052
    Running 5
    Error at 3. Bad #
    3 completed. Duration 1.7536598, sleep was 1690
    Running 6
    5 completed. Duration 1.9459575, sleep was 1945
    Error at 6. Bad #
    6 completed. Duration 1.3577158, sleep was 1307
    Running 7
    7 completed. Duration 1.3704851, sleep was 1371
    Result 1
    Result 5
    Result 7
    Done
    */
}
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment