@andy-williams
Last active April 16, 2022 18:56
Parallel Programming in C#: Patterns
// Problem:
// We have a lot of data whose processing can easily be parallelised.
// We want to process all of it and combine the results.
// "Map is an idiom in parallel computing where a simple operation is applied to all elements of a
// sequence, potentially in parallel.[1] It is used to solve embarrassingly parallel problems: those
// problems that can be decomposed into independent subtasks, requiring no
// communication/synchronization between the subtasks except a join or barrier at the end."
// - https://en.wikipedia.org/wiki/Map_(parallel_pattern)
void Main()
{
    var sw = System.Diagnostics.Stopwatch.StartNew();
    var dataToProcess = Enumerable.Range(0, 1000000);
    var result = new List<int>();

    Parallel.ForEach(dataToProcess,
        () => new List<int>(), // localInit: a per-thread list for storing results locally
        (toProcess, loopControl, localList) => // body: our map operation
        {
            localList.Add(toProcess + 10);
            return localList;
        },
        (localList) => // localFinally: reduce each thread-local list into our result list
        {
            lock (result)
            {
                result.AddRange(localList);
            }
        });

    sw.Stop();
    sw.Elapsed.Dump(); // Dump() is LINQPad's output helper
    result.Min().Dump();
    result.Count.Dump();
}
// Problem:
// We want to execute a lot of operations in parallel.
// No ordering of execution is required.
// Each operation can take a few minutes (3-5+).
void Main()
{
    var N = 100; // number of tasks

    // SOLUTION 1 - use the Parallel library
    Parallel.For(0, N, (i) => {
        // parallel work here
    });

    // SOLUTION 2 - improve on solution 1 by passing in MaxDegreeOfParallelism
    var pOptions = new ParallelOptions()
    {
        MaxDegreeOfParallelism = System.Environment.ProcessorCount
    };
    Parallel.For(0, N, pOptions, (i) => {
        // parallel work here
    });

    // SOLUTION 3 - only keep as many tasks in flight as there are CPUs
    // Longer solution: start one task per core, then replace each task as it finishes
    var cores = System.Environment.ProcessorCount;
    var tasks = new List<Task>();
    var remaining = N;

    // start the initial batch of tasks, one per core
    for (var i = 0; i < cores && remaining > 0; i++)
    {
        tasks.Add(Task.Run(() => {
            // parallel work here
        }));
        remaining--;
    }

    // as each task completes, replace it with a new one until all N have been scheduled
    while (remaining > 0)
    {
        var finished = Task.WaitAny(tasks.ToArray());
        tasks.RemoveAt(finished);
        tasks.Add(Task.Run(() => {
            // parallel work here
        }));
        remaining--;
    }
    Task.WaitAll(tasks.ToArray());
}
// PLINQ: very useful for quickly assessing whether parallelising work can give benefits,
// though the results tend to be slower than custom parallel code using an appropriate pattern
void Main()
{
    var sw = System.Diagnostics.Stopwatch.StartNew();
    var dataToProcess = Enumerable.Range(0, 1000000);
    var result = dataToProcess.AsParallel().Select(x => x + 10).ToList();
    sw.Stop();
    sw.Elapsed.Dump();
    result.Min().Dump();
    result.Count.Dump();
}
// Problem:
// We want to execute a lot of tasks, but each task does not do much in terms of CPU cycles.
// The main issue with this pattern is contention - the consumers are most probably emptying
// the queue faster than the producer is adding tasks.
void Main()
{
    // Producer/Consumer Pattern
    // The producer-consumer pattern splits work into smaller chunks and feeds them into a
    // thread-safe/concurrent queue. The difference between this and spawning a thread per
    // task is that the consumer threads are kept alive as long-running threads until all the
    // work in the queue is done, so there is no expensive constant spawning and killing of
    // threads.
    // [producers]->[queue]->[consumers]
    const int MAX_CAPACITY = 10000;
    var sw = System.Diagnostics.Stopwatch.StartNew();
    var dataToProcess = Enumerable.Range(0, 1000000);
    var result = new List<int>();
    var workQueue = new BlockingCollection<int>(MAX_CAPACITY);
    var numOfCores = System.Environment.ProcessorCount;
    var longRunningTaskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);

    // multiple producers are also allowed if required
    var producer = longRunningTaskFactory.StartNew(() => {
        foreach (var toProcess in dataToProcess)
        {
            workQueue.Add(toProcess);
        }
        // essential: signal the consumers that we're done so they stop waiting for more work
        workQueue.CompleteAdding();
    });
    // we store each consumer's results as a separate set of results
    var consumerResults = new Task<List<int>>[numOfCores];

    // spawn as many consumer threads as we have cores
    for (var i = 0; i < numOfCores; i++)
    {
        consumerResults[i] = longRunningTaskFactory.StartNew(() => {
            var localResult = new List<int>();
            try
            {
                while (!workQueue.IsCompleted)
                {
                    // this is where we do our work; Take() throws InvalidOperationException
                    // if the collection completes between the IsCompleted check and the call
                    var toProcess = workQueue.Take();
                    localResult.Add(toProcess + 10);
                }
            }
            catch (InvalidOperationException)
            {
                // workQueue is completed
            }
            return localResult;
        });
    }
    // WARNING: the commented-out merge below would produce an incorrect result, because
    // Task.WaitAny can return the index of an already-completed task again, potentially
    // merging the same consumer's result set twice
    // var merged = 0;
    // while (merged < numOfCores)
    // {
    //     var taskId = Task.WaitAny(consumerResults);
    //     var consumerResult = consumerResults[taskId].Result;
    //     result.AddRange(consumerResult);
    //     merged++;
    // }

    Task.WaitAll(consumerResults);
    foreach (var consumerResult in consumerResults)
    {
        result.AddRange(consumerResult.Result);
    }
    sw.Stop(); // all done

    result.Min().Dump();
    result.Count.Dump();
    sw.Elapsed.Dump();
}
@suprafun

Hello, under what license are these snippets released? I would like to study them to learn from them. Thank you.

@andy-williams (Author) commented Apr 27, 2021 via email

@suprafun commented May 1, 2021

Thank you so very much.

@arsu-leo

Love the MapReduce implementation, didn't notice that possibility

@andy-williams (Author) commented Apr 16, 2022

> Love the MapReduce implementation, didn't notice that possibility

@arsu-leo Keep in mind that this is quite old; you can achieve this in an async manner using Channels if you're using .NET Core. Reading material here: https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/. The idea is that you can create multiple writers and readers for the same channel, achieving the same pattern but with async support, and it's easier to design a more flexible system as there's better separation.
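For reference, here is a minimal sketch of the same producer/consumer pattern using System.Threading.Channels; the structure mirrors the BlockingCollection snippet above (one bounded queue, one producer, one consumer per core), but the capacity, the `+ 10` operation, and the class name are just illustrative placeholders.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

class ChannelsSketch
{
    static async Task Main()
    {
        // bounded channel plays the role of BlockingCollection with MAX_CAPACITY
        var channel = Channel.CreateBounded<int>(10000);

        // single producer (multiple writers to the same channel are also allowed)
        var producer = Task.Run(async () =>
        {
            foreach (var item in Enumerable.Range(0, 1000000))
                await channel.Writer.WriteAsync(item);
            channel.Writer.Complete(); // signal consumers that no more items are coming
        });

        // one consumer per core, each accumulating a local result list
        var consumers = Enumerable.Range(0, Environment.ProcessorCount)
            .Select(_ => Task.Run(async () =>
            {
                var local = new List<int>();
                // ReadAllAsync completes cleanly once the writer calls Complete()
                await foreach (var item in channel.Reader.ReadAllAsync())
                    local.Add(item + 10);
                return local;
            }))
            .ToArray();

        await producer;
        var result = (await Task.WhenAll(consumers)).SelectMany(list => list).ToList();
        Console.WriteLine(result.Count);
        Console.WriteLine(result.Min());
    }
}
```

Note that no lock or explicit CompleteAdding/exception handling is needed here: the channel's completion propagates to `ReadAllAsync`, and the per-consumer lists are only merged after `Task.WhenAll`.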
