Skip to content

Instantly share code, notes, and snippets.

@davidfowl
Last active February 17, 2018 08:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save davidfowl/1cd3f9e2c3f88104e54edc8bb2b92bcc to your computer and use it in GitHub Desktop.
Save davidfowl/1cd3f9e2c3f88104e54edc8bb2b92bcc to your computer and use it in GitHub Desktop.
This is buggy
/// <summary>
/// A <see cref="PipeReader"/> that pulls its data from a <see cref="Stream"/> into a single byte array.
/// </summary>
/// <remarks>
/// Overlapping reads are not guarded against: every call to <see cref="ReadAsync"/> shares the same
/// completion source, so callers must await one read before starting the next.
/// </remarks>
public class StreamPipeReader : PipeReader
{
    // Size of the initial buffer and the minimum size of any replacement buffer.
    private const int DefaultBufferSize = 4096;

    private readonly Stream _stream;
    private readonly PipeCompletionSource<ReadResult> _pipeCompletionSource = new PipeCompletionSource<ReadResult>();
    private byte[] _buffer;
    // Offset in _buffer of the first byte the caller has not consumed yet.
    private int _consumedIndex;
    // Offset in _buffer up to which the caller reported having examined the data.
    private int _examinedIndex;
    // Offset in _buffer one past the last valid byte read from the stream.
    private int _read;
    private List<(Action<Exception, object> callback, object state)> _callbacks;
    // True once the stream returned zero bytes from a read.
    private bool _completed;

    /// <summary>Creates a reader over <paramref name="stream"/>.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    public StreamPipeReader(Stream stream)
    {
        _stream = stream ?? throw new ArgumentNullException(nameof(stream));
    }

    /// <summary>Marks everything up to <paramref name="consumed"/> as both consumed and examined.</summary>
    public override void AdvanceTo(SequencePosition consumed)
    {
        AdvanceTo(consumed, consumed);
    }

    /// <summary>
    /// Records how far the caller consumed and examined the data returned by the last read.
    /// </summary>
    /// <exception cref="ArgumentException">
    /// Either position does not refer to the reader's current buffer.
    /// </exception>
    public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
    {
        if (consumed.Segment != _buffer || examined.Segment != _buffer)
        {
            throw new ArgumentException("Incorrect buffer passed");
        }

        _consumedIndex = consumed.Index;
        _examinedIndex = examined.Index;
    }

    /// <summary>Publishes a canceled, empty result on the shared completion source.</summary>
    public override void CancelPendingRead()
    {
        _pipeCompletionSource.SetResult(new ReadResult(ReadOnlySequence<byte>.Empty, isCanceled: true, isCompleted: _completed));
    }

    /// <summary>Releases the buffer. <paramref name="exception"/> is currently ignored.</summary>
    public override void Complete(Exception exception = null)
    {
        // Reader is complete
        _buffer = null;
    }

    /// <summary>
    /// Registers a callback to run when the stream reports end-of-data or a read fails.
    /// </summary>
    public override void OnWriterCompleted(Action<Exception, object> callback, object state)
    {
        if (_callbacks == null)
        {
            _callbacks = new List<(Action<Exception, object> callback, object state)>();
        }

        _callbacks.Add((callback, state));
    }

    /// <summary>Starts a read and returns an awaiter bound to the shared completion source.</summary>
    public override PipeAwaiter<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
    {
        // TODO: Block overlapping reads
        _ = DoReadAsync(cancellationToken);
        return new PipeAwaiter<ReadResult>(_pipeCompletionSource);
    }

    // Produces the next ReadResult: either the still-unexamined buffered bytes, or the buffered
    // bytes plus whatever one more stream read appends after them.
    private async Task DoReadAsync(CancellationToken cancellationToken)
    {
        if (_buffer == null)
        {
            // First read, or first read after Complete(): indices into a previous array are meaningless.
            _buffer = new byte[DefaultBufferSize];
            _consumedIndex = 0;
            _examinedIndex = 0;
            _read = 0;
        }

        var unconsumed = _read - _consumedIndex;

        // If we didn't examine everything then return the unconsumed bytes immediately
        if (unconsumed > 0 && _examinedIndex < _read)
        {
            var pending = new ReadOnlySequence<byte>(_buffer, _consumedIndex, unconsumed);
            _pipeCompletionSource.SetResult(new ReadResult(pending, isCanceled: false, isCompleted: _completed));
            return;
        }

        if (unconsumed <= 0)
        {
            // Nothing left to preserve, so the whole array can be refilled from the start.
            _consumedIndex = 0;
            _examinedIndex = 0;
            _read = 0;
        }
        else if (_buffer.Length - _read < _buffer.Length / 2)
        {
            // Less than half of the array is free: move the unconsumed bytes to the front of a new
            // array. The new array is sized so that free space always remains; otherwise a
            // zero-length read would be mistaken for end-of-stream.
            var newBuffer = new byte[Math.Max(DefaultBufferSize, unconsumed * 2)];
            Buffer.BlockCopy(_buffer, _consumedIndex, newBuffer, 0, unconsumed);
            _buffer = newBuffer;
            _examinedIndex -= _consumedIndex;
            _consumedIndex = 0;
            _read = unconsumed;
        }

        // Keep a local reference so Complete() nulling the field mid-read cannot fault the code below.
        var buffer = _buffer;

        try
        {
            // Append after the valid data so unconsumed bytes are not overwritten.
            var bytesRead = await _stream.ReadAsync(buffer, _read, buffer.Length - _read, cancellationToken);
            _read += bytesRead;
            _completed = bytesRead == 0;

            var data = new ReadOnlySequence<byte>(buffer, _consumedIndex, _read - _consumedIndex);
            _pipeCompletionSource.SetResult(new ReadResult(data, isCanceled: false, isCompleted: _completed));

            if (_completed)
            {
                // Fire the callbacks when we're done with the stream
                FireWriterCompleted(null);
            }
        }
        catch (OperationCanceledException)
        {
            // Hand back the buffered bytes (possibly none) from the current array so the positions
            // in the result are accepted by AdvanceTo; cancellation alone does not end the stream.
            var unread = new ReadOnlySequence<byte>(buffer, _consumedIndex, _read - _consumedIndex);
            _pipeCompletionSource.SetResult(new ReadResult(unread, isCanceled: true, isCompleted: _completed));
        }
        catch (Exception ex)
        {
            _pipeCompletionSource.SetException(ex);
            FireWriterCompleted(ex);
        }
    }

    // Invokes the registered writer-completed callbacks once; the list is detached first so a later
    // read that also observes completion does not invoke the same callbacks again.
    private void FireWriterCompleted(Exception exception)
    {
        var callbacks = _callbacks;
        if (callbacks == null)
        {
            return;
        }

        _callbacks = null;
        foreach (var (callback, state) in callbacks)
        {
            callback(exception, state);
        }
    }

    /// <summary>Not implemented yet: always reports that no data is available.</summary>
    public override bool TryRead(out ReadResult result)
    {
        // TODO: Implement
        result = default;
        return false;
    }
}
// Minimal, reusable stand-in for a TaskCompletionSource.
// State lives in _callback: null = idle, a continuation = someone is waiting,
// _onCompleted = a result or exception has been published.
public class PipeCompletionSource<T> : IPipeAwaiter<T>
{
    // Sentinel marking the "completed" state.
    private static readonly Action _onCompleted = () => { };

    private T _result;
    private Action _callback;
    private Exception _exception;

    public bool IsCompleted
    {
        get { return _callback == _onCompleted; }
    }

    // Returns the published result (or rethrows the published exception) and
    // resets the source to the idle state so it can be used again.
    public T GetResult()
    {
        var failure = _exception;
        var value = _result;

        _result = default(T);
        _exception = null;
        _callback = null;

        if (failure == null)
        {
            return value;
        }

        ExceptionDispatchInfo.Throw(failure);
        return value;
    }

    // Registers the continuation. If the source is already completed the
    // continuation is scheduled via Task.Run; if another continuation is already
    // registered, the new one is not stored and not run.
    public void OnCompleted(Action continuation)
    {
        var alreadyCompleted = _callback == _onCompleted;
        if (!alreadyCompleted)
        {
            var previous = Interlocked.CompareExchange(ref _callback, continuation, null);
            alreadyCompleted = previous == _onCompleted;
        }

        if (alreadyCompleted)
        {
            Task.Run(continuation);
        }
    }

    public void SetResult(T result)
    {
        _result = result;
        Signal();
    }

    public void SetException(Exception exception)
    {
        _exception = exception;
        Signal();
    }

    // Flips the state to completed and runs whatever callback was stored before, if any.
    private void Signal()
    {
        var waiting = Interlocked.Exchange(ref _callback, _onCompleted);
        if (waiting != null)
        {
            waiting.Invoke();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment