Skip to content

Instantly share code, notes, and snippets.

@davidfowl
Last active June 7, 2019 18:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save davidfowl/5947c833115a169a340a31541e8018ea to your computer and use it in GitHub Desktop.
Save davidfowl/5947c833115a169a340a31541e8018ea to your computer and use it in GitHub Desktop.
public class DuplexPipe<TStream> : Stream, IDuplexPipe where TStream : Stream
{
private readonly IDuplexPipe _duplexPipe;
public DuplexPipe(IDuplexPipe duplexPipe, Func<Stream, TStream> wrapper)
{
_duplexPipe = duplexPipe;
Stream = wrapper(this);
Input = PipeReader.Create(Stream);
Output = PipeWriter.Create(Stream);
}
public TStream Stream { get; }
public PipeReader Input { get; }
public PipeWriter Output { get; }
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => true;
public override long Length => throw new NotSupportedException();
public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
public override void Flush()
{
}
public override int Read(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotImplementedException();
}
public override void SetLength(long value)
{
throw new NotImplementedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotImplementedException();
}
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
ReadResult result = await _duplexPipe.Input.ReadAsync(cancellationToken).ConfigureAwait(false);
if (result.IsCanceled)
{
throw new OperationCanceledException();
}
ReadOnlySequence<byte> sequence = result.Buffer;
long bufferLength = sequence.Length;
SequencePosition consumed = sequence.Start;
try
{
if (bufferLength != 0)
{
int actual = (int)Math.Min(bufferLength, buffer.Length);
ReadOnlySequence<byte> slice = actual == bufferLength ? sequence : sequence.Slice(0, actual);
consumed = slice.End;
slice.CopyTo(buffer.Span);
return actual;
}
if (result.IsCompleted)
{
return 0;
}
}
finally
{
_duplexPipe.Input.AdvanceTo(consumed);
}
throw new InvalidOperationException();
}
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
ValueTask<FlushResult> valueTask = _duplexPipe.Output.WriteAsync(buffer, cancellationToken);
return new ValueTask(GetFlushResultAsTask(valueTask));
}
public override Task FlushAsync(CancellationToken cancellationToken)
{
ValueTask<FlushResult> valueTask = _duplexPipe.Output.FlushAsync(cancellationToken);
return GetFlushResultAsTask(valueTask);
}
private static Task GetFlushResultAsTask(ValueTask<FlushResult> valueTask)
{
if (valueTask.IsCompletedSuccessfully)
{
FlushResult result = valueTask.Result;
if (result.IsCanceled)
{
throw new OperationCanceledException();
}
return Task.CompletedTask;
}
static async Task AwaitTask(ValueTask<FlushResult> valueTask)
{
FlushResult result = await valueTask.ConfigureAwait(false);
if (result.IsCanceled)
{
throw new OperationCanceledException();
}
}
return AwaitTask(valueTask);
}
public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
return _duplexPipe.Input.CopyToAsync(destination, cancellationToken);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment