Skip to content

Instantly share code, notes, and snippets.

@brnls
Last active January 26, 2024 03:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save brnls/1caad57760f1b2ce0622661e5b8fd218 to your computer and use it in GitHub Desktop.
Save brnls/1caad57760f1b2ce0622661e5b8fd218 to your computer and use it in GitHub Desktop.
Time Buffered Channel Reader
public class BufferedChannel<T>
{
    private readonly Channel<T> _input;
    private readonly Channel<IReadOnlyCollection<T>> _output;

    public BufferedChannel(TimeSpan time, int maxItems)
    {
        _input = Channel.CreateBounded<T>(maxItems);
        _output = Channel.CreateBounded<IReadOnlyCollection<T>>(1);
        _ = Process();
    }

    private async ValueTask Process()
    {
        Task readChannel = _input.Reader.WaitToReadAsync().AsTask();
        var list = new List<T>();
        CancellationTokenSource cts = new CancellationTokenSource();
        Task delay = Task.Delay(Timeout.Infinite, cts.Token);
        bool timerSet = false;
        while (true)
        {
            bool saveBatch = false;
            var t = await Task.WhenAny(readChannel, delay!);
            if (t == readChannel)
            {
                while (list.Count < 10 && _input.Reader.TryRead(out var i))
                {
                    list.Add(i);
                }

                if (!timerSet && list.Count > 0)
                {
                    cts.Cancel();
                    cts.Dispose();
                    cts = new CancellationTokenSource();
                    delay = Task.Delay(TimeSpan.FromSeconds(5), cts.Token);
                    timerSet = true;
                }

                if (list.Count == 10)
                {
                    saveBatch = true;
                }

                readChannel = _input.Reader.WaitToReadAsync().AsTask();
            }
            else
            {
                saveBatch = true;
            }

            if (saveBatch)
            {
                try
                {
                    await _output.Writer.WriteAsync(list.ToArray());
                    list.Clear();
                    cts.Cancel();
                    cts.Dispose();
                    cts = new CancellationTokenSource();
                    delay = Task.Delay(Timeout.Infinite, cts.Token);
                    timerSet = false;
                }
                catch (Exception e) { Console.WriteLine(e); }
            }
        }
    }

    public async ValueTask WriteAsync(T t)
    {
        await _input.Writer.WriteAsync(t);
    }

    public IAsyncEnumerable<IReadOnlyCollection<T>> Output()
    {
        return _output.Reader.ReadAllAsync();
    }
}

Usage

var b = new BufferedChannel<int>(TimeSpan.FromSeconds(5), 10);

_ = Task.Run(async () =>
{
    await foreach (var a in b.Output())
    {
        foreach(var i in a)
        {
            Console.WriteLine(i.ToString());
        }
    }
});

while (true)
{
    var kc = Console.ReadKey();
    await b.WriteAsync(kc.KeyChar);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment