Skip to content

Instantly share code, notes, and snippets.

@mqudsi
Last active May 6, 2024 23:22
Show Gist options
  • Save mqudsi/cafa987fc6a94de07f3dc6c9829e007a to your computer and use it in GitHub Desktop.
Save mqudsi/cafa987fc6a94de07f3dc6c9829e007a to your computer and use it in GitHub Desktop.
StreamSequence, a stream-to-ReadOnlySequenceSegment<byte> wrapper
using System;
using System.IO;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Buffers;
namespace MessageClient
{
class StreamSequence : IDisposable
{
class Segment : ReadOnlySequenceSegment<byte>
{
public int Length => base.Memory.Length;
internal readonly byte[] OwnBuffer;
public Segment(long index, byte[] buffer)
{
OwnBuffer = buffer;
base.Memory = buffer.AsMemory();
base.RunningIndex = index;
}
public Segment(long index, byte[] buffer, int start, int count)
{
OwnBuffer = start == 0 ? buffer : null;
base.Memory = buffer.AsMemory(start, count);
base.RunningIndex = index;
}
internal void SetNext(Segment segment)
{
base.Next = segment;
}
}
const int MinimumSegmentSize = 512;
private readonly bool _ownsStream;
private bool _exhausted = false;
private Stream _stream;
private long _index;
private ArrayPool<byte> _pool;
private (byte[] Buffer, int Cursor) _lastBuff;
private List<Segment> _segments = new List<Segment>();
private Segment _firstSegment;
private Segment _lastSegment;
public bool ClearAfterFree { get; set; } = false;
public StreamSequence(System.IO.Stream stream, bool ownsStream = false)
{
_stream = stream;
_pool = ArrayPool<byte>.Shared;
_ownsStream = ownsStream;
}
public ReadOnlySequence<byte> Sequence
{
get => _segments.Count == 0 ? new ReadOnlySequence<byte>(new byte[0])
: new ReadOnlySequence<byte>(_firstSegment, 0, _lastSegment, _lastSegment.Length);
}
public async Task<bool> ReadMoreAsync()
{
if (_exhausted)
{
return false;
}
if (_lastBuff.Buffer == null || _lastBuff.Cursor == _lastBuff.Buffer.Length)
{
_lastBuff = (_pool.Rent(MinimumSegmentSize), 0);
}
var bytesRead = await _stream.ReadAsync(_lastBuff.Buffer, _lastBuff.Cursor, _lastBuff.Buffer.Length - _lastBuff.Cursor);
if (bytesRead == 0)
{
_exhausted = true;
return false;
}
var segment = new Segment(_index, _lastBuff.Buffer, _lastBuff.Cursor, bytesRead);
if (_segments.Count == 0)
{
_firstSegment = segment;
}
_index += bytesRead;
_segments.Add(segment);
_lastSegment?.SetNext(segment);
_lastSegment = segment;
_lastBuff.Cursor += bytesRead;
return true;
}
public void Dispose()
{
if (_ownsStream)
{
_stream?.Dispose();
_stream = null;
}
foreach (var segment in _segments)
{
if (segment.OwnBuffer != null)
{
_pool.Return(segment.OwnBuffer, ClearAfterFree);
}
}
_segments.Clear();
}
}
}
@MarkoW81
Copy link

MarkoW81 commented Mar 15, 2024

Hello Mahmoud,

thank you for sharing this code. This helped me very much in an implementation of work with a streaming processing json.

One question:

public ReadOnlySequence<byte> Sequence { get => _segments.Count == 0 ? new ReadOnlySequence<byte>(new byte[0]) : new ReadOnlySequence<byte>(_firstSegment, 0, _lastSegment, _lastSegment.Length - 1); }

I think, the _lastSegment.Length - 1 is wrong and must be _lastSegment.Length, because of missing last character.

Best regards, Marko

@mqudsi
Copy link
Author

mqudsi commented May 6, 2024

@MarkoW81 I think you're correct; the terminology used on the MSDN docs for ReadOnlySequence that detail endIndex are vague and refer to it as the "end index" but it's not clear whether that is the last index in the array or the position after the last, but some Googling indicates it is the latter.

I've updated the code sample accordingly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment