Skip to content

Instantly share code, notes, and snippets.

@controlflow
Last active October 5, 2022 20:10
Show Gist options
  • Save controlflow/950f6b170fc4eef20fd8282366c09b02 to your computer and use it in GitHub Desktop.
Save controlflow/950f6b170fc4eef20fd8282366c09b02 to your computer and use it in GitHub Desktop.
Parallel processing pattern
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
// The task is to Analyze() files in parallel on background threads and Process() the
// selected ones sequentially on the main thread, in batches of roughly itemsToProcessBatch.
var files = Enumerable.Range(0, 1000).Select(x => $"File{x:0000}.cs").ToList();
var degreeOfParallelism = Math.Min(Environment.ProcessorCount, 8);

// Filled once here and drained by several worker threads. ConcurrentBag<T> is optimized for
// the same thread adding and taking; a queue suits this producer/consumer split better and
// also keeps the files in their original order.
var unfinishedJob = new ConcurrentQueue<string>(files);
var itemsToProcessBatch = 10;
var toProcess = new ConcurrentQueue<string>();

// Number of items queued into toProcess during the current analysis round.
var toProcessCount = 0;

while (!unfinishedJob.IsEmpty)
{
    // Run Analyze() in parallel until the work runs out or a full batch has been selected.
    // Task.WaitAll (rather than await) deliberately blocks, so that the processing loop
    // below keeps running on the main thread.
    Task.WaitAll(Enumerable
        .Range(0, degreeOfParallelism)
        .Select(workerIndex => Task.Run(() =>
        {
            // Stop taking new work once the batch is full. A worker that is mid-Analyze()
            // when that happens still finishes (and possibly queues) its current item, so a
            // batch can exceed itemsToProcessBatch by up to degreeOfParallelism - 1.
            while (Volatile.Read(ref toProcessCount) < itemsToProcessBatch
                   && unfinishedJob.TryDequeue(out var workItem))
            {
                if (!Analyze(workerIndex, workItem))
                {
                    continue;
                }

                toProcess.Enqueue(workItem);

                // Count only selected items, so a batch is itemsToProcessBatch items for
                // Process() rather than that many analyzed files.
                if (Interlocked.Increment(ref toProcessCount) >= itemsToProcessBatch)
                {
                    Console.Write('*');
                    return; // time to process
                }
            }
        }))
        .ToArray());

    Console.WriteLine();

    // Process the selected work items sequentially on the main thread.
    while (toProcess.TryDequeue(out var workItem))
    {
        Process(workItem);
    }

    // Safe as a plain write: Task.WaitAll above guarantees no worker is still running.
    toProcessCount = 0;
}
// Simulates analyzing one work item on a background worker.
// workerIndex: index of the calling worker, used only for the progress output.
// workItem: the item (file name) to analyze.
// Returns true when the item should be queued for Process(); about 1 in 20 items are selected.
bool Analyze(int workerIndex, string workItem)
{
    // Stand-in for expensive analysis work.
    Thread.Sleep(100);

    // NOTE: string hash codes are randomized per process on .NET Core, so which items are
    // selected differs from run to run (the ~5% ratio is what matters for the simulation).
    bool selected = workItem.GetHashCode() % 20 == 0;

    // Progress marker: selected items are printed as "[index]", the rest as a bare index.
    string marker;
    if (selected)
    {
        marker = $"[{workerIndex}]";
    }
    else
    {
        marker = workerIndex.ToString();
    }
    Console.Write(marker);

    return selected;
}
// Simulates processing one selected work item; in this program it is called only from the
// main thread's processing loop.
// workItem: the item (file name) to process.
void Process(string workItem)
{
    var message = $"Processing {workItem}";
    Console.WriteLine(message);

    // Stand-in for the cost of processing.
    Thread.Sleep(200);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment