Skip to content

Instantly share code, notes, and snippets.

@ReubenBond
Created July 3, 2018 02:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ReubenBond/f08f60babdccc83bc0103512e086fd09 to your computer and use it in GitHub Desktop.
Save ReubenBond/f08f60babdccc83bc0103512e086fd09 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Security.Principal;
using System.Threading;
namespace BlockingCollectionConcurrentBagException
{
class Program
{
static void Main(string[] args)
{
// We only hold a reference to the outer BlockingCollection, not the inner ConcurrentBag
var blockingCollection = new BlockingCollection<int>(new ConcurrentBag<int>());
const int Count = 1_000_000;
var allConsumed = new List<int>[] { new List<int>(Count), new List<int>(Count), };
// Produce elements into the collection.
var added = 0;
var producerFunc = new WaitCallback[1];
producerFunc[0] =
state =>
{
if (blockingCollection.Count == 0)
{
blockingCollection.Add(added++);
if (added == Count)
{
blockingCollection.CompleteAdding();
return;
}
}
else Thread.Yield();
ThreadPool.QueueUserWorkItem(producerFunc[0]);
};
ThreadPool.QueueUserWorkItem(producerFunc[0]);
var wasCompleted = new bool[] {false, false};
// Concurrently pull elements off the collection.
var consumerFunc = new WaitCallback[1];
consumerFunc[0] =
index =>
{
var i = (int) index;
var success = false;
do
{
try
{
// THIS IS WHERE THE EXCEPTION OCCURS
success = blockingCollection.TryTake(out var num);
if (success)
{
allConsumed[i].Add(num);
}
else
{
if (blockingCollection.IsCompleted)
{
// Does waiting make the items appear? (no)
Thread.Sleep(1000);
if (wasCompleted[i]) return;
wasCompleted[i] = true;
}
}
}
catch
{
}
} while (success);
ThreadPool.QueueUserWorkItem(consumerFunc[0], index);
};
// Start two competing consumers.
ThreadPool.QueueUserWorkItem(consumerFunc[0], 0);
ThreadPool.QueueUserWorkItem(consumerFunc[0], 1);
while (!blockingCollection.IsCompleted) Thread.Sleep(100);
Thread.Sleep(1000);
var count = allConsumed[0].Count + allConsumed[1].Count;
Console.WriteLine("Produced elements: " + added);
Console.WriteLine("Consumed elements: " + count);
var total = new HashSet<int>();
foreach (var e in allConsumed[0]) if (!total.Add(e)) Console.WriteLine($"0 Error adding {e} to total");
foreach (var e in allConsumed[1]) if (!total.Add(e)) Console.WriteLine($"1 Error adding {e} to total");
Console.WriteLine("Unique elements: " + total.Count);
Console.ReadKey();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment