Skip to content

Instantly share code, notes, and snippets.

@AliveDevil
Forked from CharlieDigital/ParallelForVsChannel.cs
Created October 5, 2023 16:02
Show Gist options
  • Save AliveDevil/62de8d4ccffd5f86980c1db8601973cd to your computer and use it in GitHub Desktop.
Save AliveDevil/62de8d4ccffd5f86980c1db8601973cd to your computer and use it in GitHub Desktop.
Comparing Parallel.For vs Channels
// Generate a set of 100 records, each with a random wait interval.
using System.Collections.Immutable;
using System.Diagnostics;
using System.Threading.Channels;
public static class Program
{
static ImmutableArray<(int Index, int Delay)> workload = Enumerable
.Range(0, 100)
.Select(i => (Index: i, Delay: 25 + i % 25))
.ToImmutableArray();
public static Task Main(string[] args)
{
switch (args[0])
{
case "Channels":
// Using System.Threading.Channels
return InstrumentedRun("Channel", async () =>
{
var channel = Channel.CreateUnbounded<int>();
async Task Run(ChannelWriter<int> writer, int id, int delay)
{
await Task.Delay(delay);
await writer.WriteAsync(id);
}
async Task Receive(ChannelReader<int> reader)
{
while (await reader.WaitToReadAsync())
{
if (reader.TryRead(out var id))
{
// No work here.
//log($" Completed {id}");
}
}
}
var receiveTask = Receive(channel.Reader);
var processingTasks = workload
.AsParallel()
.Select(e => Run(channel.Writer, e.Index, e.Delay));
await Task
.WhenAll(processingTasks)
.ContinueWith(_ => channel.Writer.Complete());
await receiveTask;
});
case "ParallelFor4":
// Using Parallel.For with concurrency of 4
return InstrumentedRun("Parallel.For @ 4", () =>
{
Parallel.For(0, 100, new ParallelOptions { MaxDegreeOfParallelism = 4 }, (index) =>
{
Thread.Sleep(workload[index].Delay);
});
return ValueTask.CompletedTask;
});
case "ParallelForEach4":
// Using Parallel.ForEachAsync with concurrency of 4
return InstrumentedRun("Parallel.ForEachAsync @ 4", async () =>
await Parallel.ForEachAsync(workload, new ParallelOptions { MaxDegreeOfParallelism = 4 }, async (item, cancel) =>
{
await Task.Delay(item.Delay, cancel);
})
);
case "ParallelForEach40":
// Using Parallel.ForEachAsync with concurrency of 40
return InstrumentedRun("Parallel.ForEachAsync @ 40", async () =>
await Parallel.ForEachAsync(workload, new ParallelOptions { MaxDegreeOfParallelism = 40 }, async (item, cancel) =>
{
await Task.Delay(item.Delay, cancel);
})
);
case "ParallelForEach":
// Using Parallel.ForEachAsync with concurrency unset
return InstrumentedRun("Parallel.ForEachAsync (Default)", async () =>
await Parallel.ForEachAsync(workload, async (item, cancel) =>
{
await Task.Delay(item.Delay, cancel);
})
);
default:
return Task.CompletedTask;
}
}
/*-----------------------------------------------------------
* Supporting functions
---------------------------------------------------------*/
static async Task InstrumentedRun(string name, Func<ValueTask> test)
{
var threadsAtStart = Process.GetCurrentProcess().Threads.Count;
var timer = Stopwatch.StartNew();
await test();
timer.Stop();
Console.WriteLine($"[{name}] = {timer.ElapsedMilliseconds}ms");
Console.WriteLine($" ⮑ {threadsAtStart} threads at start");
Console.WriteLine($" ⮑ {Process.GetCurrentProcess().Threads.Count} threads at end");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment