Skip to content

Instantly share code, notes, and snippets.

@analogrelay
Created September 27, 2016 23:49
Show Gist options
  • Save analogrelay/c77ce1492390f78b886c43756ffcb357 to your computer and use it in GitHub Desktop.
Save analogrelay/c77ce1492390f78b886c43756ffcb357 to your computer and use it in GitHub Desktop.
ReadAtLeastAsync Helper
using System.Threading;
using System.Threading.Tasks;
using Channels;
namespace Microsoft.Extensions.WebSockets.Internal
{
public static class ChannelExtensions
{
public static ValueTask<ReadableBuffer> ReadAtLeastAsync(this IReadableChannel input, int minimumRequiredBytes) => ReadAtLeastAsync(input, minimumRequiredBytes, CancellationToken.None);
public static ValueTask<ReadableBuffer> ReadAtLeastAsync(this IReadableChannel input, int minimumRequiredBytes, CancellationToken cancellationToken)
{
var awaiter = input.ReadAsync();
// Short-cut path!
if(awaiter.IsCompleted)
{
// We have a buffer, is it big enough?
var buffer = awaiter.GetResult();
if(buffer.Length >= minimumRequiredBytes)
{
return new ValueTask<ReadableBuffer>(buffer);
}
// Buffer wasn't big enough, mark it as examined and continue to the "slow" path below
input.Advance(
consumed: buffer.Start,
examined: buffer.End);
}
return new ValueTask<ReadableBuffer>(ReadAtLeastSlowAsync(awaiter, input, minimumRequiredBytes, cancellationToken));
}
private static async Task<ReadableBuffer> ReadAtLeastSlowAsync(ReadableBufferAwaitable awaitable, IReadableChannel input, int minimumRequiredBytes, CancellationToken cancellationToken)
{
var buffer = await awaitable;
while(buffer.Length < minimumRequiredBytes)
{
cancellationToken.ThrowIfCancellationRequested();
input.Advance(
consumed: buffer.Start,
examined: buffer.End);
buffer = await input.ReadAsync();
}
return buffer;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment