Skip to content

Instantly share code, notes, and snippets.

@ronnieoverby
Created April 11, 2019 20:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ronnieoverby/67636c5e519e28644bf21294337e048e to your computer and use it in GitHub Desktop.
Save ronnieoverby/67636c5e519e28644bf21294337e048e to your computer and use it in GitHub Desktop.
class BlockingStreams
{
public Stream WriteableStream { get; }
public Stream ReadableStream { get; }
private readonly CancellationToken _ct;
private readonly BlockingCollection<MemoryStream> _blocks;
public BlockingStreams(int? maxWrites = null, CancellationToken ct = default)
{
_ct = ct;
_blocks = maxWrites.HasValue
? new BlockingCollection<MemoryStream>(maxWrites.Value)
: new BlockingCollection<MemoryStream>();
var readDisposed = new TaskCompletionSource<object>();
var writeDisposed = new TaskCompletionSource<object>();
ReadableStream = new ReadStream(_blocks, ct, readDisposed);
WriteableStream = new WriteStream(_blocks, ct, writeDisposed);
Task.WhenAll(readDisposed.Task, writeDisposed.Task)
.ContinueWith(t => _blocks.Dispose());
}
class ReadStream : Stream
{
readonly TaskCompletionSource<object> _disposed;
readonly BlockingCollection<MemoryStream> _blocks;
readonly CancellationToken _ct;
MemoryStream _current;
long _position;
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Length => throw new NotSupportedException();
public ReadStream(BlockingCollection<MemoryStream> blocks, CancellationToken ct, TaskCompletionSource<object> disposed)
{
_blocks = blocks;
_ct = ct;
_disposed = disposed;
}
public override long Position
{
get => _position;
set => throw new NotSupportedException();
}
public override void Flush() { }
public override int Read(byte[] buffer, int offset, int count)
{
var read = 0;
while (read < count)
{
if (_current == null)
{
// only wait if no bytes read yet
var timeout = read == 0 ? -1 : 0;
if (!_blocks.TryTake(out _current, timeout, _ct))
return read;
}
var thisRead = _current.Read(buffer, offset + read, count - read);
read += thisRead;
_position += thisRead;
// is current block exhausted?
if (_current.Position == _current.Length)
{
using (_current)
_current = null;
}
}
return read;
}
public override long Seek(long offset, SeekOrigin origin) =>
throw new NotSupportedException();
public override void SetLength(long value) =>
throw new NotSupportedException();
public override void Write(byte[] buffer, int offset, int count) =>
throw new NotSupportedException();
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
_disposed.SetResult(default);
}
}
class WriteStream : Stream
{
readonly TaskCompletionSource<object> _disposed;
readonly BlockingCollection<MemoryStream> _blocks;
readonly CancellationToken _ct;
readonly RecyclableMemoryStreamManager _streamManager = new RecyclableMemoryStreamManager();
long _position = 0;
public WriteStream(BlockingCollection<MemoryStream> blocks, CancellationToken ct, TaskCompletionSource<object> disposed)
{
_blocks = blocks;
_ct = ct;
_disposed = disposed;
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
_blocks.CompleteAdding();
_disposed.SetResult(default);
}
public override bool CanRead => false;
public override bool CanSeek => false;
public override bool CanWrite => true;
public override long Length => throw new NotImplementedException();
public override void Write(byte[] buffer, int offset, int count)
{
var stream = _streamManager.GetStream(tag: default, buffer, offset, count);
_position += stream.Length;
_blocks.Add(stream, _ct);
}
public override long Position
{
get => _position;
set => throw new NotSupportedException();
}
public override void Flush() { }
public override int Read(byte[] buffer, int offset, int count) =>
throw new NotSupportedException();
public override long Seek(long offset, SeekOrigin origin) =>
throw new NotSupportedException();
public override void SetLength(long value) =>
throw new NotSupportedException();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment