using System;
using System.IO;

/// <summary>
/// A special type of lazy MemoryStream, made to connect a streaming data producer and a stream consumer.
/// Useful when a producer can only write its output to an existing stream
/// and cannot create its own new stream (for example Ionic Zip Extract).
///
/// The data is written lazily - this stream does not store any data.
/// Writing is done by copying the data directly into the read buffer provided by the actual reader.
/// The stream is thread-safe.
///
/// When the data writing is done, the stream creator must call the EndWrite() method to mark the end of the data.
///
/// Sources used for inspiration:
/// https://stackoverflow.com/questions/1475747/is-there-an-in-memory-stream-that-blocks-like-a-file-stream
/// https://stackoverflow.com/questions/27810289/c-sharp-buffered-zip-stream-proxy
///
/// By: @ruzrobert (GitHub)
/// </summary>
public class ThroughStream : Stream
{
    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite { get { lock (lockObject) { return isWritable; } } }

    public override long Length => throw new NotImplementedException();
    public override long Position { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }

    private bool isOpen = true;
    private bool isWritable = true;
    private bool hasBufferToReadIn = false;
    private bool hasLastReadingEnded = true;
    private bool isWriting = false;

    private byte[] readBuffer = null;
    private int readOffset = 0;
    private int readCount = 0;

    private object lockObject = new object();

    /// <summary>
    /// Before closing the stream, please call the EndWrite method to mark the end of the data.
    /// Otherwise the reader won't be able to read the stream to the end.
    /// </summary>
    public override void Close()
    {
        lock (lockObject)
        {
            isOpen = false;
            EndWriteInternal();
            base.Close();
        }
    }

    /// <summary>
    /// When the data writing is done, call this method to mark the end of the data.
    /// </summary>
    public void EndWrite()
    {
        lock (lockObject)
        {
            if (!isOpen) throw new InvalidOperationException("The stream is closed");
            if (!isWritable) throw new InvalidOperationException("EndWrite can only be called once");
            EndWriteInternal();
        }
    }

    private void EndWriteInternal()
    {
        isWritable = false;
        hasBufferToReadIn = false;
        readBuffer = null;
    }

    /// <summary>
    /// Writes data to the stream reader. If there is no Read request, this method waits until there is one.
    /// </summary>
    public override void Write(byte[] buffer, int offset, int count)
    {
        if (count == 0) return;
        if (!isOpen) throw new InvalidOperationException("Writing to a closed stream is not allowed");

        lock (lockObject) // Check for simultaneous writing
        {
            if (isWriting) throw new InvalidOperationException("Previous Write request is not done yet");
            isWriting = true;
        }

        // Write loop
        while (true) // could have been 'count < 0', but we need the iterative lock to sync properly
        {
            lock (lockObject)
            {
                if (!isWritable) throw new InvalidOperationException("The stream is not writable anymore");
                if (hasBufferToReadIn) // Write if we have a Read request
                {
                    int writeCount = Math.Min(readCount, count);
                    Buffer.BlockCopy(buffer, offset, readBuffer, readOffset, writeCount);

                    offset += writeCount;
                    count -= writeCount;
                    readOffset += writeCount;
                    readCount -= writeCount;

                    if (readCount <= 0) // the Read request is fulfilled
                    {
                        hasBufferToReadIn = false;
                        readBuffer = null;
                    }
                    if (count <= 0) // nothing else to write
                    {
                        isWriting = false;
                        break;
                    }
                }
            }
        }
    }

    /// <summary>
    /// Reads data by creating a read request. Returns the read byte count once the writing is done.
    /// </summary>
    public override int Read(byte[] buffer, int offset, int count)
    {
        if (buffer == null) throw new ArgumentNullException(nameof(buffer));
        if (offset < 0) throw new ArgumentOutOfRangeException(nameof(offset));
        if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
        if (buffer.Length - offset < count) throw new ArgumentException("Wrong offset or count");
        if (!isOpen) throw new InvalidOperationException("The stream is closed");
        if (count == 0) return 0;

        // Save the buffer we want to read into
        lock (lockObject)
        {
            if (!isWritable) return 0; // if there won't be any new data, return 0
            if (!hasLastReadingEnded) throw new InvalidOperationException("Previous Read request is not done yet");

            hasBufferToReadIn = true;
            hasLastReadingEnded = false;
            readBuffer = buffer;
            readOffset = offset;
            readCount = count;
        }

        // Wait till the writing is done
        while (true)
        {
            lock (lockObject)
            {
                if (!hasBufferToReadIn)
                {
                    hasLastReadingEnded = true;
                    return readOffset - offset;
                }
            }
        }
    }

    public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException();
    public override void SetLength(long value) => throw new NotImplementedException();
    public override void Flush() => throw new NotImplementedException();
}
@VyacheslavPritykin, actually I was thinking about the code this night and came up with an idea: there might be no need for the internal buffers at all. We could just save the target "buffer" array when someone calls Read, and then the Write function would copy the data straight into that target buffer. The script would become much simpler then, without the unnecessary internal copying.
May I know your use case? Thank you
I use the official Google Cloud Storage library, and its API for blob download does not return a stream; instead, it accepts a destination stream, where it writes the blob data.
Note that the download chunk size is configurable in this library. My code looks like this:
public Task<Stream> DownloadAsync(string containerName, string blobName, CancellationToken ct)
{
    const int chunkSize = 1024 * 1024 * 5; // 5 MB
    var stream = new ThroughStream(bufferSize: chunkSize);
    var _ = Task.Run(async () =>
        {
            using var storageClient = await CreateStorageClient();
            try
            {
                await storageClient.DownloadObjectAsync(containerName,
                    blobName,
                    stream,
                    options: new DownloadObjectOptions { ChunkSize = chunkSize },
                    cancellationToken: ct);
            }
            catch (Exception e)
            {
                _logger.LogError(e,
                    "Error downloading blob. Container: {ContainerName}, Blob: {BlobName}",
                    containerName,
                    blobName);
            }
            finally
            {
                stream.EndWrite();
            }
        },
        ct);
    return Task.FromResult<Stream>(stream);
}
By the way, I had to add ReadAsync and WriteAsync methods into your implementation in order to make ThroughStream compatible with how it's used by infrastructure code.
@VyacheslavPritykin I thought the ReadAsync and WriteAsync already have a default implementation in the Stream (base) class? I am not really familiar with the async access to streams, to be honest
@VyacheslavPritykin, I have uploaded the new, shorter variant, which doesn't use any internal buffers and instead writes directly to the read buffer. Unfortunately, the performance seems identical. But in this version I've added more safety for concurrent calls. The code became much simpler, though!
@VyacheslavPritykin, thank you for sharing your use case! My situation is very similar.
I am reading a file via FileStream (File.OpenRead), and then using Ionic.Zip.ZipFile.Read(fileStream) to open the zip file. But zip.Extract can only write to an existing stream, the same as your DownloadObject call.
The output stream (ThroughStream) then goes to JSON deserialization, which as a result creates deserialized objects straight from zip-archived JSON files. :)
The code looks like this:
public static Stream DearchivateFirstEntry(Stream zipStream)
{
    ThroughStream outputStream = new ThroughStream();

    ZipFile zipFile = ZipFile.Read(zipStream);
    ZipEntry firstEntry = zipFile.Entries.FirstOrDefault();

    Task task = null;
    if (firstEntry != null)
    {
        task = Task.Factory.StartNew(() =>
        {
            try
            {
                firstEntry.Extract(outputStream);
            }
            catch { }
            finally
            {
                outputStream.EndWrite();
                zipFile.Dispose();
                zipStream.Dispose();
            }
        });
    }
    else
    {
        zipFile.Dispose();
        zipStream.Dispose();
    }

    return outputStream;
}
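For completeness, the consuming side looks roughly like this. This is a minimal sketch, not part of the gist itself; it assumes the JSON is deserialized with Newtonsoft.Json, so the archived file never has to be fully materialized in memory:

using System.IO;
using Newtonsoft.Json;

// Hypothetical consumer sketch: deserialize JSON straight from the transit stream.
public static T DeserializeFirstEntry<T>(Stream zipStream)
{
    using (Stream jsonStream = DearchivateFirstEntry(zipStream))
    using (StreamReader textReader = new StreamReader(jsonStream))
    using (JsonTextReader jsonReader = new JsonTextReader(textReader))
    {
        // The serializer pulls from the ThroughStream in chunks, which in turn
        // pulls from the zip extraction running on the background task.
        return new JsonSerializer().Deserialize<T>(jsonReader);
    }
}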
@VyacheslavPritykin I thought the ReadAsync and WriteAsync already have a default implementation in the Stream (base) class? I am not really familiar with the async access to streams, to be honest
Yes, they do, but those default implementations use some shared state that deadlocks concurrent ReadAsync/WriteAsync calls. So the easiest way to fix it is:
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken ct)
{
    return Task.FromResult(Read(buffer, offset, count));
}

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken ct)
{
    Write(buffer, offset, count);
    return Task.CompletedTask;
}
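With these overrides in place, a concurrent async producer/consumer pair works. A minimal usage sketch (not from the gist; the byte values are arbitrary):

using System.IO;
using System.Threading.Tasks;

// One task writes into the ThroughStream while CopyToAsync drains it.
// With the base-class ReadAsync/WriteAsync, the shared state described above
// would serialize the two calls and the pair would deadlock.
var through = new ThroughStream();
var destination = new MemoryStream();

var writer = Task.Run(async () =>
{
    await through.WriteAsync(new byte[] { 1, 2, 3 }, 0, 3);
    through.EndWrite(); // mark the end of the data so the reader can finish
});

await Task.WhenAll(through.CopyToAsync(destination), writer);
// destination now contains { 1, 2, 3 }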
Check my tests:
using System;
using System.Threading.Tasks;
using FluentAssertions;
using ListBrain.Common.Streams;
using Xunit;
using Xunit.Abstractions;

public class ThroughStreamTests
{
    private readonly ITestOutputHelper _outputHelper;
    private readonly ThroughStream _sut;

    public ThroughStreamTests(ITestOutputHelper outputHelper)
    {
        _outputHelper = outputHelper;
        _sut = new ThroughStream();
    }

    [Theory]
    [InlineData(false, 15, 50)]
    [InlineData(false, 50, 15)]
    [InlineData(false, 15, 15)]
    [InlineData(false, null, null)]
    [InlineData(true, 15, 50)]
    [InlineData(true, 50, 15)]
    [InlineData(true, 15, 15)]
    [InlineData(true, null, null)]
    public void ReadWrite(bool isAsync, int? readCount, int? writeCount)
    {
        // arrange
        const int maxRandomChunkSize = 50;
        const int millisecondsDelay = 5;
        var randomBytes = GetRandomBytes(10_000);
        var resultBytes = new byte[randomBytes.Length];

        // act
        var readTask = Task.Run(async () =>
        {
            try
            {
                var random = new Random();
                var offset = 0;
                int actualRead;
                do
                {
                    var count = Math.Min(resultBytes.Length - offset, readCount ?? random.Next(1, maxRandomChunkSize));
                    actualRead = isAsync
                        ? await _sut.ReadAsync(resultBytes, offset, count)
                        : _sut.Read(resultBytes, offset, count);
                    offset += actualRead;
                    if (random.NextDouble() > 0.9) await Task.Delay(millisecondsDelay);
                } while (actualRead > 0);
            }
            catch (Exception e)
            {
                _outputHelper.WriteLine(e.ToString());
                throw;
            }
        });

        var writeTask = Task.Run(async () =>
        {
            try
            {
                var random = new Random();
                var offset = 0;
                do
                {
                    var count = Math.Min(randomBytes.Length - offset, writeCount ?? random.Next(1, maxRandomChunkSize));
                    if (isAsync)
                        await _sut.WriteAsync(randomBytes, offset, count);
                    else
                        _sut.Write(randomBytes, offset, count);
                    offset += count;
                    if (random.NextDouble() > 0.9) await Task.Delay(millisecondsDelay);
                } while (offset < randomBytes.Length);
                _sut.EndWrite();
            }
            catch (Exception e)
            {
                _outputHelper.WriteLine(e.ToString());
                throw;
            }
        });

        Task.WaitAll(new[] { readTask, writeTask }, TimeSpan.FromSeconds(3));

        // assert
        readTask.Status.Should().Be(TaskStatus.RanToCompletion);
        resultBytes.Should().Equal(randomBytes);
    }

    private byte[] GetRandomBytes(int length)
    {
        var bytes = new byte[length];
        var random = new Random();
        random.NextBytes(bytes);
        return bytes;
    }
}
I've updated the implementation:
- eliminated the crazy CPU consumption of those while loops when the reading process is waiting for the writing one and vice versa
- moved the implementation into the async methods to avoid blocking threads and to prevent thread starvation when the stream is accessed via the async read/write methods
- added a getter for the Position property to track the reading progress

Note that I used async synchronization primitives from https://github.com/StephenCleary/AsyncEx.
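For reference, a minimal sketch of the two Nito.AsyncEx primitives used below (this illustration is mine, not part of the stream itself): they behave like lock and ManualResetEvent, but awaiting them suspends the task instead of blocking a thread.

using System.Threading.Tasks;
using Nito.AsyncEx;

var asyncLock = new AsyncLock();
var signal = new AsyncManualResetEvent(false); // starts unset

var waiter = Task.Run(async () =>
{
    await signal.WaitAsync(); // suspends here without holding a thread
    using (await asyncLock.LockAsync()) // async-compatible critical section
    {
        // protected work goes here
    }
});

signal.Set(); // wakes the waiter (and any future ones, until Reset)
await waiter;

The updated implementation: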
#nullable enable
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Nito.AsyncEx;

public sealed class ThroughStream : Stream
{
    private readonly AsyncLock _asyncLock = new();
    private readonly AsyncManualResetEvent _hasBufferToReadInEvent = new(false);
    private readonly AsyncManualResetEvent _bufferWasFilledEvent = new(false);

    private bool _hasLastReadingEnded = true;
    private bool _isOpen = true;
    private bool _isWritable = true;
    private bool _isWriting;
    private long _position;
    private byte[]? _readBuffer;
    private int _readCount;
    private int _readOffset;

    public override bool CanRead => true;
    public override bool CanSeek => false;

    public override bool CanWrite
    {
        get
        {
            using (_asyncLock.Lock())
            {
                return _isWritable;
            }
        }
    }

    public override long Length => throw new NotImplementedException();

    public override long Position
    {
        get => _position;
        set => throw new NotImplementedException();
    }

    private bool HasBufferToReadIn
    {
        set
        {
            if (value)
            {
                _hasBufferToReadInEvent.Set();
                _bufferWasFilledEvent.Reset();
            }
            else
            {
                _hasBufferToReadInEvent.Reset();
                _bufferWasFilledEvent.Set();
            }
        }
    }

    /// <summary>
    /// Before closing the stream, please call the EndWrite method to mark the end of the data.
    /// Otherwise the reader won't be able to read the stream to the end.
    /// </summary>
    public override void Close()
    {
        using (_asyncLock.Lock())
        {
            _isOpen = false;
            EndWriteInternal();
            base.Close();
        }
    }

    /// <summary>
    /// When the data writing is done, call this method to mark the end of the data.
    /// </summary>
    public void EndWrite()
    {
        using (_asyncLock.Lock())
        {
            if (!_isOpen) throw new InvalidOperationException("The stream is closed");
            if (!_isWritable) throw new InvalidOperationException("EndWrite can only be called once");
            EndWriteInternal();
        }
    }

    private void EndWriteInternal()
    {
        _isWritable = false;
        _readBuffer = null;
        HasBufferToReadIn = false;
    }

    /// <summary>
    /// Writes data to the stream reader. If there is no Read request, this method waits until there is one.
    /// </summary>
    public override void Write(byte[] buffer, int offset, int count)
    {
        WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
    }

    /// <summary>
    /// Reads data by creating a read request. Returns the read byte count once the writing is done.
    /// </summary>
    public override int Read(byte[] buffer, int offset, int count)
    {
        return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
    }

    public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken ct)
    {
        if (buffer == null) throw new ArgumentNullException(nameof(buffer));
        if (offset < 0) throw new ArgumentOutOfRangeException(nameof(offset));
        if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
        if (buffer.Length - offset < count) throw new ArgumentException("Wrong offset or count");
        if (!_isOpen) throw new InvalidOperationException("The stream is closed");
        if (count == 0) return 0;

        // Save the buffer we want to read into
        using (await _asyncLock.LockAsync(ct))
        {
            if (!_isWritable) return 0; // if there won't be any new data, return 0
            if (!_hasLastReadingEnded) throw new InvalidOperationException("Previous Read request is not done yet");

            HasBufferToReadIn = true;
            _hasLastReadingEnded = false;
            _readBuffer = buffer;
            _readOffset = offset;
            _readCount = count;
        }

        // Wait till the writing is done
        while (true)
        {
            await _bufferWasFilledEvent.WaitAsync(ct);
            using (await _asyncLock.LockAsync(ct))
            {
                if (_bufferWasFilledEvent.IsSet)
                {
                    _hasLastReadingEnded = true;
                    var actualReadCount = _readOffset - offset;
                    _position += actualReadCount;
                    return actualReadCount;
                }
            }
        }
    }

    public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken ct)
    {
        if (count == 0) return;
        if (!_isOpen) throw new InvalidOperationException("Writing to a closed stream is not allowed");

        using (await _asyncLock.LockAsync(ct)) // Check for simultaneous writing
        {
            if (_isWriting) throw new InvalidOperationException("Previous Write request is not done yet");
            _isWriting = true;
        }

        // Write loop
        while (true) // could have been 'count < 0', but we need the iterative lock to sync properly
        {
            await _hasBufferToReadInEvent.WaitAsync(ct);
            using (await _asyncLock.LockAsync(ct))
            {
                if (!_isWritable) throw new InvalidOperationException("The stream is not writable anymore");
                if (_hasBufferToReadInEvent.IsSet) // Write if we have a Read request
                {
                    var writeCount = Math.Min(_readCount, count);
                    Buffer.BlockCopy(buffer, offset, _readBuffer!, _readOffset, writeCount);

                    offset += writeCount;
                    count -= writeCount;
                    _readOffset += writeCount;
                    _readCount -= writeCount;

                    if (_readCount <= 0) // the Read request is fulfilled
                    {
                        HasBufferToReadIn = false;
                        _readBuffer = null;
                    }
                    if (count <= 0) // nothing else to write
                    {
                        _isWriting = false;
                        break;
                    }
                }
            }
        }
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotImplementedException();
    }

    public override void SetLength(long value)
    {
        throw new NotImplementedException();
    }

    public override void Flush()
    {
        throw new NotImplementedException();
    }
}
I also updated unit tests to cover the case when the reader requests more data than the stream would ever have:
using System;
using System.Threading.Tasks;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

public class ThroughStreamTests
{
    private readonly ITestOutputHelper _outputHelper;
    private readonly ThroughStream _sut;

    public ThroughStreamTests(ITestOutputHelper outputHelper)
    {
        _outputHelper = outputHelper;
        _sut = new ThroughStream();
    }

    [Theory]
    [InlineData(ApiType.Sync, 15, 50)]
    [InlineData(ApiType.Sync, 50, 15)]
    [InlineData(ApiType.Sync, 15, 15)]
    [InlineData(ApiType.Sync, null, null)]
    [InlineData(ApiType.Async, 15, 50)]
    [InlineData(ApiType.Async, 50, 15)]
    [InlineData(ApiType.Async, 15, 15)]
    [InlineData(ApiType.Async, null, null)]
    [InlineData(ApiType.AsyncMemory, 15, 50)]
    [InlineData(ApiType.AsyncMemory, 50, 15)]
    [InlineData(ApiType.AsyncMemory, 15, 15)]
    [InlineData(ApiType.AsyncMemory, null, null)]
    public void ReadWrite(ApiType apiType, int? readCount, int? writeCount)
    {
        // arrange
        const int maxRandomChunkSize = 50;
        const int millisecondsDelay = 3;
        var randomBytes = GetRandomBytes(10_000);
        var extraBuffer = 100;
        var resultBytes = new byte[randomBytes.Length + extraBuffer];

        // act
        var readTask = Task.Run(async () =>
        {
            try
            {
                var random = new Random();
                var offset = 0;
                int actualRead;
                do
                {
                    var count = Math.Min(resultBytes.Length - offset, readCount ?? random.Next(1, maxRandomChunkSize));
                    actualRead = apiType switch
                    {
                        ApiType.Sync => _sut.Read(resultBytes, offset, count),
                        ApiType.Async => await _sut.ReadAsync(resultBytes, offset, count),
                        ApiType.AsyncMemory => await _sut.ReadAsync(resultBytes.AsMemory(offset, count)),
                        _ => throw new ArgumentOutOfRangeException(nameof(apiType), apiType, null)
                    };
                    offset += actualRead;
                    if (random.NextDouble() > 0.9) await Task.Delay(millisecondsDelay);
                } while (actualRead > 0);
            }
            catch (Exception e)
            {
                _outputHelper.WriteLine(e.ToString());
                throw;
            }
        });

        var writeTask = Task.Run(async () =>
        {
            try
            {
                var random = new Random();
                var offset = 0;
                do
                {
                    var count = Math.Min(randomBytes.Length - offset, writeCount ?? random.Next(1, maxRandomChunkSize));
                    if (apiType == ApiType.Sync)
                        _sut.Write(randomBytes, offset, count);
                    else if (apiType == ApiType.Async)
                        await _sut.WriteAsync(randomBytes, offset, count);
                    else if (apiType == ApiType.AsyncMemory)
                        await _sut.WriteAsync(randomBytes.AsMemory(offset, count));
                    else
                        throw new ArgumentOutOfRangeException(nameof(apiType), apiType, null);
                    offset += count;
                    if (random.NextDouble() > 0.9) await Task.Delay(millisecondsDelay);
                } while (offset < randomBytes.Length);
                _sut.EndWrite();
            }
            catch (Exception e)
            {
                _outputHelper.WriteLine(e.ToString());
                throw;
            }
        });

        Task.WaitAll(new[] { readTask, writeTask }, TimeSpan.FromSeconds(2));

        // assert
        readTask.Status.Should().Be(TaskStatus.RanToCompletion);
        resultBytes[..^extraBuffer].Should().Equal(randomBytes);
        resultBytes[^extraBuffer..].Should().Equal(new byte[extraBuffer]);
        _sut.Position.Should().Be(randomBytes.Length);
    }

    private byte[] GetRandomBytes(int length)
    {
        var bytes = new byte[length];
        var random = new Random();
        random.NextBytes(bytes);
        return bytes;
    }
}

public enum ApiType
{
    Sync,
    Async,
    AsyncMemory
}
Hi @VyacheslavPritykin! Thank you very much for helping to switch away from the raw locks! The performance is the same as (or even better than) reading the whole file into RAM and spiking up to 2 GB, which is really great!
It's a pity that we have to pull in a separate library for managing the locks, though. What's so special about those async locks? I've checked their internal code, but it seems they still use the default lock mechanism under the hood, and I don't know what makes them so fast.
Thank you for the tests as well! Maybe we could turn this gist into a separate GitHub repo; this code is much better than the answers on StackOverflow, IMO :)
Hi @VyacheslavPritykin!
Thank you for the different perspective on my code!
I've fixed the issues you noticed and made other bug fixes, logic fixes, and performance fixes as well.
Also, the stream won't grow larger than bufferSize * 2 now, even if someone requests to read 1 GB of data - the stream will hold only double its buffer size and will keep working purely as a transit stream.