Created
December 16, 2022 23:16
-
-
Save avonwyss/15081176d900b45ad6409eb991cd1948 to your computer and use it in GitHub Desktop.
Async circular buffer with lookback and load-on-demand
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace bsn.Har.Multipart { | |
public class CircularBuffer<T>: IAsyncEnumerator<T> { | |
private readonly Func<T[], int, int, CancellationToken, Task<int>> fillBufferAsync; | |
private readonly T[] buffer; | |
private int head; | |
private int tail = -1; | |
public CircularBuffer(Func<T[], int, int, CancellationToken, Task<int>> fillBufferAsync, int lookbackCapacity) { | |
this.fillBufferAsync = fillBufferAsync; | |
buffer = new T[lookbackCapacity * 2]; | |
} | |
public async ValueTask<bool> MoveNextAsync() { | |
if (head <= ++tail) { | |
var index = head % buffer.Length; | |
var read = await fillBufferAsync(buffer, index, head == 0 ? buffer.Length : Math.Min(buffer.Length / 2, buffer.Length-index), CancellationToken.None).ConfigureAwait(false); | |
if (read == 0) { | |
tail = head-1; | |
End = true; | |
return false; | |
} | |
head += read; | |
} | |
return true; | |
} | |
public async ValueTask<bool> MoveAsync(int offset) { | |
if (offset < 0) { | |
var index = tail+offset; | |
if (index < 0 || index > tail || index < head-buffer.Length / 2) { | |
return false; | |
} | |
tail = index; | |
} else { | |
while (offset-- > 0) { | |
if (!await MoveNextAsync().ConfigureAwait(false)) { | |
return false; | |
} | |
} | |
} | |
return true; | |
} | |
public T Current => buffer[tail % buffer.Length]; | |
public void CopyTo(int index, int count, T[] arr, int arrIndex = 0) { | |
WriteToAsync(index, count, (source, sourceIndex, sourceCount, cancellationToken) => { | |
Array.Copy(source, sourceIndex, arr, arrIndex, sourceCount); | |
arrIndex += sourceCount; | |
return Task.CompletedTask; | |
}).ConfigureAwait(false).GetAwaiter().GetResult(); | |
} | |
public async ValueTask WriteToAsync(int index, int count, Func<T[], int, int, CancellationToken, Task> writeAsync, CancellationToken cancellationToken = default) { | |
if (index < 0 || index > tail || index < head-buffer.Length / 2) { | |
throw new ArgumentOutOfRangeException(nameof(index)); | |
} | |
if (count < 0 || index+count-1 > tail) { | |
throw new ArgumentOutOfRangeException(nameof(count)); | |
} | |
index %= buffer.Length; | |
while (count > 0) { | |
var copyCount = Math.Min(buffer.Length-index, count); | |
await writeAsync(buffer, index, copyCount, cancellationToken).ConfigureAwait(false); | |
index = 0; // same as: index = (index + copyCount) % buffer.Length; | |
count -= copyCount; | |
} | |
} | |
public IEnumerable<T> Copy(int index, int count) { | |
if (index < 0 || index > tail || index < head-buffer.Length / 2) { | |
throw new ArgumentOutOfRangeException(nameof(index)); | |
} | |
var endIndex = index+count-1; | |
if (count < 0 || endIndex > tail) { | |
throw new ArgumentOutOfRangeException(nameof(count)); | |
} | |
while (index <= endIndex) { | |
yield return buffer[index++ % buffer.Length]; | |
} | |
} | |
public IEnumerable<T> CopyRelative(int offset, int count) { | |
return Copy(tail+offset, count); | |
} | |
public T Relative(int offset) { | |
var index = tail+offset; | |
if (index < 0 || index > tail || index < head-buffer.Length / 2) { | |
throw new ArgumentOutOfRangeException(nameof(offset)); | |
} | |
return buffer[index]; | |
} | |
public int Position => tail; | |
public bool End { | |
get; | |
private set; | |
} | |
public ValueTask DisposeAsync() { | |
tail = -1; | |
return default; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Threading.Tasks; | |
using Xunit; | |
using Xunit.Abstractions; | |
namespace bsn.Har.Multipart { | |
public class CircularBufferTest { | |
private readonly ITestOutputHelper output; | |
public CircularBufferTest(ITestOutputHelper output) { | |
this.output = output; | |
} | |
[Fact] | |
public async Task BufferTest() { | |
var datas = new Queue<(int expectedIndex, int expectedCount, char[] content)>(); | |
datas.Enqueue((0, 10, "test".ToCharArray())); | |
datas.Enqueue((4, 5, "TEST".ToCharArray())); | |
datas.Enqueue((8, 2, "wr".ToCharArray())); | |
datas.Enqueue((0, 5, "ap".ToCharArray())); | |
datas.Enqueue((2, 5, "".ToCharArray())); | |
var content = datas.SelectMany(t => t.content).ToArray(); | |
var buffer = new CircularBuffer<char>((arr, index, count, token) => { | |
var data = datas.Dequeue(); | |
output.WriteLine("Buffer Fill: "+new string(data.content)); | |
Assert.Equal(data.expectedIndex, index); | |
Assert.Equal(data.expectedCount, count); | |
data.content.CopyTo(arr, index); | |
return Task.FromResult(data.content.Length); | |
}, 5); | |
foreach (var ch in content) { | |
output.WriteLine("Expect: "+ch); | |
Assert.True(await buffer.MoveNextAsync().ConfigureAwait(false)); | |
Assert.Equal(ch, buffer.Current); | |
} | |
Assert.False(await buffer.MoveNextAsync().ConfigureAwait(false)); | |
Assert.Equal("p", buffer.CopyRelative(0, 1)); | |
Assert.Equal("wrap", buffer.CopyRelative(-3, 4)); | |
Assert.Equal("Twr", buffer.Copy(7, 3)); | |
var temp = new char[5]; | |
buffer.CopyTo(7, 5, temp, 0); | |
Assert.Equal("Twrap", temp); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment