Skip to content

Instantly share code, notes, and snippets.

@avonwyss
Created December 16, 2022 23:16
Show Gist options
  • Save avonwyss/15081176d900b45ad6409eb991cd1948 to your computer and use it in GitHub Desktop.
Save avonwyss/15081176d900b45ad6409eb991cd1948 to your computer and use it in GitHub Desktop.
Async circular buffer with lookback and load-on-demand
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace bsn.Har.Multipart {
public class CircularBuffer<T>: IAsyncEnumerator<T> {
private readonly Func<T[], int, int, CancellationToken, Task<int>> fillBufferAsync;
private readonly T[] buffer;
private int head;
private int tail = -1;
public CircularBuffer(Func<T[], int, int, CancellationToken, Task<int>> fillBufferAsync, int lookbackCapacity) {
this.fillBufferAsync = fillBufferAsync;
buffer = new T[lookbackCapacity * 2];
}
public async ValueTask<bool> MoveNextAsync() {
if (head <= ++tail) {
var index = head % buffer.Length;
var read = await fillBufferAsync(buffer, index, head == 0 ? buffer.Length : Math.Min(buffer.Length / 2, buffer.Length-index), CancellationToken.None).ConfigureAwait(false);
if (read == 0) {
tail = head-1;
End = true;
return false;
}
head += read;
}
return true;
}
public async ValueTask<bool> MoveAsync(int offset) {
if (offset < 0) {
var index = tail+offset;
if (index < 0 || index > tail || index < head-buffer.Length / 2) {
return false;
}
tail = index;
} else {
while (offset-- > 0) {
if (!await MoveNextAsync().ConfigureAwait(false)) {
return false;
}
}
}
return true;
}
public T Current => buffer[tail % buffer.Length];
public void CopyTo(int index, int count, T[] arr, int arrIndex = 0) {
WriteToAsync(index, count, (source, sourceIndex, sourceCount, cancellationToken) => {
Array.Copy(source, sourceIndex, arr, arrIndex, sourceCount);
arrIndex += sourceCount;
return Task.CompletedTask;
}).ConfigureAwait(false).GetAwaiter().GetResult();
}
public async ValueTask WriteToAsync(int index, int count, Func<T[], int, int, CancellationToken, Task> writeAsync, CancellationToken cancellationToken = default) {
if (index < 0 || index > tail || index < head-buffer.Length / 2) {
throw new ArgumentOutOfRangeException(nameof(index));
}
if (count < 0 || index+count-1 > tail) {
throw new ArgumentOutOfRangeException(nameof(count));
}
index %= buffer.Length;
while (count > 0) {
var copyCount = Math.Min(buffer.Length-index, count);
await writeAsync(buffer, index, copyCount, cancellationToken).ConfigureAwait(false);
index = 0; // same as: index = (index + copyCount) % buffer.Length;
count -= copyCount;
}
}
public IEnumerable<T> Copy(int index, int count) {
if (index < 0 || index > tail || index < head-buffer.Length / 2) {
throw new ArgumentOutOfRangeException(nameof(index));
}
var endIndex = index+count-1;
if (count < 0 || endIndex > tail) {
throw new ArgumentOutOfRangeException(nameof(count));
}
while (index <= endIndex) {
yield return buffer[index++ % buffer.Length];
}
}
public IEnumerable<T> CopyRelative(int offset, int count) {
return Copy(tail+offset, count);
}
public T Relative(int offset) {
var index = tail+offset;
if (index < 0 || index > tail || index < head-buffer.Length / 2) {
throw new ArgumentOutOfRangeException(nameof(offset));
}
return buffer[index];
}
public int Position => tail;
public bool End {
get;
private set;
}
public ValueTask DisposeAsync() {
tail = -1;
return default;
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
namespace bsn.Har.Multipart {
public class CircularBufferTest {
private readonly ITestOutputHelper output;
public CircularBufferTest(ITestOutputHelper output) {
this.output = output;
}
[Fact]
public async Task BufferTest() {
var datas = new Queue<(int expectedIndex, int expectedCount, char[] content)>();
datas.Enqueue((0, 10, "test".ToCharArray()));
datas.Enqueue((4, 5, "TEST".ToCharArray()));
datas.Enqueue((8, 2, "wr".ToCharArray()));
datas.Enqueue((0, 5, "ap".ToCharArray()));
datas.Enqueue((2, 5, "".ToCharArray()));
var content = datas.SelectMany(t => t.content).ToArray();
var buffer = new CircularBuffer<char>((arr, index, count, token) => {
var data = datas.Dequeue();
output.WriteLine("Buffer Fill: "+new string(data.content));
Assert.Equal(data.expectedIndex, index);
Assert.Equal(data.expectedCount, count);
data.content.CopyTo(arr, index);
return Task.FromResult(data.content.Length);
}, 5);
foreach (var ch in content) {
output.WriteLine("Expect: "+ch);
Assert.True(await buffer.MoveNextAsync().ConfigureAwait(false));
Assert.Equal(ch, buffer.Current);
}
Assert.False(await buffer.MoveNextAsync().ConfigureAwait(false));
Assert.Equal("p", buffer.CopyRelative(0, 1));
Assert.Equal("wrap", buffer.CopyRelative(-3, 4));
Assert.Equal("Twr", buffer.Copy(7, 3));
var temp = new char[5];
buffer.CopyTo(7, 5, temp, 0);
Assert.Equal("Twrap", temp);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment