Skip to content

Instantly share code, notes, and snippets.

@MiloszKrajewski
Last active January 15, 2023 22:08
Show Gist options
  • Save MiloszKrajewski/75a8c8dca999ba1bae5f81c6b8b5ff44 to your computer and use it in GitHub Desktop.
Save MiloszKrajewski/75a8c8dca999ba1bae5f81c6b8b5ff44 to your computer and use it in GitHub Desktop.
// * stackalloc does not work, we need pool as consumer is async so it needs Memory not Span
// * this might be most likley overcomplicated and ArrayBufferWriter could be enough,
// but it really tries to abuse the chance that read chunks are very small so there is
// only one rent from pool and one alloc for final result
// * these 3 methods could be a struct nicely encapsulating functionality but it is used
// from async method so struct would be copied all the time
// * these 3 methods could be a class, but I would like to limit allocation to minimum,
// so it uses `ref` arguments to delegate `state` to caller and allow keeping it on stack
static async ValueTask<ReadOnlyMemory<byte>> ReceiveStringAsync(WebSocket socket, CancellationToken ct = default)
{
var dynamicBuffer = default(ArrayBufferWriter<byte>); // we don't need it at first
var fixedBuffer = ArrayPool<byte>.Shared.Rent(0x4000); // made up size, but no need to keep small
var fixedBufferPosition = 0;
ValueWebSocketReceiveResult result;
do
{
ct.ThrowIfCancellationRequested();
// get writable chunk of memory from preallocated pool
var writable = GetWritableMemory(128, ref dynamicBuffer, fixedBuffer, fixedBufferPosition);
// not reading into buffer and then writing to stream, just wrinting straight to this dynamic buffer
result = await socket.ReceiveAsync(writable, ct);
// updated state after read
AdvanceAfterRead(result.Count, dynamicBuffer, fixedBuffer, ref fixedBufferPosition);
} while (!result.EndOfMessage);
if (result.MessageType != WebSocketMessageType.Text || result.Count == 0)
{
throw new Exception("Unexpected message");
}
// as so far we might have worked with rented array we need to copy now
return CloneWrittenBytes(dynamicBuffer, fixedBuffer, fixedBufferPosition);
}
static Memory<byte> GetWritableMemory(
int size, ref ArrayBufferWriter<byte> dynamicBuffer, Memory<byte> fixedBuffer, int fixedBufferPosition)
{
// above fixed buffer size already
if (dynamicBuffer is not null)
return dynamicBuffer.GetMemory(size);
// still in fixed buffer range
if (fixedBufferPosition + size <= fixedBuffer.Length)
return fixedBuffer.Slice(fixedBufferPosition, size);
// we just ran out of space in fixed buffer
dynamicBuffer = new ArrayBufferWriter<byte>();
return dynamicBuffer.GetMemory(size);
}
static void AdvanceAfterRead(
int size, ArrayBufferWriter<byte> dynamicBuffer, Memory<byte> fixedBuffer, ref int fixedBufferPosition)
{
if (dynamicBuffer is not null)
{
dynamicBuffer.Advance(size);
}
else
{
// what just happened?
if (fixedBufferPosition + size > fixedBuffer.Length)
throw new ArgumentException("No way you can advance by that much!");
fixedBufferPosition += size;
}
}
static byte[] CloneWrittenBytes(
ArrayBufferWriter<byte> dynamicBuffer, Memory<byte> fixedBuffer, int fixedBufferPosition)
{
var fixedBytes = fixedBuffer.Slice(0, fixedBufferPosition);
var dynamicBytes = dynamicBuffer?.WrittenMemory;
var result = new byte[fixedBytes.Length + dynamicBytes?.Length ?? 0];
fixedBytes.CopyTo(result);
dynamicBytes?.CopyTo(result.AsMemory(fixedBufferPosition));
return result;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment