Skip to content

Instantly share code, notes, and snippets.

@rogeralsing
Created September 5, 2020 09:02
Show Gist options
  • Save rogeralsing/3f6001462824ae3d74081e783f591466 to your computer and use it in GitHub Desktop.
Save rogeralsing/3f6001462824ae3d74081e783f591466 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace ConsoleApp3
{
class Program
{
/// <summary>
/// Demo entry point. Creates two bounded channels (messages and in-flight
/// handler tasks), starts the producer, consumer and batch-completion stages
/// without awaiting them, then blocks until a line is read from the console.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
{
    // Both channels share the same bound.
    const int channelCapacity = 1000;

    var messages = Channel.CreateBounded<string>(channelCapacity);
    var pendingHandlers = Channel.CreateBounded<Task>(channelCapacity);

    // The stages are started fire-and-forget: their tasks are discarded,
    // so nothing here observes their completion or failure.
    _ = ProduceA(messages.Writer);
    _ = ReadA(messages.Reader, pendingHandlers.Writer);
    _ = BatchCompleted(pendingHandlers.Reader);

    // Keep the process alive until the user presses Enter.
    Console.ReadLine();
}
/// <summary>
/// Drains handler tasks from <paramref name="channelBReader"/> into a buffer and
/// waits for them in batches: a batch is awaited as soon as it holds 100 tasks,
/// or when the channel is momentarily empty and the buffer holds at least one task.
/// </summary>
/// <param name="channelBReader">Reader side of the channel carrying in-flight handler tasks.</param>
/// <returns>A task that completes once the channel has been completed and fully drained.</returns>
private static async Task BatchCompleted(ChannelReader<Task> channelBReader)
{
    //MAX BATCH COMMIT SIZE
    const int maxBatchSize = 100;

    await Task.Yield();
    var buffer = new List<Task>();

    // Park asynchronously until there is something to read instead of spinning on
    // TryRead: a tight loop with no await would keep a thread-pool thread busy at
    // 100% CPU whenever the channel is empty. WaitToReadAsync yields false once the
    // channel is completed and drained, which lets this loop terminate.
    while (await channelBReader.WaitToReadAsync())
    {
        // Pull everything that is currently available without awaiting per item.
        while (channelBReader.TryRead(out var task))
        {
            buffer.Add(task);
            if (buffer.Count == maxBatchSize)
            {
                await WaitForBatchBuffer(buffer);
            }
        }

        // Channel is empty for now: flush the partial batch rather than holding it.
        if (buffer.Count > 0)
        {
            await WaitForBatchBuffer(buffer);
        }
    }
}
// Running total of messages whose handler tasks have completed successfully.
private static int messageCount = 0;

/// <summary>
/// Awaits every task in <paramref name="buffer"/>, then adds the batch size to the
/// running total, reports it, and empties the buffer so it can be reused.
/// </summary>
/// <param name="buffer">The current batch of handler tasks; cleared on success.</param>
/// <returns>A task that completes when the whole batch has completed.</returns>
/// <remarks>
/// If any task in the batch faults, the exception from <see cref="Task.WhenAll(IEnumerable{Task})"/>
/// propagates; the total is not updated and the buffer is left untouched.
/// </remarks>
private static async Task WaitForBatchBuffer(List<Task> buffer)
{
    Console.WriteLine("Waiting for batch buffer");
    await Task.WhenAll(buffer);

    // Count and report only after the batch has actually completed; counting
    // up front would report messages as handled even when the batch fails.
    messageCount += buffer.Count;
    Console.WriteLine("Total messages handled " + messageCount);

    //We could commit the read offset here, now that the whole batch has completed.
    //(We would ofc need more than just the tasks, some info on the message batch itself)
    buffer.Clear();
}
//CONSUMER:
//Mirrors what Gjöll currently does.
/// <summary>
/// Reads messages from <paramref name="channelAReader"/> one at a time, starts the
/// handler for each message without awaiting it, and forwards the resulting task to
/// <paramref name="channelBWriter"/>.
/// </summary>
/// <param name="channelAReader">Source of incoming messages.</param>
/// <param name="channelBWriter">Destination for the in-flight handler tasks.</param>
private static async Task ReadA(ChannelReader<string> channelAReader, ChannelWriter<Task> channelBWriter)
{
    await Task.Yield();

    for (;;)
    {
        // The handler task is deliberately not awaited here; it is handed on to the
        // task channel. WriteAsync is awaited, so a full task channel holds this loop.
        var inFlight = HandleMessage(await channelAReader.ReadAsync());
        await channelBWriter.WriteAsync(inFlight);
    }
}
//PROCESSOR:
//Mirrors what Gjöll currently does.
/// <summary>
/// Stand-in for the Processor message handler: simulates a slow handler by
/// waiting asynchronously for one second.
/// </summary>
/// <param name="message">The message to handle (not inspected by this stub).</param>
/// <returns>A task that completes after the simulated delay.</returns>
private static async Task HandleMessage(string message)
{
    // 1000 ms of simulated work.
    var simulatedWork = TimeSpan.FromMilliseconds(1000);
    await Task.Delay(simulatedWork);
}
//PRODUCER:
//Mirrors what Gjöll currently does.
/// <summary>
/// Endlessly writes the current local time, formatted as a string, to
/// <paramref name="channelWriter"/>. Each write is awaited before the next one.
/// </summary>
/// <param name="channelWriter">Writer side of the message channel.</param>
private static async Task ProduceA(ChannelWriter<string> channelWriter)
{
    await Task.Yield();

    for (;;)
    {
        //this would be the Kafka reader
        var payload = DateTime.Now.ToString();
        await channelWriter.WriteAsync(payload);
    }
}
}
}
@rogeralsing
Copy link
Author

@rogeralsing
Copy link
Author

https://gist.github.com/rogeralsing/3f6001462824ae3d74081e783f591466#file-foo-cs-L73

The code calling the message handler does not await the returned task; instead, it forwards the task to a channel.

@rogeralsing
Copy link
Author

https://gist.github.com/rogeralsing/3f6001462824ae3d74081e783f591466#file-foo-cs-L60

Once this task channel reaches a certain point, we wait for those tasks to complete.

This frees up space in the batch channel, allowing the entire pipeline to push more tasks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment