Skip to content

Instantly share code, notes, and snippets.

@paulbatum
Last active March 18, 2023 06:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save paulbatum/2037b6d13ab53bc36e2a87eb9ae91772 to your computer and use it in GitHub Desktop.
Save paulbatum/2037b6d13ab53bc36e2a87eb9ae91772 to your computer and use it in GitHub Desktop.
Channel Factories
using System.Diagnostics;
using System.Threading.Channels;
// Mines `total` iron ore into `destination`, bumping metrics.IronOreProduced once per
// ore written, and completes the writer when done so downstream readers can finish.
static async Task MinerLoop(ChannelWriter<IronOre> destination, int total, ProductionMetrics metrics)
{
    var remaining = total;

    while (remaining > 0)
    {
        // Wait for free capacity; a false result means the channel will accept no more writes.
        if (!await destination.WaitToWriteAsync())
        {
            break;
        }

        // Write synchronously for as long as the channel keeps accepting items.
        for (; remaining > 0 && destination.TryWrite(new IronOre()); remaining--)
        {
            Interlocked.Increment(ref metrics.IronOreProduced);
            Debug.WriteLine("Produced 1 iron ore.");
        }
    }

    destination.Complete();
}
// Moves items one at a time from `source` to `destination`, logging `iterationMessage`
// per item. Stops when the source is exhausted (or the destination stops accepting
// writes) and then completes the destination writer.
static async Task InserterLoop<T>(ChannelReader<T> source, ChannelWriter<T> destination, string iterationMessage)
{
    while (true)
    {
        if (!await source.WaitToReadAsync())
        {
            break;
        }

        if (!await destination.WaitToWriteAsync())
        {
            break;
        }

        // Both waits succeeded, so the read and the write are expected to succeed
        // immediately; the asserts guard that expectation in debug builds.
        var gotItem = source.TryRead(out var item);
        Debug.Assert(gotItem);

        var delivered = destination.TryWrite(item);
        Debug.Assert(delivered);

        Debug.WriteLine(iterationMessage);
    }

    destination.Complete();
}
// Consumes iron ore from `source` and writes one iron plate to `destination` for every
// two ore read, bumping metrics.IronPlateProduced per plate. When the source is
// exhausted the destination writer is completed. An unpaired final ore (odd input
// count) is consumed but yields no plate.
static async Task SmelterLoop(ChannelReader<IronOre> source, ChannelWriter<IronPlate> destination, ProductionMetrics metrics)
{
    var readCount = 0;
    while (await source.WaitToReadAsync() && await destination.WaitToWriteAsync())
    {
        while (source.TryRead(out _))
        {
            readCount++;
            if (readCount == 2)
            {
                // WriteAsync waits for capacity and throws ChannelClosedException if the
                // destination has been closed. The previous TryWrite/WaitToWriteAsync
                // retry loop ignored a false result from WaitToWriteAsync and would spin
                // forever on a closed channel.
                await destination.WriteAsync(new IronPlate());
                Debug.WriteLine("Converted 2 iron ore into 1 iron plate.");
                Interlocked.Increment(ref metrics.IronPlateProduced);
                readCount = 0;
            }
        }
    }
    destination.Complete();
}
// Wires up one production line: miner -> inserter -> smelter -> inserter -> chest,
// mines `total` ore through it and returns once every stage has finished.
static async Task Setup(int total, ProductionMetrics metrics)
{
    // Bounded channel used between stages; writers wait when it is full.
    static Channel<T> CreateBelt<T>(int capacity) =>
        Channel.CreateBounded<T>(
            new BoundedChannelOptions(capacity)
            {
                AllowSynchronousContinuations = true,
            });

    Channel<IronOre> miner = CreateBelt<IronOre>(10);
    Channel<IronOre> smelterInput = CreateBelt<IronOre>(2);
    Channel<IronPlate> smelterOutput = CreateBelt<IronPlate>(1);

    // The chest is the end of the line and nothing reads from it, so it must be
    // unbounded: with a bounded chest the last inserter blocks forever once the chest
    // fills up and the WhenAll below never completes.
    Channel<IronPlate> chest = Channel.CreateUnbounded<IronPlate>(
        new UnboundedChannelOptions
        {
            AllowSynchronousContinuations = true,
        });

    // Stages are started in pipeline order (arguments are evaluated left to right).
    await Task.WhenAll(
        MinerLoop(miner.Writer, total, metrics),
        InserterLoop<IronOre>(miner.Reader, smelterInput.Writer, "Inserted 1 iron ore into smelter"),
        SmelterLoop(smelterInput.Reader, smelterOutput.Writer, metrics),
        InserterLoop<IronPlate>(smelterOutput.Reader, chest.Writer, "Inserted 1 iron plate into chest"));
}
// Entry point: runs `parallelSetups` production lines concurrently, sharing one set of
// metrics, then prints the totals and the elapsed wall-clock time.
var watch = Stopwatch.StartNew();
ProductionMetrics metrics = new ProductionMetrics();
var tasks = new List<Task>();
var total = 40; // try 50_000_000 for a longer run
var parallelSetups = 1;

// Split the ore across the setups; the first `remainder` setups take one extra so that
// no ore is lost when `total` is not evenly divisible by `parallelSetups`.
var perSetup = total / parallelSetups;
var remainder = total % parallelSetups;
for (int i = 0; i < parallelSetups; i++)
{
    var share = i < remainder ? perSetup + 1 : perSetup;
    tasks.Add(Setup(share, metrics));
}

await Task.WhenAll(tasks);
watch.Stop();

Console.WriteLine($"Iron ore mined: {metrics.IronOreProduced:n0}");
Console.WriteLine($"Iron plates smelted: {metrics.IronPlateProduced:n0}");
Console.WriteLine($"Elapsed: {watch.ElapsedMilliseconds}ms");
/// <summary>Member-less value type used as the item carried by the ore channels (miner output and smelter input).</summary>
public readonly record struct IronOre;
/// <summary>Member-less value type used as the item carried by the plate channels (smelter output and chest).</summary>
public readonly record struct IronPlate;
/// <summary>
/// Production counters shared by the pipeline stages. The members are public fields
/// (not properties) because the stages pass them by <c>ref</c> to
/// <see cref="System.Threading.Interlocked.Increment(ref int)"/>.
/// </summary>
public class ProductionMetrics
{
    /// <summary>Number of iron ore items produced so far; starts at zero.</summary>
    public int IronOreProduced;

    /// <summary>Number of iron plates produced so far; starts at zero.</summary>
    public int IronPlateProduced;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment