Skip to content

Instantly share code, notes, and snippets.

@neuecc
Last active December 22, 2021 10:22
Show Gist options
  • Save neuecc/e6293b1d88244cf942753d6408b3e966 to your computer and use it in GitHub Desktop.
Save neuecc/e6293b1d88244cf942753d6408b3e966 to your computer and use it in GitHub Desktop.
using System.Buffers;
public class LargeArray<T> : IBufferWriter<T>
{
const int MaxArrayLength = 0X7FEFFFFF; // 0x7FFFFFC7;
readonly List<Memory<T>> completedChunks;
readonly long length;
readonly long chunkSize;
T[] currentBuffer;
int currentBufferIndex;
long written;
public long Length => length;
public LargeArray(long length)
: this(length, MaxArrayLength)
{
}
public LargeArray(long length, int chunkSize)
{
this.completedChunks = new List<Memory<T>>((int)(length / chunkSize) + 1);
this.length = length;
this.chunkSize = chunkSize;
this.currentBuffer = new T[Math.Min(chunkSize, length)];
this.currentBufferIndex = 0;
this.written = 0;
}
// for Writer
public void Advance(int count)
{
written += count;
currentBufferIndex += count;
if (currentBuffer.Length < currentBufferIndex) throw new InvalidOperationException($"Advance count:{count} is over than current buffer size.");
}
public Span<T> GetSpan(int sizeHint = 0)
{
ValidateCapacity(sizeHint);
EnsureCapacity(sizeHint);
return new Span<T>(currentBuffer, currentBufferIndex, currentBuffer.Length - currentBufferIndex);
}
public Memory<T> GetMemory(int sizeHint = 0)
{
ValidateCapacity(sizeHint);
EnsureCapacity(sizeHint);
return new Memory<T>(currentBuffer, currentBufferIndex, currentBuffer.Length - currentBufferIndex);
}
public void Reset()
{
completedChunks.Clear();
Array.Clear(currentBuffer);
currentBufferIndex = 0;
written = 0;
}
void ValidateCapacity(int sizeHint)
{
var rest = length - written;
if (rest < sizeHint) throw new InvalidOperationException($"sizeHint:{sizeHint} is capacity:{length} over.");
}
void EnsureCapacity(int sizeHint)
{
if ((currentBuffer.Length == currentBufferIndex) || (currentBuffer.Length - currentBufferIndex < sizeHint))
{
completedChunks.Add(new Memory<T>(currentBuffer, 0, currentBufferIndex));
currentBuffer = new T[Math.Max(sizeHint, Math.Min(chunkSize, length - written))];
currentBufferIndex = 0;
}
}
// for Reader
public ReadOnlySequence<T> AsReadOnlySequence()
{
if (written == 0) return ReadOnlySequence<T>.Empty;
Segment? lastSegment = null;
Segment? nextSegment = null;
if (currentBufferIndex != 0)
{
lastSegment = nextSegment = new Segment(new Memory<T>(currentBuffer, 0, currentBufferIndex), null);
}
for (int i = completedChunks.Count - 1; i >= 0; i--)
{
nextSegment = new Segment(completedChunks[i], nextSegment);
if (lastSegment == null)
{
lastSegment = nextSegment;
}
}
var firstSegment = nextSegment;
var segment = firstSegment;
var index = 0;
while (segment != null)
{
segment.SetRunningIndex(index);
index += segment.Memory.Length;
segment = segment.Next as Segment;
}
return new ReadOnlySequence<T>(nextSegment!, 0, lastSegment!, lastSegment!.Memory.Length);
}
class Segment : ReadOnlySequenceSegment<T>
{
public Segment(Memory<T> buffer, Segment? nextSegment)
{
this.Memory = buffer;
this.Next = nextSegment;
}
internal void SetRunningIndex(long runningIndex)
{
this.RunningIndex = runningIndex;
}
}
}
public static class LargeArrayExtensions
{
public static async Task CopyFromAsync(this LargeArray<byte> buffer, Stream stream, IProgress<int>? progress = null, CancellationToken cancellationToken = default)
{
int read;
while ((read = await stream.ReadAsync(buffer.GetMemory(), cancellationToken).ConfigureAwait(false)) != 0)
{
progress?.Report(read);
buffer.Advance(read);
}
}
public static async Task WriteToFileAsync(this LargeArray<byte> buffer, string path, IProgress<int>? progress = null, CancellationToken cancellationToken = default)
{
using (var fs = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.ReadWrite, 1, useAsync: true))
{
await WriteToAsync(buffer, fs, progress, cancellationToken);
}
}
public static async Task WriteToAsync(this LargeArray<byte> buffer, Stream stream, IProgress<int>? progress = null, CancellationToken cancellationToken = default)
{
foreach (var item in buffer.AsReadOnlySequence())
{
await stream.WriteAsync(item, cancellationToken).ConfigureAwait(false);
progress?.Report(item.Length);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment