Last active
May 6, 2024 23:22
-
-
Save mqudsi/cafa987fc6a94de07f3dc6c9829e007a to your computer and use it in GitHub Desktop.
StreamSequence, a stream-to-ReadOnlySequenceSegment<byte> wrapper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.IO; | |
using System.Collections.Generic; | |
using System.Threading.Tasks; | |
using System.Buffers; | |
namespace MessageClient | |
{ | |
class StreamSequence : IDisposable | |
{ | |
class Segment : ReadOnlySequenceSegment<byte> | |
{ | |
public int Length => base.Memory.Length; | |
internal readonly byte[] OwnBuffer; | |
public Segment(long index, byte[] buffer) | |
{ | |
OwnBuffer = buffer; | |
base.Memory = buffer.AsMemory(); | |
base.RunningIndex = index; | |
} | |
public Segment(long index, byte[] buffer, int start, int count) | |
{ | |
OwnBuffer = start == 0 ? buffer : null; | |
base.Memory = buffer.AsMemory(start, count); | |
base.RunningIndex = index; | |
} | |
internal void SetNext(Segment segment) | |
{ | |
base.Next = segment; | |
} | |
} | |
const int MinimumSegmentSize = 512; | |
private readonly bool _ownsStream; | |
private bool _exhausted = false; | |
private Stream _stream; | |
private long _index; | |
private ArrayPool<byte> _pool; | |
private (byte[] Buffer, int Cursor) _lastBuff; | |
private List<Segment> _segments = new List<Segment>(); | |
private Segment _firstSegment; | |
private Segment _lastSegment; | |
public bool ClearAfterFree { get; set; } = false; | |
public StreamSequence(System.IO.Stream stream, bool ownsStream = false) | |
{ | |
_stream = stream; | |
_pool = ArrayPool<byte>.Shared; | |
_ownsStream = ownsStream; | |
} | |
public ReadOnlySequence<byte> Sequence | |
{ | |
get => _segments.Count == 0 ? new ReadOnlySequence<byte>(new byte[0]) | |
: new ReadOnlySequence<byte>(_firstSegment, 0, _lastSegment, _lastSegment.Length); | |
} | |
public async Task<bool> ReadMoreAsync() | |
{ | |
if (_exhausted) | |
{ | |
return false; | |
} | |
if (_lastBuff.Buffer == null || _lastBuff.Cursor == _lastBuff.Buffer.Length) | |
{ | |
_lastBuff = (_pool.Rent(MinimumSegmentSize), 0); | |
} | |
var bytesRead = await _stream.ReadAsync(_lastBuff.Buffer, _lastBuff.Cursor, _lastBuff.Buffer.Length - _lastBuff.Cursor); | |
if (bytesRead == 0) | |
{ | |
_exhausted = true; | |
return false; | |
} | |
var segment = new Segment(_index, _lastBuff.Buffer, _lastBuff.Cursor, bytesRead); | |
if (_segments.Count == 0) | |
{ | |
_firstSegment = segment; | |
} | |
_index += bytesRead; | |
_segments.Add(segment); | |
_lastSegment?.SetNext(segment); | |
_lastSegment = segment; | |
_lastBuff.Cursor += bytesRead; | |
return true; | |
} | |
public void Dispose() | |
{ | |
if (_ownsStream) | |
{ | |
_stream?.Dispose(); | |
_stream = null; | |
} | |
foreach (var segment in _segments) | |
{ | |
if (segment.OwnBuffer != null) | |
{ | |
_pool.Return(segment.OwnBuffer, ClearAfterFree); | |
} | |
} | |
_segments.Clear(); | |
} | |
} | |
} |
@MarkoW81 I think you're correct; the terminology used on the MSDN docs for ReadOnlySequence
that detail endIndex
are vague and refer to it as the "end index" but it's not clear whether that is the last index in the array or the position after the last, but some Googling indicates it is the latter.
I've updated the code sample accordingly.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hello Mahmoud,
thank you for sharing this code. This helped me very much in an implementation of work with a streaming processing json.
One question:
public ReadOnlySequence<byte> Sequence { get => _segments.Count == 0 ? new ReadOnlySequence<byte>(new byte[0]) : new ReadOnlySequence<byte>(_firstSegment, 0, _lastSegment, _lastSegment.Length - 1); }
I think, the
_lastSegment.Length - 1
is wrong and must be_lastSegment.Length
, because of missing last character.Best regards, Marko