Skip to content

Instantly share code, notes, and snippets.

@mqudsi
Created April 29, 2020 19:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mqudsi/cafa987fc6a94de07f3dc6c9829e007a to your computer and use it in GitHub Desktop.
Save mqudsi/cafa987fc6a94de07f3dc6c9829e007a to your computer and use it in GitHub Desktop.
StreamSequence, a stream-to-ReadOnlySequenceSegment<byte> wrapper
using System;
using System.IO;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Buffers;
namespace MessageClient
{
/// <summary>
/// Incrementally reads a <see cref="Stream"/> into pooled buffers and exposes everything
/// read so far as a single <see cref="ReadOnlySequence{T}"/> of bytes.
/// </summary>
/// <remarks>
/// Buffers are rented from <see cref="ArrayPool{T}.Shared"/> and returned in
/// <see cref="Dispose"/>. A sequence obtained from <see cref="Sequence"/> refers to those
/// buffers and therefore must not be used after this instance has been disposed.
/// </remarks>
class StreamSequence : IDisposable
{
    /// <summary>
    /// One link of the sequence: a slice of a rented buffer plus its running index.
    /// </summary>
    class Segment : ReadOnlySequenceSegment<byte>
    {
        /// <summary>Number of bytes covered by this segment.</summary>
        public int Length => base.Memory.Length;

        /// <summary>Creates a segment spanning the whole of <paramref name="buffer"/>.</summary>
        public Segment(long index, byte[] buffer)
        {
            base.Memory = buffer.AsMemory();
            base.RunningIndex = index;
        }

        /// <summary>
        /// Creates a segment spanning <paramref name="count"/> bytes of
        /// <paramref name="buffer"/> starting at <paramref name="start"/>.
        /// </summary>
        public Segment(long index, byte[] buffer, int start, int count)
        {
            base.Memory = buffer.AsMemory(start, count);
            base.RunningIndex = index;
        }

        /// <summary>Links <paramref name="segment"/> as the successor of this one.</summary>
        internal void SetNext(Segment segment)
        {
            base.Next = segment;
        }
    }

    /// <summary>Minimum size requested from the pool for each new buffer.</summary>
    const int MinimumSegmentSize = 512;

    private readonly bool _ownsStream;
    private readonly ArrayPool<byte> _pool;

    // Every buffer rented from the pool, tracked independently of the segments so that a
    // buffer which never received data (end of stream or a failed read right after Rent)
    // is still returned on Dispose.
    private readonly List<byte[]> _rentedBuffers = new List<byte[]>();

    private bool _exhausted = false;
    private bool _disposed = false;
    private Stream _stream;
    private long _index;
    private (byte[] Buffer, int Cursor) _lastBuff;
    private Segment _firstSegment;
    private Segment _lastSegment;

    /// <summary>
    /// When <c>true</c>, buffers are cleared as they are returned to the pool.
    /// </summary>
    public bool ClearAfterFree { get; set; } = false;

    /// <summary>
    /// Wraps <paramref name="stream"/>.
    /// </summary>
    /// <param name="stream">The stream to read from.</param>
    /// <param name="ownsStream">
    /// When <c>true</c>, <paramref name="stream"/> is disposed together with this instance.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    public StreamSequence(System.IO.Stream stream, bool ownsStream = false)
    {
        if (stream == null)
        {
            throw new ArgumentNullException(nameof(stream));
        }

        _stream = stream;
        _pool = ArrayPool<byte>.Shared;
        _ownsStream = ownsStream;
    }

    /// <summary>
    /// All bytes read so far. Empty before the first successful read and after disposal.
    /// </summary>
    public ReadOnlySequence<byte> Sequence
    {
        get
        {
            if (_firstSegment == null)
            {
                return ReadOnlySequence<byte>.Empty;
            }

            // The end index is exclusive, so the full length of the last segment is passed;
            // using Length - 1 here would drop the final byte that was read.
            return new ReadOnlySequence<byte>(_firstSegment, 0, _lastSegment, _lastSegment.Length);
        }
    }

    /// <summary>
    /// Performs one read from the stream and appends the bytes read as a new segment.
    /// </summary>
    /// <returns>
    /// <c>true</c> if data was appended; <c>false</c> once the stream has reported
    /// end-of-stream (a read returning zero bytes).
    /// </returns>
    /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
    public async Task<bool> ReadMoreAsync()
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(StreamSequence));
        }

        if (_exhausted)
        {
            return false;
        }

        // Rent a new buffer only when there is none yet or the current one is full;
        // otherwise keep filling the remainder of the current buffer.
        if (_lastBuff.Buffer == null || _lastBuff.Cursor == _lastBuff.Buffer.Length)
        {
            var rented = _pool.Rent(MinimumSegmentSize);
            _rentedBuffers.Add(rented);
            _lastBuff = (rented, 0);
        }

        var bytesRead = await _stream
            .ReadAsync(_lastBuff.Buffer, _lastBuff.Cursor, _lastBuff.Buffer.Length - _lastBuff.Cursor)
            .ConfigureAwait(false);
        if (bytesRead == 0)
        {
            _exhausted = true;
            return false;
        }

        var segment = new Segment(_index, _lastBuff.Buffer, _lastBuff.Cursor, bytesRead);
        if (_firstSegment == null)
        {
            _firstSegment = segment;
        }

        _lastSegment?.SetNext(segment);
        _lastSegment = segment;
        _index += bytesRead;
        _lastBuff.Cursor += bytesRead;
        return true;
    }

    /// <summary>
    /// Returns every rented buffer to the pool and, if this instance owns the stream,
    /// disposes it. Calling this more than once has no further effect.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        if (_ownsStream)
        {
            _stream?.Dispose();
        }

        _stream = null;

        foreach (var buffer in _rentedBuffers)
        {
            _pool.Return(buffer, ClearAfterFree);
        }

        _rentedBuffers.Clear();

        // Drop every reference into the returned buffers so Sequence reports empty
        // instead of exposing memory that now belongs to the pool again.
        _lastBuff = default;
        _firstSegment = null;
        _lastSegment = null;
    }
}
}
@MarkoW81
Copy link

MarkoW81 commented Mar 15, 2024

Hello Mahmoud,

Thank you for sharing this code. It helped me a great deal when implementing streaming JSON processing.

One question:

public ReadOnlySequence<byte> Sequence { get => _segments.Count == 0 ? new ReadOnlySequence<byte>(new byte[0]) : new ReadOnlySequence<byte>(_firstSegment, 0, _lastSegment, _lastSegment.Length - 1); }

I think `_lastSegment.Length - 1` is wrong and should be `_lastSegment.Length`; with the `- 1`, the last byte is missing from the sequence.

Best regards, Marko

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment