@mmurrell
Last active June 8, 2019 03:29
Concurrency issues in Batch extension method.
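public static class BatchLinq
{
    // Splits source into lazily evaluated batches of at most `size` items.
    // Every inner sequence reads from the same shared enumerator, so the batches
    // are only correct when each one is fully consumed, in order, by one consumer.
    public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int size)
    {
        if (size <= 0)
            throw new ArgumentOutOfRangeException("size", "Must be greater than zero.");
        using (IEnumerator<T> enumerator = source.GetEnumerator())
            while (enumerator.MoveNext())
                yield return TakeIEnumerator(enumerator, size);
    }

    // Yields the current item, then up to size - 1 more items from the shared enumerator.
    private static IEnumerable<T> TakeIEnumerator<T>(IEnumerator<T> source, int size)
    {
        int i = 0;
        do
            yield return source.Current;
        while (++i < size && source.MoveNext());
    }
}
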
// Collects every value seen by any worker thread.
static ConcurrentBag<int> processed_numbers = new ConcurrentBag<int>();

void Main()
{
    var sample = Enumerable.Range(1, 22);
    var batches = sample.Batch(5);

    Console.WriteLine("Beginning threaded processing");
    var options = new ParallelOptions() { MaxDegreeOfParallelism = 5 };
    var results = Parallel.ForEach(batches, options, ProcessBatch);
    Console.WriteLine("End threaded processing");

    Assert(results.IsCompleted, "Parallel processing failed.");
    Assert(processed_numbers.Count() == sample.Count(), "Input and output counts are different.");

    // Compare the input with the sorted output, item by item.
    var counter = 0;
    foreach (var (b, a) in sample.Zip(processed_numbers.OrderBy(x => x), (b, a) => (b, a)))
    {
        Console.WriteLine($"Index: {++counter:00} Before: {b:00} -- After: {a:00}");
    }
}

private void ProcessBatch(IEnumerable<int> batch)
{
    int counter = 0;
    var currentBatchList = new List<int>();
    foreach (var val in batch)
    {
        Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} - Adding value {val} to bag, index {counter} in this batch.");
        counter++;
        currentBatchList.Add(val);
        processed_numbers.Add(val);
    }
    Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} - Batch completed, processed {currentBatchList.Count()} numbers.");
}

private void Assert(bool condition, string message)
{
    if (!condition)
        Console.WriteLine("*** FAILURE: " + message);
}
mmurrell commented Mar 8, 2019

This code produces the results shown in the screenshot below on my machine. The code is a bit boilerplate-heavy, but it demonstrates that in a multi-threaded environment several items are not processed at all and several others are processed multiple times. In theory, I would expect every number to appear in exactly one batch, and the batches to be evenly sized (apart from a shorter final batch). I would also expect that, after sorting the output to remove the nondeterministic ordering, the input list and the output list would match exactly.
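
One likely explanation for the skipped and repeated items is that every inner batch reads from the same shared enumerator, so pulling a second batch from the outer sequence before the first one has been read moves the position out from under it. The sketch below reproduces that effect on a single thread, with no parallelism at all. The class name `LazyBatchDemo` and the helper `Interleave` are hypothetical names introduced for illustration; the `Batch` and `Take` bodies mirror the gist's code without the argument check.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class LazyBatchDemo
{
    // Same shape as the gist's Batch: all inner sequences share one enumerator.
    public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int size)
    {
        using (IEnumerator<T> enumerator = source.GetEnumerator())
            while (enumerator.MoveNext())
                yield return Take(enumerator, size);
    }

    private static IEnumerable<T> Take<T>(IEnumerator<T> source, int size)
    {
        int i = 0;
        do
            yield return source.Current;
        while (++i < size && source.MoveNext());
    }

    // Hypothetical helper: fetches two batches from the outer sequence
    // BEFORE reading either of them, then reads them in order.
    public static (List<int> First, List<int> Second) Interleave()
    {
        using (var outer = Enumerable.Range(1, 22).Batch(5).GetEnumerator())
        {
            outer.MoveNext();
            var first = outer.Current;   // shared enumerator is at 1, nothing read yet
            outer.MoveNext();
            var second = outer.Current;  // shared enumerator has advanced to 2

            // Reading happens only now, starting from wherever the enumerator is.
            return (first.ToList(), second.ToList());
        }
    }

    public static void Main()
    {
        var (first, second) = Interleave();
        Console.WriteLine(string.Join(",", first));   // 2,3,4,5,6
        Console.WriteLine(string.Join(",", second));  // 6,7,8,9,10
    }
}
```

Here the value 1 never reaches any batch and 6 lands in both, which matches the symptom described above. With `Parallel.ForEach` the same interleaving can occur because the outer sequence may be advanced to hand out further batches while worker threads are still (or not yet) reading earlier ones, and the shared enumerator is additionally touched from several threads at once.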

[Image: screenshot of the program's output]
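
For comparison, here is a minimal sketch of one way to get the expected behavior: copy each batch into its own `List<T>` before handing it out, so that no batch depends on the shared enumerator once it has been yielded. This is a swapped-in technique, not the gist's method; `BatchToLists`, `ProcessInParallel`, and the class name are hypothetical names introduced for illustration. It relies on `Parallel.ForEach` pulling items from the outer sequence one thread at a time, which is my understanding of how it partitions a plain `IEnumerable<T>`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class MaterializedBatchSketch
{
    // Hypothetical alternative to Batch: each batch is a fully built list.
    public static IEnumerable<List<T>> BatchToLists<T>(this IEnumerable<T> source, int size)
    {
        if (size <= 0)
            throw new ArgumentOutOfRangeException(nameof(size), "Must be greater than zero.");

        var bucket = new List<T>(size);
        foreach (var item in source)
        {
            bucket.Add(item);
            if (bucket.Count == size)
            {
                yield return bucket;
                bucket = new List<T>(size); // fresh list, so yielded batches are never mutated
            }
        }

        // Emit the shorter final batch, if any.
        if (bucket.Count > 0)
            yield return bucket;
    }

    // Processes the batches in parallel and returns every processed value, sorted.
    public static List<int> ProcessInParallel(IEnumerable<int> input, int size)
    {
        var processed = new ConcurrentBag<int>();
        Parallel.ForEach(input.BatchToLists(size), batch =>
        {
            foreach (var value in batch)
                processed.Add(value);
        });
        return processed.OrderBy(x => x).ToList();
    }

    public static void Main()
    {
        var sample = Enumerable.Range(1, 22).ToList();
        var output = ProcessInParallel(sample, 5);
        Console.WriteLine(sample.SequenceEqual(output)); // True
    }
}
```

The trade-off is memory: each batch is held in full while it is being processed, whereas the lazy version never buffers more than one item at a time.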