Skip to content

Instantly share code, notes, and snippets.

@stevehayles
Last active October 13, 2018 19:31
Show Gist options
  • Save stevehayles/87b338607abce2af196bd044378c5316 to your computer and use it in GitHub Desktop.
Save stevehayles/87b338607abce2af196bd044378c5316 to your computer and use it in GitHub Desktop.
A System.IO.Pipelines-based Stream implementation that demonstrates internal buffering within a stream. It accepts read and write calls independently of each other.
/// <summary>
/// A non-seekable, in-memory <see cref="Stream"/> backed by a <see cref="Pipe"/>.
/// Bytes passed to <c>Write</c>/<c>WriteAsync</c> are buffered by the pipe and handed
/// back, in order, by <c>Read</c>/<c>ReadAsync</c>.
/// </summary>
/// <remarks>
/// Closing or disposing the stream completes the writing side only. Data that is
/// already buffered can still be read afterwards; once it has been drained every
/// further read returns 0 (end of stream).
/// Writes are serialized with a semaphore. Reads are not synchronized, so only one
/// read should be in flight at a time.
/// </remarks>
public sealed class PipeStream : Stream
{
    private readonly SemaphoreSlim _lock;
    private readonly PipeReader _reader;
    private readonly PipeWriter _writer;

    // Set once the reader has observed completion and has been completed itself;
    // the pipe reader must not be touched again after that point.
    private bool _readerCompleted;

    // Guards against completing the writer more than once on repeated Dispose/Close.
    private bool _writerCompleted;

    /// <summary>Creates a stream over a new <see cref="Pipe"/> with default options.</summary>
    public PipeStream()
    {
        _lock = new SemaphoreSlim(1, 1);
        var pipe = new Pipe();
        _reader = pipe.Reader;
        _writer = pipe.Writer;
    }

    /// <inheritdoc/>
    public override bool CanRead => true;

    /// <inheritdoc/>
    public override bool CanWrite => true;

    /// <inheritdoc/>
    public override bool CanSeek => false;

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override long Length => throw new NotSupportedException();

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }

    /// <summary>Does nothing: every write is flushed to the pipe as part of the write itself.</summary>
    public override void Flush() { }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Synchronous wrapper over <see cref="ReadAsync(byte[], int, int, CancellationToken)"/>.
    /// Blocks the calling thread until data is available or the stream has ended.
    /// </summary>
    /// <returns>The number of bytes copied into <paramref name="buffer"/>; 0 at end of stream.</returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        return ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
    }

    /// <summary>
    /// Waits until the pipe has data (or has been completed by the writer) and copies
    /// up to <paramref name="count"/> bytes into <paramref name="buffer"/> starting at
    /// <paramref name="offset"/>.
    /// </summary>
    /// <returns>The number of bytes copied; 0 once the writer has completed and all data was consumed.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="offset"/> or <paramref name="count"/> is negative or does not fit in <paramref name="buffer"/>.
    /// </exception>
    public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        ValidateArguments(buffer, offset, count);

        // End of stream was already reported; the completed reader must not be used again,
        // so keep answering 0 the way a Stream is expected to.
        if (_readerCompleted)
        {
            return 0;
        }

        while (true)
        {
            // ConfigureAwait(false): the synchronous Read wrapper blocks on this task, so the
            // continuation must not require the caller's synchronization context.
            ReadResult result = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
            ReadOnlySequence<byte> readBuffer = result.Buffer;

            if (!readBuffer.IsEmpty)
            {
                int bytesToCopy = (int)Math.Min(readBuffer.Length, (long)count);
                ReadOnlySequence<byte> slice = readBuffer.Slice(0, bytesToCopy);

                // Requires C# 7.2 or later (Span used as a temporary inside an async method).
                slice.CopyTo(new Span<byte>(buffer, offset, bytesToCopy));

                // Consume exactly what was copied; the remainder stays buffered for the next read.
                _reader.AdvanceTo(slice.End, slice.End);
                return bytesToCopy;
            }

            if (result.IsCompleted)
            {
                _reader.Complete();
                _readerCompleted = true;
                return 0;
            }

            // Empty, non-final result: mark it as examined and wait for more data.
            _reader.AdvanceTo(readBuffer.End, readBuffer.End);
        }
    }

    /// <summary>
    /// Synchronous wrapper over <see cref="WriteAsync(byte[], int, int, CancellationToken)"/>.
    /// Blocks the calling thread until the data has been written and flushed to the pipe.
    /// </summary>
    public override void Write(byte[] buffer, int offset, int count)
    {
        WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
    }

    /// <summary>
    /// Writes <paramref name="count"/> bytes from <paramref name="buffer"/>, starting at
    /// <paramref name="offset"/>, into the pipe. Concurrent writes are serialized.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="offset"/> or <paramref name="count"/> is negative or does not fit in <paramref name="buffer"/>.
    /// </exception>
    public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        ValidateArguments(buffer, offset, count);

        if (count == 0)
        {
            return;
        }

        await _lock.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            await _writer.WriteAsync(buffer.AsMemory(offset, count), cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            _lock.Release();
        }
    }

    /// <summary>
    /// Completes the writing side of the pipe so that readers observe end of stream once
    /// the buffered data has been drained. Invoked by both <c>Close()</c> and <c>Dispose()</c>.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        try
        {
            if (disposing && !_writerCompleted)
            {
                _writerCompleted = true;
                _writer.Complete();
            }
        }
        finally
        {
            base.Dispose(disposing);
        }
    }

    /// <summary>
    /// Shared argument validation for the read and write entry points.
    /// The range check is written as <c>count &gt; buffer.Length - offset</c> so that it cannot
    /// overflow, and <c>offset == buffer.Length</c> is accepted for zero-length operations.
    /// </summary>
    private static void ValidateArguments(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
        {
            throw new ArgumentNullException(nameof(buffer));
        }

        if (offset < 0 || offset > buffer.Length)
        {
            throw new ArgumentOutOfRangeException(nameof(offset));
        }

        if (count < 0 || count > buffer.Length - offset)
        {
            throw new ArgumentOutOfRangeException(nameof(count));
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment