/// <summary>
/// Accepts items one at a time and republishes them in batches. A batch is
/// published as soon as it holds <c>maxItems</c> items, or once <c>time</c>
/// has elapsed since the first item of the batch arrived, whichever happens first.
/// </summary>
/// <typeparam name="T">Type of the buffered items.</typeparam>
public class BufferedChannel<T>
{
    private readonly Channel<T> _input;
    private readonly Channel<IReadOnlyCollection<T>> _output;
    private readonly TimeSpan _time;
    private readonly int _maxItems;

    /// <summary>
    /// Creates the channel and starts the background batching loop.
    /// </summary>
    /// <param name="time">
    /// Maximum age of a partially filled batch before it is published.
    /// <see cref="Timeout.InfiniteTimeSpan"/> disables the time-based flush.
    /// </param>
    /// <param name="maxItems">
    /// Number of items that triggers an immediate flush; also used as the capacity
    /// of the input channel.
    /// </param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="time"/> is negative (and not infinite), or
    /// <paramref name="maxItems"/> is rejected by <c>Channel.CreateBounded</c>.
    /// </exception>
    public BufferedChannel(TimeSpan time, int maxItems)
    {
        if (time < TimeSpan.Zero && time != Timeout.InfiniteTimeSpan)
        {
            throw new ArgumentOutOfRangeException(
                nameof(time), time, "The flush interval must be non-negative or Timeout.InfiniteTimeSpan.");
        }

        _time = time;
        _maxItems = maxItems;
        _input = Channel.CreateBounded<T>(maxItems);
        // Capacity 1: the batching loop waits (back-pressure) until the consumer
        // has taken the previous batch.
        _output = Channel.CreateBounded<IReadOnlyCollection<T>>(1);

        // Fire-and-forget is safe here: ProcessAsync catches every exception and
        // reports it through the output channel instead of faulting its task.
        _ = ProcessAsync();
    }

    /// <summary>
    /// Background loop: drains the input channel into a batch and publishes the
    /// batch when it is full or its timer elapses. When the input is completed,
    /// the remaining items are published and the output channel is completed.
    /// </summary>
    private async Task ProcessAsync()
    {
        var batch = new List<T>();
        CancellationTokenSource? timerCts = null;
        Task? timer = null; // non-null only while a partially filled batch is ageing
        Exception? failure = null;

        try
        {
            Task<bool> readReady = _input.Reader.WaitToReadAsync().AsTask();

            while (true)
            {
                // Without a running timer there is nothing to race against.
                Task first = timer is null
                    ? readReady
                    : await Task.WhenAny(readReady, timer).ConfigureAwait(false);

                bool flush;
                if (first == readReady)
                {
                    // false means the input was completed and fully drained.
                    if (!await readReady.ConfigureAwait(false))
                    {
                        break;
                    }

                    while (batch.Count < _maxItems && _input.Reader.TryRead(out var item))
                    {
                        batch.Add(item);
                    }

                    flush = batch.Count >= _maxItems;
                    if (!flush && batch.Count > 0 && timer is null)
                    {
                        // First item of a new batch: start measuring its age.
                        timerCts = new CancellationTokenSource();
                        timer = Task.Delay(_time, timerCts.Token);
                    }

                    readReady = _input.Reader.WaitToReadAsync().AsTask();
                }
                else
                {
                    // The batch timer elapsed; publish whatever has been collected.
                    flush = true;
                }

                if (flush)
                {
                    await _output.Writer.WriteAsync(batch.ToArray()).ConfigureAwait(false);
                    batch.Clear();

                    timerCts?.Cancel();
                    timerCts?.Dispose();
                    timerCts = null;
                    timer = null;
                }
            }

            // Input finished: do not lose a partially filled batch.
            if (batch.Count > 0)
            {
                await _output.Writer.WriteAsync(batch.ToArray()).ConfigureAwait(false);
            }
        }
        catch (Exception e)
        {
            // Surface the failure to the consumer instead of swallowing it.
            failure = e;
        }
        finally
        {
            timerCts?.Cancel();
            timerCts?.Dispose();
            _output.Writer.TryComplete(failure);
        }
    }

    /// <summary>
    /// Queues a single item for batching; waits while the input buffer is full.
    /// </summary>
    /// <param name="t">The item to buffer.</param>
    /// <exception cref="ChannelClosedException">
    /// The channel has already been completed via <see cref="Complete"/>.
    /// </exception>
    public async ValueTask WriteAsync(T t)
    {
        await _input.Writer.WriteAsync(t).ConfigureAwait(false);
    }

    /// <summary>
    /// Signals that no more items will be written. Items that are already buffered
    /// are still published, after which the sequence returned by
    /// <see cref="Output"/> ends. Calling this more than once has no further effect.
    /// </summary>
    public void Complete()
    {
        _input.Writer.TryComplete();
    }

    /// <summary>
    /// Returns the stream of published batches.
    /// </summary>
    /// <returns>
    /// An asynchronous sequence of batches; it ends after <see cref="Complete"/> has
    /// been called and the remaining items have been published.
    /// </returns>
    public IAsyncEnumerable<IReadOnlyCollection<T>> Output()
    {
        return _output.Reader.ReadAllAsync();
    }
}
// Demo driver: every key press is written into a BufferedChannel<int>, and a
// background task prints each item of every batch the channel publishes.
var keyBuffer = new BufferedChannel<int>(TimeSpan.FromSeconds(5), 10);

// Consumer: runs detached and prints one line per buffered item.
_ = Task.Run(async () =>
{
    await foreach (var batch in keyBuffer.Output())
    {
        foreach (var item in batch)
        {
            Console.WriteLine(item.ToString());
        }
    }
});

// Producer: runs forever. KeyChar is a char and is implicitly converted to int,
// so the consumer prints character codes.
for (;;)
{
    ConsoleKeyInfo pressed = Console.ReadKey();
    await keyBuffer.WriteAsync(pressed.KeyChar);
}