
@ruzrobert
Last active January 10, 2024 16:33
A special type of lazy MemoryStream, made to connect a streaming data source to a stream consumer.
using System;
using System.IO;
/// <summary>
/// A special type of lazy MemoryStream, made to connect a streaming data source to a stream consumer.
/// Useful when an output can only be streamed into an existing stream
/// and cannot create its own new stream (for example Ionic Zip Extract).
///
/// The data is written lazily - this stream does not store any data.
/// The writing is done by directly writing the data to the read buffer, provided by the actual reader.
/// The stream is Thread-Safe.
///
/// When the data writing is done, the stream creator must call the EndWrite() method to mark the data end.
///
/// Sources used for inspiration:
/// https://stackoverflow.com/questions/1475747/is-there-an-in-memory-stream-that-blocks-like-a-file-stream
/// https://stackoverflow.com/questions/27810289/c-sharp-buffered-zip-stream-proxy
///
/// By: @ruzrobert (GitHub)
/// </summary>
public class ThroughStream : Stream
{
    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite { get { lock (lockObject) { return isWritable; } } }
    public override long Length => throw new NotImplementedException();
    public override long Position { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }

    private bool isOpen = true;
    private bool isWritable = true;
    private bool hasBufferToReadIn = false;
    private bool hasLastReadingEnded = true;
    private bool isWriting = false;

    private byte[] readBuffer = null;
    private int readOffset = 0;
    private int readCount = 0;

    private object lockObject = new object();

    /// <summary>
    /// Before closing the stream, please call the EndWrite method to mark the data end.
    /// Otherwise the Reader won't be able to read the stream to the end.
    /// </summary>
    public override void Close()
    {
        lock (lockObject)
        {
            isOpen = false;
            EndWriteInternal();
            base.Close();
        }
    }

    /// <summary>
    /// When the data writing is done, call this method to mark the data end.
    /// </summary>
    public void EndWrite()
    {
        lock (lockObject)
        {
            if (!isOpen) throw new InvalidOperationException("The stream is closed");
            if (!isWritable) throw new InvalidOperationException("EndWrite can only be called once");
            EndWriteInternal();
        }
    }

    private void EndWriteInternal()
    {
        isWritable = false;
        hasBufferToReadIn = false;
        readBuffer = null;
    }

    /// <summary>
    /// Writes data directly to the reader's buffer. If there is no pending Read request,
    /// this method waits until one arrives.
    /// </summary>
    public override void Write(byte[] buffer, int offset, int count)
    {
        if (count == 0) return;
        if (!isOpen) throw new InvalidOperationException("Writing to a closed stream is not allowed");

        lock (lockObject) // Check for simultaneous writing
        {
            if (isWriting) throw new InvalidOperationException("Previous Write request is not done yet");
            isWriting = true;
        }

        // Write loop
        while (true) // could have been 'count > 0', but we need the iterative lock to sync properly
        {
            lock (lockObject)
            {
                if (!isWritable) throw new InvalidOperationException("The stream is not writable anymore");

                if (hasBufferToReadIn) // Write if we have a Read request
                {
                    int writeCount = Math.Min(readCount, count);
                    Buffer.BlockCopy(buffer, offset, readBuffer, readOffset, writeCount);

                    offset += writeCount;
                    count -= writeCount;
                    readOffset += writeCount;
                    readCount -= writeCount;

                    if (readCount <= 0) // the Read request is fulfilled
                    {
                        hasBufferToReadIn = false;
                        readBuffer = null;
                    }

                    if (count <= 0) // nothing else to write
                    {
                        isWriting = false;
                        break;
                    }
                }
            }
        }
    }

    /// <summary>
    /// Reads data by creating a read request. Returns the read byte count once the writing is done.
    /// </summary>
    public override int Read(byte[] buffer, int offset, int count)
    {
        if (buffer == null) throw new ArgumentNullException(nameof(buffer));
        if (offset < 0) throw new ArgumentOutOfRangeException(nameof(offset));
        if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
        if (buffer.Length - offset < count) throw new ArgumentException("Wrong offset or count");
        if (!isOpen) throw new InvalidOperationException("The stream is closed");
        if (count == 0) return 0;

        // Save the buffer we want to read into
        lock (lockObject)
        {
            if (!isWritable) return 0; // if there won't be any new data, return 0
            if (!hasLastReadingEnded) throw new InvalidOperationException("Previous Read request is not done yet");

            hasBufferToReadIn = true;
            hasLastReadingEnded = false;

            readBuffer = buffer;
            readOffset = offset;
            readCount = count;
        }

        // Wait till the writing is done
        while (true)
        {
            lock (lockObject)
            {
                if (!hasBufferToReadIn)
                {
                    hasLastReadingEnded = true;
                    return readOffset - offset;
                }
            }
        }
    }

    public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException();
    public override void SetLength(long value) => throw new NotImplementedException();
    public override void Flush() => throw new NotImplementedException();
}
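For context, the intended wiring follows from the class comment: a producer writes on a background task and calls EndWrite when done, while the consumer reads to the end. A minimal sketch (names and sizes here are illustrative, not part of the gist; assumes `using System.Threading.Tasks;`):

```csharp
ThroughStream stream = new ThroughStream();

// Producer: writes on a background task, then marks the data end.
Task writer = Task.Run(() =>
{
    try
    {
        byte[] data = new byte[1234]; // stand-in for streamed output
        stream.Write(data, 0, data.Length);
    }
    finally
    {
        stream.EndWrite(); // without this, Read would never return 0
    }
});

// Consumer: reads until the stream reports the end.
byte[] chunk = new byte[256];
int total = 0, read;
while ((read = stream.Read(chunk, 0, chunk.Length)) > 0)
{
    total += read;
}
```

Note that reader and writer must run on different threads, since each side blocks until the other shows up.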
@ruzrobert
Author

By the way, I had to add ReadAsync and WriteAsync methods into your implementation in order to make ThroughStream compatible with how it's used by infrastructure code.

@VyacheslavPritykin I thought the ReadAsync and WriteAsync already have a default implementation in the Stream (base) class? I am not really familiar with the async access to streams, to be honest

@ruzrobert
Author

@VyacheslavPritykin , I have uploaded the new, shorter variant, which doesn't use any internal buffers and instead writes directly to the read buffer. Unfortunately, the performance seems identical. But in this version I've added more safety for concurrent calls, and the code became much simpler!

@ruzrobert
Author

ruzrobert commented Oct 23, 2022

@VyacheslavPritykin , thank you for sharing your use case! My situation is very similar.
I am reading a file via FileStream (File.OpenRead), and then using Ionic.Zip.ZipFile.Read(fileStream) to open the Zip file.
But the zip.Extract can only write to an existing stream, the same as your DownloadObject call.

The output stream (ThroughStream) then goes to a json deserialization, which in the result creates deserialized objects straight from zip-archived json files. :)

The code looks like this:

public static Stream DearchivateFirstEntry(Stream zipStream)
{
	ThroughStream outputStream = new ThroughStream();
	ZipFile zipFile = ZipFile.Read(zipStream);

	ZipEntry firstEntry = zipFile.Entries.FirstOrDefault();
	Task task = null;
	if (firstEntry != null)
	{
		task = Task.Factory.StartNew(() =>
		{
			try
			{
				firstEntry.Extract(outputStream);
			}
			catch { }
			finally
			{
				outputStream.EndWrite();
				zipFile.Dispose();
				zipStream.Dispose();
			}
		});
	}
	else
	{
		zipFile.Dispose();
		zipStream.Dispose();
	}

	return outputStream;
}

@VyacheslavPritykin

@VyacheslavPritykin I thought the ReadAsync and WriteAsync already have a default implementation in the Stream (base) class? I am not really familiar with the async access to streams, to be honest

Yes, they do, but those default implementations share internal state that deadlocks concurrent ReadAsync/WriteAsync calls. So, the easiest way to fix it is to override them:

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken ct)
{
    return Task.FromResult(Read(buffer, offset, count));
}

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken ct)
{
    Write(buffer, offset, count);
    return Task.CompletedTask;
}

Check my tests:

using System;
using System.Threading.Tasks;
using FluentAssertions;
using ListBrain.Common.Streams;
using Xunit;
using Xunit.Abstractions;

public class ThroughStreamTests
{
    private readonly ITestOutputHelper _outputHelper;
    private readonly ThroughStream _sut;

    public ThroughStreamTests(ITestOutputHelper outputHelper)
    {
        _outputHelper = outputHelper;
        _sut = new ThroughStream();
    }

    [Theory]
    [InlineData(false, 15, 50)]
    [InlineData(false, 50, 15)]
    [InlineData(false, 15, 15)]
    [InlineData(false, null, null)]
    [InlineData(true, 15, 50)]
    [InlineData(true, 50, 15)]
    [InlineData(true, 15, 15)]
    [InlineData(true, null, null)]
    public void ReadWrite(bool isAsync, int? readCount, int? writeCount)
    {
        // arrange
        const int maxRandomChunkSize = 50;
        const int millisecondsDelay = 5;
        var randomBytes = GetRandomBytes(10_000);
        var resultBytes = new byte[randomBytes.Length];

        // act
        var readTask = Task.Run(async () =>
        {
            try
            {
                var random = new Random();
                var offset = 0;
                int actualRead;
                do
                {
                    var count = Math.Min(resultBytes.Length - offset, readCount ?? random.Next(1, maxRandomChunkSize));
                    actualRead = isAsync
                        ? await _sut.ReadAsync(resultBytes, offset, count)
                        : _sut.Read(resultBytes, offset, count);

                    offset += actualRead;

                    if (random.NextDouble() > 0.9) await Task.Delay(millisecondsDelay);
                } while (actualRead > 0);
            }
            catch (Exception e)
            {
                _outputHelper.WriteLine(e.ToString());
                throw;
            }
        });

        var writeTask = Task.Run(async () =>
        {
            try
            {
                var random = new Random();
                var offset = 0;
                do
                {
                    var count = Math.Min(randomBytes.Length - offset, writeCount ?? random.Next(1, maxRandomChunkSize));
                    if (isAsync)
                        await _sut.WriteAsync(randomBytes, offset, count);
                    else
                        _sut.Write(randomBytes, offset, count);

                    offset += count;
                    if (random.NextDouble() > 0.9) await Task.Delay(millisecondsDelay);
                } while (offset < randomBytes.Length);

                _sut.EndWrite();
            }
            catch (Exception e)
            {
                _outputHelper.WriteLine(e.ToString());
                throw;
            }
        });

        Task.WaitAll(new[] { readTask, writeTask }, TimeSpan.FromSeconds(3));

        // assert
        readTask.Status.Should().Be(TaskStatus.RanToCompletion);
        resultBytes.Should().Equal(randomBytes);
    }
    
    private byte[] GetRandomBytes(int length)
    {
        var bytes = new byte[length];
        var random = new Random();
        random.NextBytes(bytes);
        return bytes;
    }
}

@VyacheslavPritykin

VyacheslavPritykin commented Oct 25, 2022

I've updated the implementation:

  • eliminated the heavy CPU consumption of the while loops that spin while the reader waits for the writer and vice versa
  • moved the implementation into the async methods so threads are not blocked, which prevents thread starvation when the stream is accessed via the async read/write methods
  • added a getter for the Position property to track the reading progress

Note that I used async synchronization primitives from https://github.com/StephenCleary/AsyncEx (Nito.AsyncEx).
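The core handoff that replaces the spin loops can be sketched in isolation: one event signals "a read request exists", the other signals "the request's buffer was filled", so each side sleeps instead of spinning. This is a simplified illustration of the pattern for a single reader and a single writer, assuming Nito.AsyncEx; the class and member names here are mine, not part of the implementation:

```csharp
using System.Threading.Tasks;
using Nito.AsyncEx;

// Simplified two-event handoff: the writer sleeps until a read
// request exists, the reader sleeps until its request was filled.
// No thread burns CPU polling a lock.
class Handoff
{
    private readonly AsyncManualResetEvent _requestReady = new AsyncManualResetEvent(false);
    private readonly AsyncManualResetEvent _requestFilled = new AsyncManualResetEvent(false);
    private int _value;

    public async Task<int> ReadAsync()
    {
        _requestFilled.Reset();
        _requestReady.Set();              // announce a read request
        await _requestFilled.WaitAsync(); // sleep until the writer fills it
        return _value;
    }

    public async Task WriteAsync(int value)
    {
        await _requestReady.WaitAsync();  // sleep until a read request exists
        _requestReady.Reset();
        _value = value;
        _requestFilled.Set();             // wake the reader
    }
}
```

In ThroughStream below, the HasBufferToReadIn setter flips both events together, so the read/write state and the wakeup signals can't drift apart.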

#nullable enable

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Nito.AsyncEx;

public sealed class ThroughStream : Stream
{
    private readonly AsyncLock _asyncLock = new();
    private readonly AsyncManualResetEvent _hasBufferToReadInEvent = new(false);
    private readonly AsyncManualResetEvent _bufferWasFilledEvent = new(false);
    private bool _hasLastReadingEnded = true;

    private bool _isOpen = true;
    private bool _isWritable = true;
    private bool _isWriting;

    private long _position;
    private byte[]? _readBuffer;
    private int _readCount;
    private int _readOffset;
    public override bool CanRead => true;
    public override bool CanSeek => false;

    public override bool CanWrite
    {
        get
        {
            using (_asyncLock.Lock())
            {
                return _isWritable;
            }
        }
    }

    public override long Length => throw new NotImplementedException();

    public override long Position
    {
        get => _position;
        set => throw new NotImplementedException();
    }

    private bool HasBufferToReadIn
    {
        set
        {
            if (value)
            {
                _hasBufferToReadInEvent.Set();
                _bufferWasFilledEvent.Reset();
            }
            else
            {
                _hasBufferToReadInEvent.Reset();
                _bufferWasFilledEvent.Set();
            }
        }
    }

    /// <summary>
    ///     Before closing the stream please call the EndWrite method to mark the data end.
    ///     Otherwise the Reader won't be able to read the stream to the end.
    /// </summary>
    public override void Close()
    {
        using (_asyncLock.Lock())
        {
            _isOpen = false;
            EndWriteInternal();

            base.Close();
        }
    }

    /// <summary>
    ///     When the data writing is done, call this method to mark the data end.
    /// </summary>
    public void EndWrite()
    {
        using (_asyncLock.Lock())
        {
            if (!_isOpen) throw new InvalidOperationException("The stream is closed");
            if (!_isWritable) throw new InvalidOperationException("EndWrite can only be called once");

            EndWriteInternal();
        }
    }

    private void EndWriteInternal()
    {
        _isWritable = false;
        _readBuffer = null;
        HasBufferToReadIn = false;
    }

    /// <summary>
    ///     Write data to the stream reader. If there is no Read request, this method will wait until there is one.
    /// </summary>
    public override void Write(byte[] buffer, int offset, int count)
    {
        WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
    }

    /// <summary>
    ///     Reads data by creating a read request. Returns the read byte count once the writing is done.
    /// </summary>
    public override int Read(byte[] buffer, int offset, int count)
    {
        return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
    }

    public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken ct)
    {
        if (buffer == null) throw new ArgumentNullException(nameof(buffer));
        if (offset < 0) throw new ArgumentOutOfRangeException(nameof(offset));
        if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
        if (buffer.Length - offset < count) throw new ArgumentException("Wrong offset or count");
        if (!_isOpen) throw new InvalidOperationException("The stream is closed");

        if (count == 0) return 0;

        // Save the buffer we want to read in to
        using (await _asyncLock.LockAsync(ct))
        {
            if (!_isWritable) return 0; // if there won't be new data anymore, return 0
            if (!_hasLastReadingEnded) throw new InvalidOperationException("Previous Read request is not done yet");

            HasBufferToReadIn = true;
            _hasLastReadingEnded = false;

            _readBuffer = buffer;
            _readOffset = offset;
            _readCount = count;
        }

        // Wait till the writing is done
        while (true)
        {
            await _bufferWasFilledEvent.WaitAsync(ct);

            using (await _asyncLock.LockAsync(ct))
            {
                if (_bufferWasFilledEvent.IsSet)
                {
                    _hasLastReadingEnded = true;
                    var actualReadCount = _readOffset - offset;
                    _position += actualReadCount;
                    return actualReadCount;
                }
            }
        }
    }

    public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken ct)
    {
        if (count == 0) return;
        if (!_isOpen) throw new InvalidOperationException("Writing to a closed stream is not allowed");

        using (await _asyncLock.LockAsync(ct)) // Check for simultaneous writing
        {
            if (_isWriting) throw new InvalidOperationException("Previous Write request is not done yet");
            _isWriting = true;
        }

        // Write loop
        while (true) // could have been 'count > 0', but we need the iterative lock to sync properly
        {
            await _hasBufferToReadInEvent.WaitAsync(ct);
            using (await _asyncLock.LockAsync(ct))
            {
                if (!_isWritable) throw new InvalidOperationException("The stream is not writable anymore");

                if (_hasBufferToReadInEvent.IsSet) // Write if we have a Read request
                {
                    var writeCount = Math.Min(_readCount, count);
                    Buffer.BlockCopy(buffer, offset, _readBuffer!, _readOffset, writeCount);

                    offset += writeCount;
                    count -= writeCount;
                    _readOffset += writeCount;
                    _readCount -= writeCount;

                    if (_readCount <= 0) // the Read request is fulfilled
                    {
                        HasBufferToReadIn = false;
                        _readBuffer = null;
                    }

                    if (count <= 0) // nothing else to write
                    {
                        _isWriting = false;
                        break;
                    }
                }
            }
        }
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotImplementedException();
    }

    public override void SetLength(long value)
    {
        throw new NotImplementedException();
    }

    public override void Flush()
    {
        throw new NotImplementedException();
    }
}

@VyacheslavPritykin

VyacheslavPritykin commented Oct 25, 2022

I also updated unit tests to cover the case when the reader requests more data than the stream would ever have:

using System;
using System.Threading.Tasks;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

public class ThroughStreamTests
{
    private readonly ITestOutputHelper _outputHelper;
    private readonly ThroughStream _sut;

    public ThroughStreamTests(ITestOutputHelper outputHelper)
    {
        _outputHelper = outputHelper;
        _sut = new ThroughStream();
    }
    
    [Theory]
    [InlineData(ApiType.Sync, 15, 50)]
    [InlineData(ApiType.Sync, 50, 15)]
    [InlineData(ApiType.Sync, 15, 15)]
    [InlineData(ApiType.Sync, null, null)]
    [InlineData(ApiType.Async, 15, 50)]
    [InlineData(ApiType.Async, 50, 15)]
    [InlineData(ApiType.Async, 15, 15)]
    [InlineData(ApiType.Async, null, null)]
    [InlineData(ApiType.AsyncMemory, 15, 50)]
    [InlineData(ApiType.AsyncMemory, 50, 15)]
    [InlineData(ApiType.AsyncMemory, 15, 15)]
    [InlineData(ApiType.AsyncMemory, null, null)]
    public void ReadWrite(ApiType apiType, int? readCount, int? writeCount)
    {
        // arrange
        const int maxRandomChunkSize = 50;
        const int millisecondsDelay = 3;
        var randomBytes = GetRandomBytes(10_000);
        var extraBuffer = 100;
        var resultBytes = new byte[randomBytes.Length + extraBuffer];

        // act
        var readTask = Task.Run(async () =>
        {
            try
            {
                var random = new Random();
                var offset = 0;
                int actualRead;
                do
                {
                    var count = Math.Min(resultBytes.Length - offset, readCount ?? random.Next(1, maxRandomChunkSize));
                    actualRead = apiType switch
                    {
                        ApiType.Sync => _sut.Read(resultBytes, offset, count),
                        ApiType.Async => await _sut.ReadAsync(resultBytes, offset, count),
                        ApiType.AsyncMemory => await _sut.ReadAsync(resultBytes.AsMemory(offset, count)),
                        _ => throw new ArgumentOutOfRangeException(nameof(apiType), apiType, null)
                    };

                    offset += actualRead;

                    if (random.NextDouble() > 0.9) await Task.Delay(millisecondsDelay);
                } while (actualRead > 0);
            }
            catch (Exception e)
            {
                _outputHelper.WriteLine(e.ToString());
                throw;
            }
        });

        var writeTask = Task.Run(async () =>
        {
            try
            {
                var random = new Random();
                var offset = 0;
                do
                {
                    var count = Math.Min(randomBytes.Length - offset, writeCount ?? random.Next(1, maxRandomChunkSize));
                    if (apiType == ApiType.Sync)
                        _sut.Write(randomBytes, offset, count);
                    else if (apiType == ApiType.Async)
                        await _sut.WriteAsync(randomBytes, offset, count);
                    else if (apiType == ApiType.AsyncMemory)
                        await _sut.WriteAsync(randomBytes.AsMemory(offset, count));
                    else
                        throw new ArgumentOutOfRangeException(nameof(apiType), apiType, null);

                    offset += count;
                    if (random.NextDouble() > 0.9) await Task.Delay(millisecondsDelay);
                } while (offset < randomBytes.Length);

                _sut.EndWrite();
            }
            catch (Exception e)
            {
                _outputHelper.WriteLine(e.ToString());
                throw;
            }
        });

        Task.WaitAll(new[] { readTask, writeTask }, TimeSpan.FromSeconds(2));

        // assert
        readTask.Status.Should().Be(TaskStatus.RanToCompletion);
        resultBytes[..^extraBuffer].Should().Equal(randomBytes);
        resultBytes[^extraBuffer..].Should().Equal(new byte[extraBuffer]);
        _sut.Position.Should().Be(randomBytes.Length);
    }


    private byte[] GetRandomBytes(int length)
    {
        var bytes = new byte[length];
        var random = new Random();
        random.NextBytes(bytes);
        return bytes;
    }
}

public enum ApiType
{
    Sync,
    Async,
    AsyncMemory
}

@ruzrobert
Author

Hi @VyacheslavPritykin ! Thank you very much for helping to switch away from the raw locks! The performance is identical to (or even better than) reading a whole file into RAM and spiking up to 2 GB, which is really great!

It's a pity that we have to use a separate library for managing the locks, though. What's so special about those async locks? I've checked their internal code, but they still seem to use the default locking mechanism under the hood, and I don't know what makes them so fast.

Thank you for the tests as well! I think we could maybe turn this gist into a separate GitHub repo; this code is much better than the answers on StackOverflow, IMO :)
