@bardware
Forked from sheastrickland/gotcha.md
Created December 28, 2023 22:38
BlockingCollection with Parallel.ForEach with a Partitioner

Lesson Learnt, Streaming Producer / Consumer issue using:

BlockingCollection<T> and Parallel.ForEach(_blockingCollection.GetConsumingEnumerable(), ...)

Beware the default Partitioner algorithm, which chunks and buffers the items it pulls from the enumerable.

The GetConsumingPartitioner heading covers it, and the first comment from Stephen Toub totally nails it. We implemented something similar to what commenter Hernan pointed out. Stephen Toub Blog
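For readers without the blog post to hand, a custom consuming partitioner along those lines might look roughly like the sketch below. It is reconstructed from memory of the general approach, not copied from the post, so the class name `BlockingCollectionExtensions` and the exact shape are assumptions. The idea is that every partition takes items straight from the collection's consuming enumerable, so nothing is held back in a chunk buffer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper class; the name is an assumption for illustration.
public static class BlockingCollectionExtensions
{
    // Returns a partitioner whose partitions each pull directly from the collection.
    public static Partitioner<T> GetConsumingPartitioner<T>(this BlockingCollection<T> collection)
        => new BlockingCollectionPartitioner<T>(collection);

    private sealed class BlockingCollectionPartitioner<T> : Partitioner<T>
    {
        private readonly BlockingCollection<T> _collection;

        internal BlockingCollectionPartitioner(BlockingCollection<T> collection)
            => _collection = collection ?? throw new ArgumentNullException(nameof(collection));

        // Parallel.ForEach requires dynamic partition support from custom partitioners.
        public override bool SupportsDynamicPartitions => true;

        public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
        {
            if (partitionCount < 1) throw new ArgumentOutOfRangeException(nameof(partitionCount));
            var dynamicPartitions = GetDynamicPartitions();
            return Enumerable.Range(0, partitionCount)
                             .Select(_ => dynamicPartitions.GetEnumerator())
                             .ToArray();
        }

        // Each enumerator takes items one at a time, in a thread-safe way, with no chunking.
        public override IEnumerable<T> GetDynamicPartitions()
            => _collection.GetConsumingEnumerable();
    }
}
```

It would be used as `Parallel.ForEach(_blockingCollection.GetConsumingPartitioner(), options, work => ...)`.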

My absolute saviour: EnumerablePartitionerOptions.NoBuffering, which makes the partitioner hand out one item at a time instead of buffering chunks.

Inlined the issue:

This issue you were hitting... was the Parallel.ForEach call returning, or did the ForEach call block forever? If the latter, it would make sense that it was waiting for enough items to fill the chunk, but if those items were never arriving, it would wait forever for them to. Or, more specifically, it would wait until either enough elements arrived or until the blocking collection's CompleteAdding method was called, which would inform the Parallel.ForEach that no more data would arrive and so it should stop waiting to fill the chunk.

We had occasional "missing" elements which were successfully added to the collection and successfully iterated, but never ran the delegate. That is, until we gracefully shut down the service, which signalled the CancellationToken, and it would then happily process the work. Man, that ate a few hours!

//Before: the IEnumerable overload uses the default partitioner, which buffers items into chunks
var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 5, CancellationToken = cancel.Token };
var results = Parallel.ForEach(_blockingCollection.GetConsumingEnumerable(cancel.Token), options, work =>
{
    // ... handle the work item ...
});

//After: NoBuffering hands each item to a worker as soon as it is available
var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 5, CancellationToken = cancel.Token };
var partitioner = Partitioner.Create(_blockingCollection.GetConsumingEnumerable(cancel.Token), EnumerablePartitionerOptions.NoBuffering);
var results = Parallel.ForEach(partitioner, options, work =>
{
    // ... handle the work item ...
});
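To check the behaviour in isolation, here is a minimal self-contained sketch, assuming a hypothetical `NoBufferingDemo` class and a small queue of integers (neither is from our service). It adds a few items without calling CompleteAdding, waits for the consumer to process them, and reports how many were handled before the collection was completed. With NoBuffering, that number is expected to equal the number of items added.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; names and sizes are assumptions for illustration.
public static class NoBufferingDemo
{
    // Returns how many items were processed BEFORE CompleteAdding was called.
    public static int Run(int count)
    {
        using var queue = new BlockingCollection<int>();
        int processed = 0;

        var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
        var partitioner = Partitioner.Create(
            queue.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);

        // Consumer: blocks inside Parallel.ForEach until the collection is completed.
        var consumer = Task.Run(() =>
            Parallel.ForEach(partitioner, options, item =>
            {
                Interlocked.Increment(ref processed);
            }));

        // Producer: add a handful of items, but do not complete the collection yet.
        for (int i = 0; i < count; i++)
        {
            queue.Add(i);
        }

        // Give the consumer up to five seconds to pick up everything that was added.
        SpinWait.SpinUntil(() => Volatile.Read(ref processed) == count, TimeSpan.FromSeconds(5));
        int processedBeforeComplete = Volatile.Read(ref processed);

        // Only now signal that no more items will arrive, then let the loop finish.
        queue.CompleteAdding();
        consumer.Wait();

        return processedBeforeComplete;
    }
}
```

Calling `NoBufferingDemo.Run(3)` from a console app and printing the result is enough to see whether items are being held back. Dropping the `EnumerablePartitionerOptions.NoBuffering` argument lets you compare against the default chunking behaviour in your own environment.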