Last active
December 22, 2021 10:22
-
-
Save neuecc/e6293b1d88244cf942753d6408b3e966 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Buffers; | |
public class LargeArray<T> : IBufferWriter<T> | |
{ | |
const int MaxArrayLength = 0X7FEFFFFF; // 0x7FFFFFC7; | |
readonly List<Memory<T>> completedChunks; | |
readonly long length; | |
readonly long chunkSize; | |
T[] currentBuffer; | |
int currentBufferIndex; | |
long written; | |
public long Length => length; | |
public LargeArray(long length) | |
: this(length, MaxArrayLength) | |
{ | |
} | |
public LargeArray(long length, int chunkSize) | |
{ | |
this.completedChunks = new List<Memory<T>>((int)(length / chunkSize) + 1); | |
this.length = length; | |
this.chunkSize = chunkSize; | |
this.currentBuffer = new T[Math.Min(chunkSize, length)]; | |
this.currentBufferIndex = 0; | |
this.written = 0; | |
} | |
// for Writer | |
public void Advance(int count) | |
{ | |
written += count; | |
currentBufferIndex += count; | |
if (currentBuffer.Length < currentBufferIndex) throw new InvalidOperationException($"Advance count:{count} is over than current buffer size."); | |
} | |
public Span<T> GetSpan(int sizeHint = 0) | |
{ | |
ValidateCapacity(sizeHint); | |
EnsureCapacity(sizeHint); | |
return new Span<T>(currentBuffer, currentBufferIndex, currentBuffer.Length - currentBufferIndex); | |
} | |
public Memory<T> GetMemory(int sizeHint = 0) | |
{ | |
ValidateCapacity(sizeHint); | |
EnsureCapacity(sizeHint); | |
return new Memory<T>(currentBuffer, currentBufferIndex, currentBuffer.Length - currentBufferIndex); | |
} | |
public void Reset() | |
{ | |
completedChunks.Clear(); | |
Array.Clear(currentBuffer); | |
currentBufferIndex = 0; | |
written = 0; | |
} | |
void ValidateCapacity(int sizeHint) | |
{ | |
var rest = length - written; | |
if (rest < sizeHint) throw new InvalidOperationException($"sizeHint:{sizeHint} is capacity:{length} over."); | |
} | |
void EnsureCapacity(int sizeHint) | |
{ | |
if ((currentBuffer.Length == currentBufferIndex) || (currentBuffer.Length - currentBufferIndex < sizeHint)) | |
{ | |
completedChunks.Add(new Memory<T>(currentBuffer, 0, currentBufferIndex)); | |
currentBuffer = new T[Math.Max(sizeHint, Math.Min(chunkSize, length - written))]; | |
currentBufferIndex = 0; | |
} | |
} | |
// for Reader | |
public ReadOnlySequence<T> AsReadOnlySequence() | |
{ | |
if (written == 0) return ReadOnlySequence<T>.Empty; | |
Segment? lastSegment = null; | |
Segment? nextSegment = null; | |
if (currentBufferIndex != 0) | |
{ | |
lastSegment = nextSegment = new Segment(new Memory<T>(currentBuffer, 0, currentBufferIndex), null); | |
} | |
for (int i = completedChunks.Count - 1; i >= 0; i--) | |
{ | |
nextSegment = new Segment(completedChunks[i], nextSegment); | |
if (lastSegment == null) | |
{ | |
lastSegment = nextSegment; | |
} | |
} | |
var firstSegment = nextSegment; | |
var segment = firstSegment; | |
var index = 0; | |
while (segment != null) | |
{ | |
segment.SetRunningIndex(index); | |
index += segment.Memory.Length; | |
segment = segment.Next as Segment; | |
} | |
return new ReadOnlySequence<T>(nextSegment!, 0, lastSegment!, lastSegment!.Memory.Length); | |
} | |
class Segment : ReadOnlySequenceSegment<T> | |
{ | |
public Segment(Memory<T> buffer, Segment? nextSegment) | |
{ | |
this.Memory = buffer; | |
this.Next = nextSegment; | |
} | |
internal void SetRunningIndex(long runningIndex) | |
{ | |
this.RunningIndex = runningIndex; | |
} | |
} | |
} | |
public static class LargeArrayExtensions | |
{ | |
public static async Task CopyFromAsync(this LargeArray<byte> buffer, Stream stream, IProgress<int>? progress = null, CancellationToken cancellationToken = default) | |
{ | |
int read; | |
while ((read = await stream.ReadAsync(buffer.GetMemory(), cancellationToken).ConfigureAwait(false)) != 0) | |
{ | |
progress?.Report(read); | |
buffer.Advance(read); | |
} | |
} | |
public static async Task WriteToFileAsync(this LargeArray<byte> buffer, string path, IProgress<int>? progress = null, CancellationToken cancellationToken = default) | |
{ | |
using (var fs = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.ReadWrite, 1, useAsync: true)) | |
{ | |
await WriteToAsync(buffer, fs, progress, cancellationToken); | |
} | |
} | |
public static async Task WriteToAsync(this LargeArray<byte> buffer, Stream stream, IProgress<int>? progress = null, CancellationToken cancellationToken = default) | |
{ | |
foreach (var item in buffer.AsReadOnlySequence()) | |
{ | |
await stream.WriteAsync(item, cancellationToken).ConfigureAwait(false); | |
progress?.Report(item.Length); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment