@sheastrickland
Created August 2, 2015 11:26
BlockingCollection with Parallel.ForEach with a Partitioner

Lesson learnt: a streaming producer/consumer issue when using:

BlockingCollection<T> and Parallel.ForEach(_blockingCollection.GetConsumingEnumerable(), ...)

Beware the default Partitioner algorithm, which chunks and buffers items.

The GetConsumingPartitioner heading on Stephen Toub's blog covers it, and the first comment from Stephen Toub totally nails it. We implemented something similar to what commenter Hernan pointed out.

My absolute saviour: NoBuffering in EnumerablePartitionerOptions.

Inlined the issue:

This issue you were hitting... was the Parallel.ForEach call returning, or did the ForEach call block forever? If the latter, it would make sense that it was waiting for enough items to fill the chunk, but if those items were never arriving, it would wait forever for them to. Or, more specifically, it would wait until either enough elements arrived or until the blocking collection's CompleteAdding method was called, which would inform the Parallel.ForEach that no more data would arrive and so it should stop waiting to fill the chunk.

We had occasional "missing" elements which were successfully added to the collection and successfully iterated, but never ran the delegate. They stayed stuck until we gracefully shut down the service, which signalled the CancellationToken, and it would then happily process the work. Man, that ate a few hours!

// Before: the default partitioner chunks and buffers the consuming enumerable
var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 5, CancellationToken = cancel.Token };
var results = Parallel.ForEach(_blockingCollection.GetConsumingEnumerable(cancel.Token), options, work =>
{
    // ... process work (body not shown in the original)
});

// After: NoBuffering hands each item to a worker as soon as it arrives
var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 5, CancellationToken = cancel.Token };
var partitioner = Partitioner.Create(_blockingCollection.GetConsumingEnumerable(cancel.Token), EnumerablePartitionerOptions.NoBuffering);
var results = Parallel.ForEach(partitioner, options, work =>
{
    // ... process work (body not shown in the original)
});
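
For anyone wanting to try the "After" version end to end, here is a minimal self-contained sketch. The class and method names (NoBufferingDemo, Consume, Run), the 10 ms trickle, and the item counts are made up for illustration and are not from our service; only Partitioner.Create, EnumerablePartitionerOptions.NoBuffering, GetConsumingEnumerable, and CompleteAdding are the real framework pieces discussed above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class NoBufferingDemo
{
    // Consumes the queue with Parallel.ForEach until CompleteAdding is called
    // (or the token is cancelled) and returns how many items ran the delegate.
    public static int Consume(BlockingCollection<int> queue, CancellationToken token)
    {
        var processed = 0;
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount,
            CancellationToken = token
        };

        // NoBuffering: workers take one item at a time instead of
        // waiting for enough items to fill a chunk.
        var partitioner = Partitioner.Create(
            queue.GetConsumingEnumerable(token),
            EnumerablePartitionerOptions.NoBuffering);

        Parallel.ForEach(partitioner, options, item =>
        {
            // Stand-in for real work: just count the item.
            Interlocked.Increment(ref processed);
        });

        return processed;
    }

    // Hypothetical driver: a producer trickles items in slowly, then
    // calls CompleteAdding so the Parallel.ForEach loop can return.
    public static int Run(int itemCount)
    {
        using var cancel = new CancellationTokenSource();
        using var queue = new BlockingCollection<int>();

        var producer = Task.Run(() =>
        {
            for (var i = 0; i < itemCount; i++)
            {
                queue.Add(i);
                Thread.Sleep(10); // simulate items arriving slowly
            }
            queue.CompleteAdding();
        });

        var processed = Consume(queue, cancel.Token);
        producer.Wait();
        return processed;
    }

    public static void Main()
    {
        Console.WriteLine(Run(20));
    }
}
```

Run returns the number of items that actually reached the delegate, which should equal itemCount once the producer has called CompleteAdding. In a long-running service there is no CompleteAdding until shutdown, which is exactly where the default chunking partitioner can leave items waiting.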

bertnz commented Mar 13, 2018

Just saved me going crazy after 3 hours of trying to figure out why my process was getting stuck randomly.

Seems like this is definitely a bug with .NET framework.
