Skip to content

Instantly share code, notes, and snippets.

@theodorzoulias
Last active February 27, 2023 09:10
Show Gist options
  • Save theodorzoulias/cb7616aa8f10656e81a72ccd7eb11d86 to your computer and use it in GitHub Desktop.
Save theodorzoulias/cb7616aa8f10656e81a72ccd7eb11d86 to your computer and use it in GitHub Desktop.
/// <summary>
/// Invokes a delegate for a node and all its descendants in parallel.
/// Children are invoked after the completion of their parent.
/// </summary>
public static ParallelLoopResult ParallelTraverseHierarchical<TNode>(
TNode root,
ParallelOptions parallelOptions,
Action<TNode, ParallelLoopState> body,
Func<TNode, IEnumerable<TNode>> childrenSelector)
{
ArgumentNullException.ThrowIfNull(parallelOptions);
ArgumentNullException.ThrowIfNull(body);
ArgumentNullException.ThrowIfNull(childrenSelector);
// The source of the Parallel loop is a concurrent stack of stacks.
// Use the List<T> as the type of the inner stacks, because it is more flexible than the Stack<T> collection.
ConcurrentStack<List<TNode>> sharedStack = new();
sharedStack.Push(new List<TNode>() { root });
int pendingCount = sharedStack.Count;
// Use a SemaphoreSlim as blocker, because it is more flexible than the BlockingCollection<T>.
using SemaphoreSlim blockingSemaphore = new(pendingCount);
IEnumerable<List<TNode>> GetConsumingEnumerable()
{
while (true)
{
blockingSemaphore.Wait();
if (!sharedStack.TryPop(out List<TNode> stack)) break;
yield return stack;
}
}
Partitioner<List<TNode>> partitioner = Partitioner.Create(
GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);
return Parallel.ForEach(partitioner, parallelOptions, (stack, state) =>
{
try
{
while (stack.Count > 0)
{
// Pop a node from the stack
TNode node = stack[^1]; stack.RemoveAt(stack.Count - 1);
body(node, state);
if (state.ShouldExitCurrentIteration) break;
IEnumerable<TNode> children = childrenSelector(node);
if (children is not null)
{
int previousCount = stack.Count;
stack.AddRange(children);
// The first child should be on top of the stack, so the newly pushed nodes should be reversed.
stack.Reverse(previousCount, stack.Count - previousCount);
}
if (sharedStack.IsEmpty && stack.Count > 1)
{
// Populate the shared stack by splitting the local stack to multiple stacks (up to 4).
int stacksCount = Math.Clamp(stack.Count, 1, 4);
List<TNode>[] stacks = new List<TNode>[stacksCount];
for (int i = 0; i < stacks.Length; i++)
stacks[i] = new();
for (int i = 0; i < stack.Count; i++)
stacks[i % stacks.Length].Add(stack[i]);
// Replace the local stack with the last of the new stacks.
stack = stacks[^1];
// Add all other stacks in the shared stack.
Interlocked.Add(ref pendingCount, stacks.Length - 1);
sharedStack.PushRange(stacks, 0, stacks.Length - 1);
blockingSemaphore.Release(stacks.Length - 1);
}
}
}
finally
{
if (Interlocked.Decrement(ref pendingCount) == 0)
blockingSemaphore.Release();
}
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment