Created
July 3, 2018 02:04
-
-
Save ReubenBond/f08f60babdccc83bc0103512e086fd09 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Security.Principal; | |
using System.Threading; | |
namespace BlockingCollectionConcurrentBagException
{
    /// <summary>
    /// Stress program: one producer and two competing consumers share a
    /// <see cref="BlockingCollection{T}"/> backed by a <see cref="ConcurrentBag{T}"/>.
    /// At the end it prints how many elements were produced, how many were consumed,
    /// how many of the consumed values were unique, and how many exceptions the
    /// consumers caught along the way.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Runs the producer/consumer stress scenario and prints a summary to the console.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // We only hold a reference to the outer BlockingCollection, not the inner ConcurrentBag.
            var blockingCollection = new BlockingCollection<int>(new ConcurrentBag<int>());
            const int Count = 1_000_000;

            // One result list per consumer; each list is only appended to by its own consumer.
            var allConsumed = new List<int>[] { new List<int>(Count), new List<int>(Count), };

            // Exceptions caught by the consumers. Previously these were swallowed by an
            // empty catch, so a run gave no indication that anything had been thrown.
            var exceptionCount = 0;
            Exception firstException = null;

            // Produce elements into the collection.
            // The producer re-queues itself after every step, so at most one producer
            // work item is in flight at a time and 'added' has a single writer.
            var added = 0;
            WaitCallback producer = null;
            producer = state =>
            {
                if (blockingCollection.Count == 0)
                {
                    // Only add when the collection looks empty, so it holds roughly one item at a time.
                    blockingCollection.Add(added++);
                    if (added == Count)
                    {
                        blockingCollection.CompleteAdding();
                        return;
                    }
                }
                else
                {
                    Thread.Yield();
                }

                ThreadPool.QueueUserWorkItem(producer);
            };
            ThreadPool.QueueUserWorkItem(producer);

            // Per-consumer flag: set the first time a consumer observes IsCompleted,
            // so that the consumer stops on the second observation.
            var wasCompleted = new bool[] { false, false };

            // Concurrently pull elements off the collection.
            // Each consumer drains for as long as TryTake succeeds, then re-queues itself.
            WaitCallback consumer = null;
            consumer = index =>
            {
                var i = (int)index;
                var success = false;
                do
                {
                    try
                    {
                        // The original author marks this call as where the exception occurs.
                        success = blockingCollection.TryTake(out var num);
                        if (success)
                        {
                            allConsumed[i].Add(num);
                        }
                        else if (blockingCollection.IsCompleted)
                        {
                            // Does waiting make the items appear? (no)
                            Thread.Sleep(1000);
                            if (wasCompleted[i])
                            {
                                return;
                            }

                            wasCompleted[i] = true;
                        }
                    }
                    catch (Exception ex)
                    {
                        // Keep going as before, but record what happened so it can be
                        // reported in the summary. Nothing is printed here to avoid
                        // adding console I/O to the hot loop.
                        Interlocked.Increment(ref exceptionCount);
                        Interlocked.CompareExchange(ref firstException, ex, null);
                    }
                } while (success);

                ThreadPool.QueueUserWorkItem(consumer, index);
            };

            // Start two competing consumers.
            ThreadPool.QueueUserWorkItem(consumer, 0);
            ThreadPool.QueueUserWorkItem(consumer, 1);

            // Poll until adding is complete and the collection is empty, then allow
            // an extra second before reading the results.
            while (!blockingCollection.IsCompleted)
            {
                Thread.Sleep(100);
            }

            Thread.Sleep(1000);

            var count = allConsumed[0].Count + allConsumed[1].Count;
            Console.WriteLine("Produced elements: " + added);
            Console.WriteLine("Consumed elements: " + count);

            // Detect values that were consumed more than once (by either consumer).
            var total = new HashSet<int>();
            foreach (var e in allConsumed[0])
            {
                if (!total.Add(e))
                {
                    Console.WriteLine($"0 Error adding {e} to total");
                }
            }

            foreach (var e in allConsumed[1])
            {
                if (!total.Add(e))
                {
                    Console.WriteLine($"1 Error adding {e} to total");
                }
            }

            Console.WriteLine("Unique elements: " + total.Count);

            // Report the exceptions the consumers caught.
            Console.WriteLine("Consumer exceptions: " + Volatile.Read(ref exceptionCount));
            var observed = Volatile.Read(ref firstException);
            if (observed != null)
            {
                Console.WriteLine("First consumer exception: " + observed);
            }

            Console.ReadKey();
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment