
@HalidCisse
Forked from sheastrickland/gotcha.md
Created May 5, 2018 15:04
BlockingCollection with Parallel.ForEach with a Partitioner

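Lesson learnt: a streaming producer/consumer issue when using

BlockingCollection<T> with Parallel.ForEach(_blockingCollection.GetConsumingEnumerable(...))

Beware the default partitioning algorithm, which chunks and buffers: a worker can hold items it has already taken from the collection while it waits for enough items to fill its chunk.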

Stephen Toub's blog post covers this under its GetConsumingPartitioner heading, and Toub's first comment on the post totally nails it. We implemented something similar to what commenter Hernan pointed out.

My absolute saviour: the NoBuffering option in EnumerablePartitionerOptions.
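If you want every consumer to get NoBuffering without having to remember it, one option is to wrap it in an extension method. This is a minimal sketch, assuming .NET Framework 4.5 or later (where EnumerablePartitionerOptions exists). The class name ConsumingPartitionerExtensions and the method name GetConsumingPartitioner are our own hypothetical choices; the method name echoes the heading in Toub's post, but this is not his implementation.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: not part of the BCL or of Toub's ParallelExtensionsExtras.
public static class ConsumingPartitionerExtensions
{
    // Wraps GetConsumingEnumerable in a NoBuffering partitioner, so each item is
    // handed to a Parallel.ForEach worker as soon as it is taken, instead of
    // being held back while the worker waits to fill a chunk.
    public static OrderablePartitioner<T> GetConsumingPartitioner<T>(
        this BlockingCollection<T> collection,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        IEnumerable<T> source = collection.GetConsumingEnumerable(cancellationToken);
        return Partitioner.Create(source, EnumerablePartitionerOptions.NoBuffering);
    }
}
```

Usage then mirrors the "After" snippet: `Parallel.ForEach(_blockingCollection.GetConsumingPartitioner(cancel.Token), options, work => { ... });`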

The relevant explanation, inlined:

This issue you were hitting... was the Parallel.ForEach call returning, or did the ForEach call block forever? If the latter, it would make sense that it was waiting for enough items to fill the chunk, but if those items were never arriving, it would wait forever for them to. Or, more specifically, it would wait until either enough elements arrived or until the blocking collection's CompleteAdding method was called, which would inform the Parallel.ForEach that no more data would arrive and so it should stop waiting to fill the chunk.

We had occasional "missing" elements: they were successfully added to the collection and successfully iterated, but the delegate never ran for them. Only when we gracefully shut down the service, which signalled the CancellationToken, would it happily process the work. Man, that ate a few hours!
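To see the behaviour described above for yourself, a small console harness can add items without calling CompleteAdding, wait, and count how many times the loop body actually ran. This is a minimal sketch, not the original service: BufferingDemo and Run are hypothetical names, and the degree of parallelism, item count and wait time are arbitrary assumptions. It is timing-based, so treat it as an illustration rather than a rigorous test.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo harness comparing default partitioning with NoBuffering.
public static class BufferingDemo
{
    // Starts a Parallel.ForEach consumer over a BlockingCollection, adds itemCount
    // items WITHOUT calling CompleteAdding, waits, and records how many items the
    // loop body has run for. Then calls CompleteAdding and waits for the loop to end.
    public static (int BeforeComplete, int AfterComplete) Run(
        bool noBuffering, int itemCount, TimeSpan wait)
    {
        var queue = new BlockingCollection<int>();
        int processed = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };

        Task consumer = Task.Run(() =>
        {
            var source = queue.GetConsumingEnumerable();
            if (noBuffering)
            {
                // Each item taken from the collection goes straight to a worker.
                var partitioner = Partitioner.Create(source, EnumerablePartitionerOptions.NoBuffering);
                Parallel.ForEach(partitioner, options, item => Interlocked.Increment(ref processed));
            }
            else
            {
                // Default partitioning: items already taken may wait in a partially filled chunk.
                Parallel.ForEach(source, options, item => Interlocked.Increment(ref processed));
            }
        });

        for (int i = 0; i < itemCount; i++) queue.Add(i);

        Thread.Sleep(wait);                          // give the consumer time to process what it can
        int beforeComplete = Volatile.Read(ref processed);

        queue.CompleteAdding();                      // "no more data": stops waiting to fill a chunk
        consumer.Wait();
        return (beforeComplete, Volatile.Read(ref processed));
    }
}
```

For example, `BufferingDemo.Run(noBuffering: true, itemCount: 50, wait: TimeSpan.FromSeconds(1))` should report all 50 items before CompleteAdding is called. The default-partitioning run may report fewer, depending on the runtime version and how its chunk sizes line up with the item count; both runs process everything once CompleteAdding is called.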

// Before: default partitioning chunks and buffers items from the consuming enumerable
var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 5, CancellationToken = cancel.Token };
var results = Parallel.ForEach(_blockingCollection.GetConsumingEnumerable(cancel.Token), options, work =>
{
    // process work
});

// After: NoBuffering hands each item to a worker as soon as it is taken
// (Partitioner and EnumerablePartitionerOptions are in System.Collections.Concurrent, .NET 4.5+)
var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 5, CancellationToken = cancel.Token };
var partitioner = Partitioner.Create(_blockingCollection.GetConsumingEnumerable(cancel.Token), EnumerablePartitionerOptions.NoBuffering);
var results = Parallel.ForEach(partitioner, options, work =>
{
    // process work
});