Skip to content

Instantly share code, notes, and snippets.

@controlflow
Last active September 7, 2021 10:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save controlflow/64bece6e21b4a31393befd1b999d7ef6 to your computer and use it in GitHub Desktop.
Save controlflow/64bece6e21b4a31393befd1b999d7ef6 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
// The task is to Analyze() files in parallel and Process() the selected ones
// on the main thread in batches.
var files = Enumerable.Range(0, 1000).Select(x => $"File{x:0000}.cs").ToList();
var degreeOfParallelism = Math.Min(Environment.ProcessorCount, 8);

// Work still waiting to be analyzed. The queue is filled once here and then only
// drained by the workers, so a ConcurrentQueue fits better than a ConcurrentBag
// (which is tuned for threads that consume what they themselves produced).
var unfinishedJob = new ConcurrentQueue<string>(files);
var itemsToProcessBatch = 10;

// Items that passed Analyze() and are waiting for Process() on the main thread.
var toProcess = new ConcurrentQueue<string>();

// Number of items currently sitting in toProcess. Workers poll this counter after
// every analyzed item instead of asking the collection for its Count, which for a
// ConcurrentBag means locking the whole bag on every call.
var pendingCount = 0;

while (!unfinishedJob.IsEmpty)
{
    // Do Analyze() in parallel on background threads until either the input is
    // exhausted or enough items have been collected for a batch.
    var workers = Enumerable
        .Range(0, degreeOfParallelism)
        .Select(workerIndex =>
        {
            // can take some time, better to use long-running tasks?
            return Task.Run(() =>
            {
                while (unfinishedJob.TryDequeue(out var candidate))
                {
                    if (Analyze(workerIndex, candidate))
                    {
                        // Several workers may enqueue before any of them notices the
                        // threshold, so a batch can exceed itemsToProcessBatch; that's fine.
                        toProcess.Enqueue(candidate);
                        Interlocked.Increment(ref pendingCount);
                    }

                    if (Volatile.Read(ref pendingCount) > itemsToProcessBatch)
                    {
                        Console.Write('*');
                        return; // time to process
                    }
                }
            });
        })
        .ToArray();

    Task.WaitAll(workers);
    Console.WriteLine();

    // Process the collected work items sequentially on the main thread.
    // No worker is running at this point, but the counter is still updated
    // atomically so it always mirrors the queue contents.
    while (toProcess.TryDequeue(out var workItem))
    {
        Interlocked.Decrement(ref pendingCount);
        Process(workItem);
    }
}
// Simulates analyzing a single file: blocks the calling thread for 100 ms, then
// decides from the item's hash code whether it needs processing (roughly 1 in 20).
// Writes the worker index to the console, wrapped in [brackets] when the item is
// selected. Returns true when the item should be handed over to Process().
// NOTE(review): string hash codes are not guaranteed to be stable across processes,
// so which items get selected may differ from run to run - confirm that is intended.
bool Analyze(int workerIndex, string workItem)
{
    Thread.Sleep(TimeSpan.FromMilliseconds(100));

    bool isSelected = workItem.GetHashCode() % 20 == 0;
    if (isSelected)
    {
        Console.Write($"[{workerIndex}]");
    }
    else
    {
        Console.Write(workerIndex.ToString());
    }

    return isSelected; // 5%
}
// Simulates the sequential processing step for one selected item: announces the
// item on the console, then blocks the calling thread for 200 ms.
void Process(string workItem)
{
    var announcement = $"Processing {workItem}";
    Console.WriteLine(announcement);

    Thread.Sleep(TimeSpan.FromMilliseconds(200));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment